4 Commits

20 changed files with 890 additions and 1005 deletions

View File

@@ -5,84 +5,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.6] - 2025-09-13 ## [Unreleased]
### Added ### Added
- **Quality-Based CDM Selection**: Dynamic CDM selection based on video resolution - **Custom Output Templates**: Flexible filename customization system
- Automatically selects appropriate CDM (L3/L1) based on video track quality - New `output_template` configuration in unshackle.yaml for movies, series, and songs
- Supports quality thresholds in configuration (>=, >, <=, <, exact match) - Support for conditional variables using `?` suffix (e.g., `{year?}`, `{hdr?}`)
- Pre-selects optimal CDM based on highest quality across all video tracks - Comprehensive template variables for title, quality, audio, video, and metadata
- Maintains backward compatibility with existing CDM configurations - Multiple naming styles: Scene-style (dot-separated), Plex-friendly (space-separated), minimal, custom
- **Automatic Audio Language Metadata**: Intelligent embedded audio language detection - Automatic template validation and enhanced error handling
- Automatically sets audio language metadata when no separate audio tracks exist - **Full backward compatibility**: Old `scene_naming` option still works and automatically converts to equivalent templates
- Smart video track selection based on title language with fallbacks - Folder naming now follows series template patterns (excluding episode-specific variables)
- Enhanced FFmpeg repackaging with audio stream metadata injection - Deprecation warnings guide users to migrate from `scene_naming` to `output_template`
- **Lazy DRM Loading**: Deferred DRM loading for multi-track key retrieval optimization
- Add deferred DRM loading to M3U8 parser to mark tracks for later processing
- Just-in-time DRM loading during download process for better performance
### Changed ### Changed
- **Enhanced CDM Management**: Improved CDM switching logic for multi-quality downloads - **Filename Generation**: Updated all title classes (Movie, Episode, Song) to use new template system
- CDM selection now based on highest quality track to avoid inefficient switching - Enhanced context building for template variable substitution
- Quality-based selection only within same DRM type (Widevine-to-Widevine, PlayReady-to-PlayReady) - Improved separator handling based on template style detection
- Single CDM used per session for better performance and reliability - Better handling of conditional content like HDR, Atmos, and multi-language audio
### Fixed
- **Vault Caching Issues**: Fixed vault count display and NoneType iteration errors
- Fix 'NoneType' object is not iterable error in DecryptLabsRemoteCDM
- Fix vault count display showing 0/3 instead of actual successful vault count
- **Service Name Transmission**: Resolved DecryptLabsRemoteCDM service name issues
- Fixed DecryptLabsRemoteCDM sending 'generic' instead of proper service names
- Added case-insensitive vault lookups for SQLite/MySQL vaults
- Added local vault integration to DecryptLabsRemoteCDM
- **Import Organization**: Improved import ordering and code formatting
- Reorder imports in decrypt_labs_remote_cdm.py for better organization
- Clean up trailing whitespace in vault files
### Configuration
- **New CDM Configuration Format**: Extended `cdm:` section supports quality-based selection
```yaml
cdm:
SERVICE_NAME:
"<=1080": l3_cdm_name
">1080": l1_cdm_name
default: l3_cdm_name
```
## [1.4.5] - 2025-09-09
### Added
- **Enhanced CDM Key Caching**: Improved key caching and session management for L1/L2 devices
- Optimized `get_cached_keys_if_exists` functionality for better performance with L1/L2 devices
- Enhanced cached key retrieval logic with improved session handling
- **Widevine Common Certificate Fallback**: Added fallback to Widevine common certificate for L1 devices
- Improved compatibility for L1 devices when service certificates are unavailable
- **Enhanced Vault Loading**: Improved vault loading and key copying logic
- Better error handling and key management in vault operations
- **PSSH Display Optimization**: Truncated PSSH string display in non-debug mode for cleaner output
- **CDM Error Messaging**: Added error messages for missing service certificates in CDM sessions
### Changed
- **Dynamic Version Headers**: Updated User-Agent headers to use dynamic version strings
- DecryptLabsRemoteCDM now uses dynamic version import instead of hardcoded version
- **Intelligent CDM Caching**: Implemented intelligent caching system for CDM license requests
- Enhanced caching logic reduces redundant license requests and improves performance
- **Enhanced Tag Handling**: Improved tag handling for TV shows and movies from Simkl data
- Better metadata processing and formatting for improved media tagging
### Fixed
- **CDM Session Management**: Clean up session data when retrieving cached keys
- Remove decrypt_labs_session_id and challenge from session when cached keys exist but there are missing kids
- Ensures clean state for subsequent requests and prevents session conflicts
- **Tag Formatting**: Fixed formatting issues in tag processing
- **Import Order**: Fixed import order issues in tags module
## [1.4.4] - 2025-09-02 ## [1.4.4] - 2025-09-02

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "unshackle" name = "unshackle"
version = "1.4.6" version = "1.4.4"
description = "Modular Movie, TV, and Music Archival Software." description = "Modular Movie, TV, and Music Archival Software."
authors = [{ name = "unshackle team" }] authors = [{ name = "unshackle team" }]
requires-python = ">=3.10,<3.13" requires-python = ">=3.10,<3.13"

View File

@@ -66,18 +66,6 @@ from unshackle.core.vaults import Vaults
class dl: class dl:
@staticmethod
def _truncate_pssh_for_display(pssh_string: str, drm_type: str) -> str:
"""Truncate PSSH string for display when not in debug mode."""
if logging.root.level == logging.DEBUG or not pssh_string:
return pssh_string
max_width = console.width - len(drm_type) - 12
if len(pssh_string) <= max_width:
return pssh_string
return pssh_string[: max_width - 3] + "..."
@click.command( @click.command(
short_help="Download, Decrypt, and Mux tracks for titles from a Service.", short_help="Download, Decrypt, and Mux tracks for titles from a Service.",
cls=Services, cls=Services,
@@ -862,40 +850,9 @@ class dl:
selected_tracks, tracks_progress_callables = title.tracks.tree(add_progress=True) selected_tracks, tracks_progress_callables = title.tracks.tree(add_progress=True)
for track in title.tracks:
if hasattr(track, "needs_drm_loading") and track.needs_drm_loading:
track.load_drm_if_needed(service)
download_table = Table.grid() download_table = Table.grid()
download_table.add_row(selected_tracks) download_table.add_row(selected_tracks)
video_tracks = title.tracks.videos
if video_tracks:
highest_quality = max((track.height for track in video_tracks if track.height), default=0)
if highest_quality > 0:
if isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) and not (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="widevine", quality=highest_quality
)
if quality_based_cdm and quality_based_cdm != self.cdm:
self.log.info(
f"Pre-selecting Widevine CDM based on highest quality {highest_quality}p across all video tracks"
)
self.cdm = quality_based_cdm
elif isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) and (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="playready", quality=highest_quality
)
if quality_based_cdm and quality_based_cdm != self.cdm:
self.log.info(
f"Pre-selecting PlayReady CDM based on highest quality {highest_quality}p across all video tracks"
)
self.cdm = quality_based_cdm
dl_start_time = time.time() dl_start_time = time.time()
if skip_dl: if skip_dl:
@@ -1178,13 +1135,8 @@ class dl:
with Live(Padding(progress, (0, 5, 1, 5)), console=console): with Live(Padding(progress, (0, 5, 1, 5)), console=console):
for task_id, task_tracks in multiplex_tasks: for task_id, task_tracks in multiplex_tasks:
progress.start_task(task_id) # TODO: Needed? progress.start_task(task_id) # TODO: Needed?
audio_expected = not video_only and not no_audio
muxed_path, return_code, errors = task_tracks.mux( muxed_path, return_code, errors = task_tracks.mux(
str(title), str(title), progress=partial(progress.update, task_id=task_id), delete=False
progress=partial(progress.update, task_id=task_id),
delete=False,
audio_expected=audio_expected,
title_language=title.language,
) )
muxed_paths.append(muxed_path) muxed_paths.append(muxed_path)
if return_code >= 2: if return_code >= 2:
@@ -1257,9 +1209,6 @@ class dl:
if not drm: if not drm:
return return
if isinstance(track, Video) and track.height:
pass
if isinstance(drm, Widevine): if isinstance(drm, Widevine):
if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or ( if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
@@ -1268,7 +1217,6 @@ class dl:
if widevine_cdm: if widevine_cdm:
self.log.info("Switching to Widevine CDM for Widevine content") self.log.info("Switching to Widevine CDM for Widevine content")
self.cdm = widevine_cdm self.cdm = widevine_cdm
elif isinstance(drm, PlayReady): elif isinstance(drm, PlayReady):
if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or ( if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready
@@ -1280,20 +1228,14 @@ class dl:
if isinstance(drm, Widevine): if isinstance(drm, Widevine):
with self.DRM_TABLE_LOCK: with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh.dumps(), "Widevine") cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({drm.pssh.dumps()})", "text"), overflow="fold"))
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({pssh_display})", "text"), overflow="fold"))
pre_existing_tree = next( pre_existing_tree = next(
(x for x in table.columns[0].cells if isinstance(x, Tree) and x.label == cek_tree.label), None (x for x in table.columns[0].cells if isinstance(x, Tree) and x.label == cek_tree.label), None
) )
if pre_existing_tree: if pre_existing_tree:
cek_tree = pre_existing_tree cek_tree = pre_existing_tree
need_license = False for kid in drm.kids:
all_kids = list(drm.kids)
if track_kid and track_kid not in all_kids:
all_kids.append(track_kid)
for kid in all_kids:
if kid in drm.content_keys: if kid in drm.content_keys:
continue continue
@@ -1313,51 +1255,46 @@ class dl:
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
raise Widevine.Exceptions.CEKNotFound(msg) raise Widevine.Exceptions.CEKNotFound(msg)
else:
need_license = True
if kid not in drm.content_keys and cdm_only: if kid not in drm.content_keys and not vaults_only:
need_license = True from_vaults = drm.content_keys.copy()
if need_license and not vaults_only: try:
from_vaults = drm.content_keys.copy() if self.service == "NF":
drm.get_NF_content_keys(cdm=self.cdm, licence=licence, certificate=certificate)
else:
drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate)
except Exception as e:
if isinstance(e, (Widevine.Exceptions.EmptyLicense, Widevine.Exceptions.CEKNotFound)):
msg = str(e)
else:
msg = f"An exception occurred in the Service's license function: {e}"
cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree:
table.add_row(cek_tree)
raise e
try: for kid_, key in drm.content_keys.items():
if self.service == "NF": if key == "0" * 32:
drm.get_NF_content_keys(cdm=self.cdm, licence=licence, certificate=certificate) key = f"[red]{key}[/]"
else: label = f"[text2]{kid_.hex}:{key}{is_track_kid}"
drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate) if not any(f"{kid_.hex}:{key}" in x.label for x in cek_tree.children):
except Exception as e: cek_tree.add(label)
if isinstance(e, (Widevine.Exceptions.EmptyLicense, Widevine.Exceptions.CEKNotFound)):
msg = str(e)
else:
msg = f"An exception occurred in the Service's license function: {e}"
cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree:
table.add_row(cek_tree)
raise e
for kid_, key in drm.content_keys.items(): drm.content_keys = {
if key == "0" * 32: kid_: key for kid_, key in drm.content_keys.items() if key and key.count("0") != len(key)
key = f"[red]{key}[/]" }
is_track_kid_marker = ["", "*"][kid_ == track_kid]
label = f"[text2]{kid_.hex}:{key}{is_track_kid_marker}"
if not any(f"{kid_.hex}:{key}" in x.label for x in cek_tree.children):
cek_tree.add(label)
drm.content_keys = { # The CDM keys may have returned blank content keys for KIDs we got from vaults.
kid_: key for kid_, key in drm.content_keys.items() if key and key.count("0") != len(key) # So we re-add the keys from vaults earlier overwriting blanks or removed KIDs data.
} drm.content_keys.update(from_vaults)
# The CDM keys may have returned blank content keys for KIDs we got from vaults. successful_caches = self.vaults.add_keys(drm.content_keys)
# So we re-add the keys from vaults earlier overwriting blanks or removed KIDs data. self.log.info(
drm.content_keys.update(from_vaults) f"Cached {len(drm.content_keys)} Key{'' if len(drm.content_keys) == 1 else 's'} to "
f"{successful_caches}/{len(self.vaults)} Vaults"
successful_caches = self.vaults.add_keys(drm.content_keys) )
self.log.info( break # licensing twice will be unnecessary
f"Cached {len(drm.content_keys)} Key{'' if len(drm.content_keys) == 1 else 's'} to "
f"{successful_caches}/{len(self.vaults)} Vaults"
)
if track_kid and track_kid not in drm.content_keys: if track_kid and track_kid not in drm.content_keys:
msg = f"No Content Key for KID {track_kid.hex} was returned in the License" msg = f"No Content Key for KID {track_kid.hex} was returned in the License"
@@ -1383,11 +1320,10 @@ class dl:
elif isinstance(drm, PlayReady): elif isinstance(drm, PlayReady):
with self.DRM_TABLE_LOCK: with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh_b64 or "", "PlayReady")
cek_tree = Tree( cek_tree = Tree(
Text.assemble( Text.assemble(
("PlayReady", "cyan"), ("PlayReady", "cyan"),
(f"({pssh_display})", "text"), (f"({drm.pssh_b64 or ''})", "text"),
overflow="fold", overflow="fold",
) )
) )
@@ -1397,12 +1333,7 @@ class dl:
if pre_existing_tree: if pre_existing_tree:
cek_tree = pre_existing_tree cek_tree = pre_existing_tree
need_license = False for kid in drm.kids:
all_kids = list(drm.kids)
if track_kid and track_kid not in all_kids:
all_kids.append(track_kid)
for kid in all_kids:
if kid in drm.content_keys: if kid in drm.content_keys:
continue continue
@@ -1422,40 +1353,35 @@ class dl:
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
raise PlayReady.Exceptions.CEKNotFound(msg) raise PlayReady.Exceptions.CEKNotFound(msg)
else:
need_license = True
if kid not in drm.content_keys and cdm_only: if kid not in drm.content_keys and not vaults_only:
need_license = True from_vaults = drm.content_keys.copy()
if need_license and not vaults_only: try:
from_vaults = drm.content_keys.copy() drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate)
except Exception as e:
if isinstance(e, (PlayReady.Exceptions.EmptyLicense, PlayReady.Exceptions.CEKNotFound)):
msg = str(e)
else:
msg = f"An exception occurred in the Service's license function: {e}"
cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree:
table.add_row(cek_tree)
raise e
try: for kid_, key in drm.content_keys.items():
drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate) label = f"[text2]{kid_.hex}:{key}{is_track_kid}"
except Exception as e: if not any(f"{kid_.hex}:{key}" in x.label for x in cek_tree.children):
if isinstance(e, (PlayReady.Exceptions.EmptyLicense, PlayReady.Exceptions.CEKNotFound)): cek_tree.add(label)
msg = str(e)
else:
msg = f"An exception occurred in the Service's license function: {e}"
cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree:
table.add_row(cek_tree)
raise e
for kid_, key in drm.content_keys.items(): drm.content_keys.update(from_vaults)
is_track_kid_marker = ["", "*"][kid_ == track_kid]
label = f"[text2]{kid_.hex}:{key}{is_track_kid_marker}"
if not any(f"{kid_.hex}:{key}" in x.label for x in cek_tree.children):
cek_tree.add(label)
drm.content_keys.update(from_vaults) successful_caches = self.vaults.add_keys(drm.content_keys)
self.log.info(
successful_caches = self.vaults.add_keys(drm.content_keys) f"Cached {len(drm.content_keys)} Key{'' if len(drm.content_keys) == 1 else 's'} to "
self.log.info( f"{successful_caches}/{len(self.vaults)} Vaults"
f"Cached {len(drm.content_keys)} Key{'' if len(drm.content_keys) == 1 else 's'} to " )
f"{successful_caches}/{len(self.vaults)} Vaults" break
)
if track_kid and track_kid not in drm.content_keys: if track_kid and track_kid not in drm.content_keys:
msg = f"No Content Key for KID {track_kid.hex} was returned in the License" msg = f"No Content Key for KID {track_kid.hex} was returned in the License"
@@ -1541,11 +1467,9 @@ class dl:
service: str, service: str,
profile: Optional[str] = None, profile: Optional[str] = None,
drm: Optional[str] = None, drm: Optional[str] = None,
quality: Optional[int] = None,
) -> Optional[object]: ) -> Optional[object]:
""" """
Get CDM for a specified service (either Local or Remote CDM). Get CDM for a specified service (either Local or Remote CDM).
Now supports quality-based selection when quality is provided.
Raises a ValueError if there's a problem getting a CDM. Raises a ValueError if there's a problem getting a CDM.
""" """
cdm_name = config.cdm.get(service) or config.cdm.get("default") cdm_name = config.cdm.get(service) or config.cdm.get("default")
@@ -1553,82 +1477,23 @@ class dl:
return None return None
if isinstance(cdm_name, dict): if isinstance(cdm_name, dict):
if quality: lower_keys = {k.lower(): v for k, v in cdm_name.items()}
quality_match = None if {"widevine", "playready"} & lower_keys.keys():
quality_keys = [] drm_key = None
if drm:
for key in cdm_name.keys(): drm_key = {
if ( "wv": "widevine",
isinstance(key, str) "widevine": "widevine",
and any(op in key for op in [">=", ">", "<=", "<"]) "pr": "playready",
or (isinstance(key, str) and key.isdigit()) "playready": "playready",
): }.get(drm.lower())
quality_keys.append(key) cdm_name = lower_keys.get(drm_key or "widevine") or lower_keys.get("playready")
else:
def sort_quality_key(key): if not profile:
if key.isdigit():
return (0, int(key)) # Exact matches first
elif key.startswith(">="):
return (1, -int(key[2:])) # >= descending
elif key.startswith(">"):
return (1, -int(key[1:])) # > descending
elif key.startswith("<="):
return (2, int(key[2:])) # <= ascending
elif key.startswith("<"):
return (2, int(key[1:])) # < ascending
return (3, 0) # Other keys last
quality_keys.sort(key=sort_quality_key)
for key in quality_keys:
if key.isdigit() and quality == int(key):
quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on exact quality match {quality}p: {quality_match}")
break
elif key.startswith(">="):
threshold = int(key[2:])
if quality >= threshold:
quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p >= {threshold}p: {quality_match}")
break
elif key.startswith(">"):
threshold = int(key[1:])
if quality > threshold:
quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p > {threshold}p: {quality_match}")
break
elif key.startswith("<="):
threshold = int(key[2:])
if quality <= threshold:
quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p <= {threshold}p: {quality_match}")
break
elif key.startswith("<"):
threshold = int(key[1:])
if quality < threshold:
quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p < {threshold}p: {quality_match}")
break
if quality_match:
cdm_name = quality_match
if isinstance(cdm_name, dict):
lower_keys = {k.lower(): v for k, v in cdm_name.items()}
if {"widevine", "playready"} & lower_keys.keys():
drm_key = None
if drm:
drm_key = {
"wv": "widevine",
"widevine": "widevine",
"pr": "playready",
"playready": "playready",
}.get(drm.lower())
cdm_name = lower_keys.get(drm_key or "widevine") or lower_keys.get("playready")
else:
cdm_name = cdm_name.get(profile) or cdm_name.get("default") or config.cdm.get("default")
if not cdm_name:
return None return None
cdm_name = cdm_name.get(profile) or config.cdm.get("default")
if not cdm_name:
return None
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api: if cdm_api:

View File

@@ -12,113 +12,84 @@ from unshackle.core.vault import Vault
from unshackle.core.vaults import Vaults from unshackle.core.vaults import Vaults
def _load_vaults(vault_names: list[str]) -> Vaults:
"""Load and validate vaults by name."""
vaults = Vaults()
for vault_name in vault_names:
vault_config = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault_config:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault_config["type"]
vault_args = vault_config.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
return vaults
def _process_service_keys(from_vault: Vault, service: str, log: logging.Logger) -> dict[str, str]:
"""Get and validate keys from a vault for a specific service."""
content_keys = list(from_vault.get_keys(service))
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Skipping NULL key: {kid}:{key}")
return {kid: key for kid, key in content_keys if kid not in bad_keys}
def _copy_service_data(to_vault: Vault, from_vault: Vault, service: str, log: logging.Logger) -> int:
"""Copy data for a single service between vaults."""
content_keys = _process_service_keys(from_vault, service, log)
total_count = len(content_keys)
if total_count == 0:
log.info(f"{service}: No keys found in {from_vault}")
return 0
try:
added = to_vault.add_keys(service, content_keys)
except PermissionError:
log.warning(f"{service}: No permission to create table in {to_vault}, skipped")
return 0
existed = total_count - added
if added > 0 and existed > 0:
log.info(f"{service}: {added} added, {existed} skipped ({total_count} total)")
elif added > 0:
log.info(f"{service}: {added} added ({total_count} total)")
else:
log.info(f"{service}: {existed} skipped (all existed)")
return added
@click.group(short_help="Manage and configure Key Vaults.", context_settings=context_settings) @click.group(short_help="Manage and configure Key Vaults.", context_settings=context_settings)
def kv() -> None: def kv() -> None:
"""Manage and configure Key Vaults.""" """Manage and configure Key Vaults."""
@kv.command() @kv.command()
@click.argument("to_vault_name", type=str) @click.argument("to_vault", type=str)
@click.argument("from_vault_names", nargs=-1, type=click.UNPROCESSED) @click.argument("from_vaults", nargs=-1, type=click.UNPROCESSED)
@click.option("-s", "--service", type=str, default=None, help="Only copy data to and from a specific service.") @click.option("-s", "--service", type=str, default=None, help="Only copy data to and from a specific service.")
def copy(to_vault_name: str, from_vault_names: list[str], service: Optional[str] = None) -> None: def copy(to_vault: str, from_vaults: list[str], service: Optional[str] = None) -> None:
""" """
Copy data from multiple Key Vaults into a single Key Vault. Copy data from multiple Key Vaults into a single Key Vault.
Rows with matching KIDs are skipped unless there's no KEY set. Rows with matching KIDs are skipped unless there's no KEY set.
Existing data is not deleted or altered. Existing data is not deleted or altered.
The `to_vault_name` argument is the key vault you wish to copy data to. The `to_vault` argument is the key vault you wish to copy data to.
It should be the name of a Key Vault defined in the config. It should be the name of a Key Vault defined in the config.
The `from_vault_names` argument is the key vault(s) you wish to take The `from_vaults` argument is the key vault(s) you wish to take
data from. You may supply multiple key vaults. data from. You may supply multiple key vaults.
""" """
if not from_vault_names: if not from_vaults:
raise click.ClickException("No Vaults were specified to copy data from.") raise click.ClickException("No Vaults were specified to copy data from.")
log = logging.getLogger("kv") log = logging.getLogger("kv")
all_vault_names = [to_vault_name] + list(from_vault_names) vaults = Vaults()
vaults = _load_vaults(all_vault_names) for vault_name in [to_vault] + list(from_vaults):
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
to_vault = vaults.vaults[0] to_vault: Vault = vaults.vaults[0]
from_vaults = vaults.vaults[1:] from_vaults: list[Vault] = vaults.vaults[1:]
vault_names = ", ".join([v.name for v in from_vaults])
log.info(f"Copying data from {vault_names}{to_vault.name}")
log.info(f"Copying data from {', '.join([x.name for x in from_vaults])}, into {to_vault.name}")
if service: if service:
service = Services.get_tag(service) service = Services.get_tag(service)
log.info(f"Filtering by service: {service}") log.info(f"Only copying data for service {service}")
total_added = 0 total_added = 0
for from_vault in from_vaults: for from_vault in from_vaults:
services_to_copy = [service] if service else from_vault.get_services() if service:
services = [service]
else:
services = from_vault.get_services()
for service_ in services:
log.info(f"Getting data from {from_vault} for {service_}")
content_keys = list(from_vault.get_keys(service_)) # important as it's a generator we iterate twice
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Cannot add a NULL Content Key to a Vault, skipping: {kid}:{key}")
content_keys = {kid: key for kid, key in content_keys if kid not in bad_keys}
total_count = len(content_keys)
log.info(f"Adding {total_count} Content Keys to {to_vault} for {service_}")
try:
added = to_vault.add_keys(service_, content_keys)
except PermissionError:
log.warning(f" - No permission to create table ({service_}) in {to_vault}, skipping...")
continue
for service_tag in services_to_copy:
added = _copy_service_data(to_vault, from_vault, service_tag, log)
total_added += added total_added += added
existed = total_count - added
if total_added > 0: log.info(f"{to_vault} ({service_}): {added} newly added, {existed} already existed (skipped)")
log.info(f"Successfully added {total_added} new keys to {to_vault}")
else: log.info(f"{to_vault}: {total_added} total newly added")
log.info("Copy completed - no new keys to add")
@kv.command() @kv.command()
@@ -135,9 +106,9 @@ def sync(ctx: click.Context, vaults: list[str], service: Optional[str] = None) -
if not len(vaults) > 1: if not len(vaults) > 1:
raise click.ClickException("You must provide more than one Vault to sync.") raise click.ClickException("You must provide more than one Vault to sync.")
ctx.invoke(copy, to_vault_name=vaults[0], from_vault_names=vaults[1:], service=service) ctx.invoke(copy, to_vault=vaults[0], from_vaults=vaults[1:], service=service)
for i in range(1, len(vaults)): for i in range(1, len(vaults)):
ctx.invoke(copy, to_vault_name=vaults[i], from_vault_names=[vaults[i - 1]], service=service) ctx.invoke(copy, to_vault=vaults[i], from_vaults=[vaults[i - 1]], service=service)
@kv.command() @kv.command()
@@ -164,7 +135,15 @@ def add(file: Path, service: str, vaults: list[str]) -> None:
log = logging.getLogger("kv") log = logging.getLogger("kv")
service = Services.get_tag(service) service = Services.get_tag(service)
vaults_ = _load_vaults(list(vaults)) vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
data = file.read_text(encoding="utf8") data = file.read_text(encoding="utf8")
kid_keys: dict[str, str] = {} kid_keys: dict[str, str] = {}
@@ -194,7 +173,15 @@ def prepare(vaults: list[str]) -> None:
"""Create Service Tables on Vaults if not yet created.""" """Create Service Tables on Vaults if not yet created."""
log = logging.getLogger("kv") log = logging.getLogger("kv")
vaults_ = _load_vaults(vaults) vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
for vault in vaults_: for vault in vaults_:
if hasattr(vault, "has_table") and hasattr(vault, "create_table"): if hasattr(vault, "has_table") and hasattr(vault, "create_table"):

View File

@@ -1 +1 @@
__version__ = "1.4.6" __version__ = "1.4.4"

View File

@@ -6,11 +6,9 @@ from typing import Any, Dict, List, Optional, Union
from uuid import UUID from uuid import UUID
import requests import requests
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.device import DeviceTypes from pywidevine.device import DeviceTypes
from requests import Session from requests import Session
from unshackle.core import __version__
from unshackle.core.vaults import Vaults from unshackle.core.vaults import Vaults
@@ -81,17 +79,15 @@ class DecryptLabsRemoteCDM:
Key Features: Key Features:
- Compatible with both Widevine and PlayReady DRM schemes - Compatible with both Widevine and PlayReady DRM schemes
- Intelligent caching that compares required vs. available keys - Intelligent caching that compares required vs. available keys
- Optimized caching for L1/L2 devices (leverages API auto-optimization)
- Automatic key combination for mixed cache/license scenarios - Automatic key combination for mixed cache/license scenarios
- Seamless fallback to license requests when keys are missing - Seamless fallback to license requests when keys are missing
Intelligent Caching System: Intelligent Caching System:
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids() 1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
2. get_license_challenge() first checks for cached keys 2. get_license_challenge() first checks for cached keys
3. For L1/L2 devices, always attempts cached keys first (API optimized) 3. If cached keys satisfy requirements, returns empty challenge (no license needed)
4. If cached keys satisfy requirements, returns empty challenge (no license needed) 4. If keys are missing, makes targeted license request for remaining keys
5. If keys are missing, makes targeted license request for remaining keys 5. parse_license() combines cached and license keys intelligently
6. parse_license() combines cached and license keys intelligently
""" """
service_certificate_challenge = b"\x08\x04" service_certificate_challenge = b"\x08\x04"
@@ -151,7 +147,7 @@ class DecryptLabsRemoteCDM:
{ {
"decrypt-labs-api-key": self.secret, "decrypt-labs-api-key": self.secret,
"Content-Type": "application/json", "Content-Type": "application/json",
"User-Agent": f"unshackle-decrypt-labs-cdm/{__version__}", "User-Agent": "unshackle-decrypt-labs-cdm/1.0",
} }
) )
@@ -254,14 +250,12 @@ class DecryptLabsRemoteCDM:
"pssh": None, "pssh": None,
"challenge": None, "challenge": None,
"decrypt_labs_session_id": None, "decrypt_labs_session_id": None,
"tried_cache": False,
"cached_keys": None,
} }
return session_id return session_id
def close(self, session_id: bytes) -> None: def close(self, session_id: bytes) -> None:
""" """
Close a CDM session and perform comprehensive cleanup. Close a CDM session.
Args: Args:
session_id: Session identifier session_id: Session identifier
@@ -272,8 +266,6 @@ class DecryptLabsRemoteCDM:
if session_id not in self._sessions: if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session.clear()
del self._sessions[session_id] del self._sessions[session_id]
def get_service_certificate(self, session_id: bytes) -> Optional[bytes]: def get_service_certificate(self, session_id: bytes) -> Optional[bytes]:
@@ -312,13 +304,8 @@ class DecryptLabsRemoteCDM:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
if certificate is None: if certificate is None:
if not self._is_playready and self.device_name == "L1": self._sessions[session_id]["service_certificate"] = None
certificate = WidevineCdm.common_privacy_cert return "Removed"
self._sessions[session_id]["service_certificate"] = base64.b64decode(certificate)
return "Using default Widevine common privacy certificate for L1"
else:
self._sessions[session_id]["service_certificate"] = None
return "No certificate set (not required for this device type)"
if isinstance(certificate, str): if isinstance(certificate, str):
certificate = base64.b64decode(certificate) certificate = base64.b64decode(certificate)
@@ -353,19 +340,15 @@ class DecryptLabsRemoteCDM:
Generate a license challenge using Decrypt Labs API with intelligent caching. Generate a license challenge using Decrypt Labs API with intelligent caching.
This method implements smart caching logic that: This method implements smart caching logic that:
1. First checks local vaults for required keys 1. First attempts to retrieve cached keys from the API
2. Attempts to retrieve cached keys from the API 2. If required KIDs are set, compares cached keys against requirements
3. If required KIDs are set, compares available keys (vault + cached) against requirements 3. Only makes a license request if keys are missing
4. Only makes a license request if keys are missing 4. Returns empty challenge if all required keys are cached
5. Returns empty challenge if all required keys are available
The intelligent caching works as follows: The intelligent caching works as follows:
- Local vaults: Always checked first if available
- For L1/L2 devices: Always prioritizes cached keys (API automatically optimizes)
- For other devices: Uses cache retry logic based on session state
- With required KIDs set: Only requests license for missing keys - With required KIDs set: Only requests license for missing keys
- Without required KIDs: Returns any available cached keys - Without required KIDs: Returns any available cached keys
- For PlayReady: Combines vault, cached, and license keys seamlessly - For PlayReady: Combines cached keys with license keys seamlessly
Args: Args:
session_id: Session identifier session_id: Session identifier
@@ -374,7 +357,7 @@ class DecryptLabsRemoteCDM:
privacy_mode: Whether to use privacy mode - for compatibility only privacy_mode: Whether to use privacy mode - for compatibility only
Returns: Returns:
License challenge as bytes, or empty bytes if available keys satisfy requirements License challenge as bytes, or empty bytes if cached keys satisfy requirements
Raises: Raises:
InvalidSession: If session ID is invalid InvalidSession: If session ID is invalid
@@ -382,8 +365,6 @@ class DecryptLabsRemoteCDM:
Note: Note:
Call set_required_kids() before this method for optimal caching behavior. Call set_required_kids() before this method for optimal caching behavior.
L1/L2 devices automatically use cached keys when available per API design.
Local vault keys are always checked first when vaults are available.
""" """
_ = license_type, privacy_mode _ = license_type, privacy_mode
@@ -396,43 +377,13 @@ class DecryptLabsRemoteCDM:
init_data = self._get_init_data_from_pssh(pssh_or_wrm) init_data = self._get_init_data_from_pssh(pssh_or_wrm)
already_tried_cache = session.get("tried_cache", False) already_tried_cache = session.get("tried_cache", False)
if self.vaults and self._required_kids:
vault_keys = []
for kid_str in self._required_kids:
try:
clean_kid = kid_str.replace("-", "")
if len(clean_kid) == 32:
kid_uuid = UUID(hex=clean_kid)
else:
kid_uuid = UUID(hex=clean_kid.ljust(32, "0"))
key, _ = self.vaults.get_key(kid_uuid)
if key and key.count("0") != len(key):
vault_keys.append({"kid": kid_str, "key": key, "type": "CONTENT"})
except (ValueError, TypeError):
continue
if vault_keys:
vault_kids = set(k["kid"] for k in vault_keys)
required_kids = set(self._required_kids)
if required_kids.issubset(vault_kids):
session["keys"] = vault_keys
return b""
else:
session["vault_keys"] = vault_keys
if self.device_name in ["L1", "L2"]:
get_cached_keys = True
else:
get_cached_keys = not already_tried_cache
request_data = { request_data = {
"scheme": self.device_name, "scheme": self.device_name,
"init_data": init_data, "init_data": init_data,
"get_cached_keys_if_exists": get_cached_keys, "get_cached_keys_if_exists": not already_tried_cache,
} }
if self.service_name: if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
request_data["service"] = self.service_name request_data["service"] = self.service_name
if session["service_certificate"]: if session["service_certificate"]:
@@ -469,49 +420,22 @@ class DecryptLabsRemoteCDM:
""" """
cached_keys = data.get("cached_keys", []) cached_keys = data.get("cached_keys", [])
parsed_keys = self._parse_cached_keys(cached_keys) parsed_keys = self._parse_cached_keys(cached_keys)
session["keys"] = parsed_keys
all_available_keys = list(parsed_keys)
if "vault_keys" in session:
all_available_keys.extend(session["vault_keys"])
session["keys"] = all_available_keys
session["tried_cache"] = True session["tried_cache"] = True
if self._required_kids: if self._required_kids:
available_kids = set() cached_kids = set()
for key in all_available_keys: for key in parsed_keys:
if isinstance(key, dict) and "kid" in key: if isinstance(key, dict) and "kid" in key:
available_kids.add(key["kid"].replace("-", "").lower()) cached_kids.add(key["kid"].replace("-", "").lower())
required_kids = set(self._required_kids) required_kids = set(self._required_kids)
missing_kids = required_kids - available_kids missing_kids = required_kids - cached_kids
if missing_kids: if missing_kids:
session["cached_keys"] = parsed_keys session["cached_keys"] = parsed_keys
request_data["get_cached_keys_if_exists"] = False
if self.device_name in ["L1", "L2"]: response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
license_request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": False,
}
if self.service_name:
license_request_data["service"] = self.service_name
if session["service_certificate"]:
license_request_data["service_certificate"] = base64.b64encode(
session["service_certificate"]
).decode("utf-8")
else:
license_request_data = request_data.copy()
license_request_data["get_cached_keys_if_exists"] = False
session["decrypt_labs_session_id"] = None
session["challenge"] = None
session["tried_cache"] = False
response = self._http_session.post(
f"{self.host}/get-request", json=license_request_data, timeout=30
)
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
if data.get("message") == "success" and "challenge" in data: if data.get("message") == "success" and "challenge" in data:
@@ -618,63 +542,50 @@ class DecryptLabsRemoteCDM:
license_keys = self._parse_keys_response(data) license_keys = self._parse_keys_response(data)
all_keys = [] if self.is_playready and "cached_keys" in session:
"""
Combine cached keys with license keys for PlayReady content.
if "vault_keys" in session: This ensures we have both the cached keys (obtained earlier) and
all_keys.extend(session["vault_keys"]) any additional keys from the license response, without duplicates.
"""
if "cached_keys" in session:
cached_keys = session.get("cached_keys", []) cached_keys = session.get("cached_keys", [])
if cached_keys: all_keys = list(cached_keys)
for cached_key in cached_keys:
all_keys.append(cached_key)
for license_key in license_keys: for license_key in license_keys:
already_exists = False already_exists = False
license_kid = None license_kid = None
if isinstance(license_key, dict) and "kid" in license_key: if isinstance(license_key, dict) and "kid" in license_key:
license_kid = license_key["kid"].replace("-", "").lower() license_kid = license_key["kid"].replace("-", "").lower()
elif hasattr(license_key, "kid"): elif hasattr(license_key, "kid"):
license_kid = str(license_key.kid).replace("-", "").lower() license_kid = str(license_key.kid).replace("-", "").lower()
elif hasattr(license_key, "key_id"): elif hasattr(license_key, "key_id"):
license_kid = str(license_key.key_id).replace("-", "").lower() license_kid = str(license_key.key_id).replace("-", "").lower()
if license_kid: if license_kid:
for existing_key in all_keys: for cached_key in cached_keys:
existing_kid = None cached_kid = None
if isinstance(existing_key, dict) and "kid" in existing_key: if isinstance(cached_key, dict) and "kid" in cached_key:
existing_kid = existing_key["kid"].replace("-", "").lower() cached_kid = cached_key["kid"].replace("-", "").lower()
elif hasattr(existing_key, "kid"): elif hasattr(cached_key, "kid"):
existing_kid = str(existing_key.kid).replace("-", "").lower() cached_kid = str(cached_key.kid).replace("-", "").lower()
elif hasattr(existing_key, "key_id"): elif hasattr(cached_key, "key_id"):
existing_kid = str(existing_key.key_id).replace("-", "").lower() cached_kid = str(cached_key.key_id).replace("-", "").lower()
if existing_kid == license_kid: if cached_kid == license_kid:
already_exists = True already_exists = True
break break
if not already_exists: if not already_exists:
all_keys.append(license_key) all_keys.append(license_key)
session["keys"] = all_keys session["keys"] = all_keys
session.pop("cached_keys", None) else:
session.pop("vault_keys", None) session["keys"] = license_keys
if self.vaults and session["keys"]: if self.vaults and session["keys"]:
key_dict = {} key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
for key in session["keys"]: self.vaults.add_keys(key_dict)
if key["type"] == "CONTENT":
try:
clean_kid = key["kid"].replace("-", "")
if len(clean_kid) == 32:
kid_uuid = UUID(hex=clean_kid)
else:
kid_uuid = UUID(hex=clean_kid.ljust(32, "0"))
key_dict[kid_uuid] = key["key"]
except (ValueError, TypeError):
continue
if key_dict:
self.vaults.add_keys(key_dict)
def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]: def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]:
""" """

View File

@@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
import re
import warnings
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
@@ -90,13 +92,116 @@ class Config:
self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or "" self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or ""
self.update_checks: bool = kwargs.get("update_checks", True) self.update_checks: bool = kwargs.get("update_checks", True)
self.update_check_interval: int = kwargs.get("update_check_interval", 24) self.update_check_interval: int = kwargs.get("update_check_interval", 24)
self.scene_naming: bool = kwargs.get("scene_naming", True)
self.series_year: bool = kwargs.get("series_year", True) # Handle backward compatibility for scene_naming option
self.scene_naming: Optional[bool] = kwargs.get("scene_naming")
self.output_template: dict = kwargs.get("output_template") or {}
# Apply scene_naming compatibility if no output_template is defined
self._apply_scene_naming_compatibility()
# Validate output templates
self._validate_output_templates()
self.title_cache_time: int = kwargs.get("title_cache_time", 1800) # 30 minutes default self.title_cache_time: int = kwargs.get("title_cache_time", 1800) # 30 minutes default
self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default
self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True) self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True)
def _apply_scene_naming_compatibility(self) -> None:
"""Apply backward compatibility for the old scene_naming option."""
if self.scene_naming is not None:
# Only apply if no output_template is already defined
if not self.output_template.get("movies") and not self.output_template.get("series"):
if self.scene_naming:
# scene_naming: true = scene-style templates
self.output_template.update(
{
"movies": "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"series": "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"songs": "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}",
}
)
else:
# scene_naming: false = Plex-friendly templates
self.output_template.update(
{
"movies": "{title} ({year}) {quality}",
"series": "{title} {season_episode} {episode_name?}",
"songs": "{track_number}. {title}",
}
)
# Warn about deprecated option
warnings.warn(
"The 'scene_naming' option is deprecated. Please use 'output_template' instead. "
"Your current setting has been converted to equivalent templates.",
DeprecationWarning,
stacklevel=2,
)
def _validate_output_templates(self) -> None:
"""Validate output template configurations and warn about potential issues."""
if not self.output_template:
return
# Known template variables for validation
valid_variables = {
# Basic variables
"title",
"year",
"season",
"episode",
"season_episode",
"episode_name",
"quality",
"resolution",
"source",
"tag",
"track_number",
"artist",
"album",
"disc",
# Audio variables
"audio",
"audio_channels",
"audio_full",
"atmos",
"dual",
"multi",
# Video variables
"video",
"hdr",
"hfr",
}
# Filesystem-unsafe characters that could cause issues
unsafe_chars = r'[<>:"/\\|?*]'
for template_type, template_str in self.output_template.items():
if not isinstance(template_str, str):
warnings.warn(f"Template '{template_type}' must be a string, got {type(template_str).__name__}")
continue
# Extract variables from template
variables = re.findall(r"\{([^}]+)\}", template_str)
# Check for unknown variables
for var in variables:
# Remove conditional suffix if present
var_clean = var.rstrip("?")
if var_clean not in valid_variables:
warnings.warn(f"Unknown template variable '{var}' in {template_type} template")
# Check for filesystem-unsafe characters outside of variables
# Replace variables with safe placeholders for testing
test_template = re.sub(r"\{[^}]+\}", "TEST", template_str)
if re.search(unsafe_chars, test_template):
warnings.warn(f"Template '{template_type}' may contain filesystem-unsafe characters")
# Check for empty template
if not template_str.strip():
warnings.warn(f"Template '{template_type}' is empty")
@classmethod @classmethod
def from_yaml(cls, path: Path) -> Config: def from_yaml(cls, path: Path) -> Config:
if not path.exists(): if not path.exists():

View File

@@ -2,11 +2,17 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional from typing import Optional, Union
import httpx
import m3u8 import m3u8
from pyplayready.cdm import Cdm as PlayReadyCdm
from pyplayready.system.pssh import PSSH as PR_PSSH
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.pssh import PSSH as WV_PSSH
from requests import Session from requests import Session
from unshackle.core.drm import PlayReady, Widevine
from unshackle.core.manifests.hls import HLS from unshackle.core.manifests.hls import HLS
from unshackle.core.tracks import Tracks from unshackle.core.tracks import Tracks
@@ -15,17 +21,54 @@ def parse(
master: m3u8.M3U8, master: m3u8.M3U8,
language: str, language: str,
*, *,
session: Optional[Session] = None, session: Optional[Union[Session, httpx.Client]] = None,
) -> Tracks: ) -> Tracks:
"""Parse a variant playlist to ``Tracks`` with basic information, defer DRM loading.""" """Parse a variant playlist to ``Tracks`` with DRM information."""
tracks = HLS(master, session=session).to_tracks(language) tracks = HLS(master, session=session).to_tracks(language)
bool(master.session_keys or HLS.parse_session_data_keys(master, session or Session())) need_wv = not any(isinstance(d, Widevine) for t in tracks for d in (t.drm or []))
need_pr = not any(isinstance(d, PlayReady) for t in tracks for d in (t.drm or []))
if True: if (need_wv or need_pr) and tracks.videos:
for t in tracks.videos + tracks.audio: if not session:
t.needs_drm_loading = True session = Session()
t.session = session
session_keys = list(master.session_keys or [])
session_keys.extend(HLS.parse_session_data_keys(master, session))
for drm_obj in HLS.get_all_drm(session_keys):
if need_wv and isinstance(drm_obj, Widevine):
for t in tracks.videos + tracks.audio:
t.drm = [d for d in (t.drm or []) if not isinstance(d, Widevine)] + [drm_obj]
need_wv = False
elif need_pr and isinstance(drm_obj, PlayReady):
for t in tracks.videos + tracks.audio:
t.drm = [d for d in (t.drm or []) if not isinstance(d, PlayReady)] + [drm_obj]
need_pr = False
if not need_wv and not need_pr:
break
if (need_wv or need_pr) and tracks.videos:
first_video = tracks.videos[0]
playlist = m3u8.load(first_video.url)
for key in playlist.keys or []:
if not key or not key.keyformat:
continue
fmt = key.keyformat.lower()
if need_wv and fmt == WidevineCdm.urn:
pssh_b64 = key.uri.split(",")[-1]
drm = Widevine(pssh=WV_PSSH(pssh_b64))
for t in tracks.videos + tracks.audio:
t.drm = [d for d in (t.drm or []) if not isinstance(d, Widevine)] + [drm]
need_wv = False
elif need_pr and (fmt == PlayReadyCdm or "com.microsoft.playready" in fmt):
pssh_b64 = key.uri.split(",")[-1]
drm = PlayReady(pssh=PR_PSSH(pssh_b64), pssh_b64=pssh_b64)
for t in tracks.videos + tracks.audio:
t.drm = [d for d in (t.drm or []) if not isinstance(d, PlayReady)] + [drm]
need_pr = False
if not need_wv and not need_pr:
break
return tracks return tracks

View File

@@ -12,6 +12,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Episode(Title): class Episode(Title):
@@ -78,116 +79,154 @@ class Episode(Title):
self.year = year self.year = year
self.description = description self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.title.replace("$", "S"),
"year": self.year or "",
"season": f"S{self.season:02}",
"episode": f"E{self.number:02}",
"season_episode": f"S{self.season:02}E{self.number:02}",
"episode_name": self.name or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str: def __str__(self) -> str:
return "{title}{year} S{season:02}E{number:02} {name}".format( return "{title}{year} S{season:02}E{number:02} {name}".format(
title=self.title, title=self.title,
year=f" {self.year}" if self.year and config.series_year else "", year=f" {self.year}" if self.year else "",
season=self.season, season=self.season,
number=self.number, number=self.number,
name=self.name or "", name=self.name or "",
).strip() ).strip()
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Title [Year] SXXEXX Name (or Title [Year] SXX if folder)
if folder: if folder:
name = f"{self.title}" # For folders, use the series template but exclude episode-specific variables
if self.year and config.series_year: series_template = config.output_template.get("series")
name += f" {self.year}" if series_template:
name += f" S{self.season:02}" # Create a folder-friendly version by removing episode-specific variables
else: folder_template = series_template
name = "{title}{year} S{season:02}E{number:02} {name}".format( # Remove episode number and episode name from template for folders
title=self.title.replace("$", "S"), # e.g., Arli$$ folder_template = re.sub(r'\{episode\}', '', folder_template)
year=f" {self.year}" if self.year and config.series_year else "", folder_template = re.sub(r'\{episode_name\?\}', '', folder_template)
season=self.season, folder_template = re.sub(r'\{episode_name\}', '', folder_template)
number=self.number, folder_template = re.sub(r'\{season_episode\}', '{season}', folder_template)
name=self.name or "",
).strip()
if config.scene_naming: # Clean up any double separators that might result
# Resolution folder_template = re.sub(r'\.{2,}', '.', folder_template)
if primary_video_track: folder_template = re.sub(r'\s{2,}', ' ', folder_template)
resolution = primary_video_track.height folder_template = re.sub(r'^[\.\s]+|[\.\s]+$', '', folder_template)
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service formatter = TemplateFormatter(folder_template)
if show_service: context = self._build_template_context(media_info, show_service)
name += f" {self.service.__name__}" # Override season_episode with just season for folders
context['season'] = f"S{self.season:02}"
# 'WEB-DL' folder_name = formatter.format(context)
name += " WEB-DL"
# DUAL # Keep the same separator style as the series template
if unique_audio_languages == 2: if '.' in series_template and ' ' not in series_template:
name += " DUAL" # Dot-based template - use dot separator for folders too
return sanitize_filename(folder_name, ".")
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else: else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0 # Space-based template - use space separator
channels = float(channel_count) return sanitize_filename(folder_name, " ")
else:
# Fallback to simple naming if no template defined
name = f"{self.title}"
if self.year:
name += f" {self.year}"
name += f" S{self.season:02}"
return sanitize_filename(name, " ")
features = primary_audio_track.format_additionalfeatures or "" # Use template from output_template (which includes scene_naming compatibility)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}" # or fallback to default scene-style template
if "JOC" in features or primary_audio_track.joc: template = (
name += " Atmos" config.output_template.get("series")
or "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hfr?}.{video}-{tag}"
)
# Video (dynamic range + hfr +) Codec formatter = TemplateFormatter(template)
if primary_video_track: context = self._build_template_context(media_info, show_service)
codec = primary_video_track.format return formatter.format(context)
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
class Series(SortedKeyList, ABC): class Series(SortedKeyList, ABC):
@@ -197,7 +236,7 @@ class Series(SortedKeyList, ABC):
def __str__(self) -> str: def __str__(self) -> str:
if not self: if not self:
return super().__str__() return super().__str__()
return self[0].title + (f" ({self[0].year})" if self[0].year and config.series_year else "") return self[0].title + (f" ({self[0].year})" if self[0].year else "")
def tree(self, verbose: bool = False) -> Tree: def tree(self, verbose: bool = False) -> Tree:
seasons = Counter(x.season for x in self) seasons = Counter(x.season for x in self)

View File

@@ -9,7 +9,7 @@ from sortedcontainers import SortedKeyList
from unshackle.core.config import config from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename from unshackle.core.utils.template_formatter import TemplateFormatter
class Movie(Title): class Movie(Title):
@@ -45,100 +45,107 @@ class Movie(Title):
self.year = year self.year = year
self.description = description self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.name.replace("$", "S"),
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str: def __str__(self) -> str:
if self.year: if self.year:
return f"{self.name} ({self.year})" return f"{self.name} ({self.year})"
return self.name return self.name
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None) # Use template from output_template (which includes scene_naming compatibility)
primary_audio_track = next(iter(media_info.audio_tracks), None) # or fallback to default scene-style template
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language}) template = (
config.output_template.get("movies")
or "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}"
)
# Name (Year) formatter = TemplateFormatter(template)
name = str(self).replace("$", "S") # e.g., Arli$$ context = self._build_template_context(media_info, show_service)
return formatter.format(context)
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
class Movies(SortedKeyList, ABC): class Movies(SortedKeyList, ABC):

View File

@@ -10,6 +10,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP from unshackle.core.constants import AUDIO_CODEC_MAP
from unshackle.core.titles.title import Title from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Song(Title): class Song(Title):
@@ -81,46 +82,63 @@ class Song(Title):
artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name
).strip() ).strip()
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_audio_track = next(iter(media_info.audio_tracks), None)
context = {
"artist": self.artist.replace("$", "S"),
"album": self.album.replace("$", "S"),
"title": self.name.replace("$", "S"),
"track_number": f"{self.track:02}",
"disc": f"{self.disc:02}" if self.disc > 1 else "",
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
return context
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
audio_track = next(iter(media_info.audio_tracks), None)
codec = audio_track.format
channel_layout = audio_track.channel_layout or audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = audio_track.channel_s or audio_track.channels or 0
channels = float(channel_count)
features = audio_track.format_additionalfeatures or ""
if folder: if folder:
# Artist - Album (Year) # For folders, use simple naming: "Artist - Album (Year)"
name = str(self).split(" / ")[0] name = f"{self.artist} - {self.album}"
else: if self.year:
# NN. Song Name name += f" ({self.year})"
name = str(self).split(" / ")[1]
if config.scene_naming:
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# Audio Codec + Channels (+ feature)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or audio_track.joc:
name += " Atmos"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name, " ")
else:
# Simple naming style without technical details
return sanitize_filename(name, " ") return sanitize_filename(name, " ")
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("songs") or "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}"
)
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Album(SortedKeyList, ABC): class Album(SortedKeyList, ABC):
def __init__(self, iterable: Optional[Iterable] = None): def __init__(self, iterable: Optional[Iterable] = None):

View File

@@ -420,7 +420,7 @@ class Track:
for drm in self.drm: for drm in self.drm:
if isinstance(drm, PlayReady): if isinstance(drm, PlayReady):
return drm return drm
elif hasattr(cdm, "is_playready"): elif hasattr(cdm, 'is_playready'):
if cdm.is_playready: if cdm.is_playready:
for drm in self.drm: for drm in self.drm:
if isinstance(drm, PlayReady): if isinstance(drm, PlayReady):
@@ -473,83 +473,6 @@ class Track:
if tenc.key_ID.int != 0: if tenc.key_ID.int != 0:
return tenc.key_ID return tenc.key_ID
def load_drm_if_needed(self, service=None) -> bool:
"""
Load DRM information for this track if it was deferred during parsing.
Args:
service: Service instance that can fetch track-specific DRM info
Returns:
True if DRM was loaded or already present, False if failed
"""
if not getattr(self, "needs_drm_loading", False):
return bool(self.drm)
if self.drm:
self.needs_drm_loading = False
return True
if not service or not hasattr(service, "get_track_drm"):
return self.load_drm_from_playlist()
try:
track_drm = service.get_track_drm(self)
if track_drm:
self.drm = track_drm if isinstance(track_drm, list) else [track_drm]
self.needs_drm_loading = False
return True
except Exception as e:
raise ValueError(f"Failed to load DRM from service for track {self.id}: {e}")
return self.load_drm_from_playlist()
def load_drm_from_playlist(self) -> bool:
"""
Fallback method to load DRM by fetching this track's individual playlist.
"""
if self.drm:
self.needs_drm_loading = False
return True
try:
import m3u8
from pyplayready.cdm import Cdm as PlayReadyCdm
from pyplayready.system.pssh import PSSH as PR_PSSH
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.pssh import PSSH as WV_PSSH
session = getattr(self, "session", None) or Session()
response = session.get(self.url)
playlist = m3u8.loads(response.text, self.url)
drm_list = []
for key in playlist.keys or []:
if not key or not key.keyformat:
continue
fmt = key.keyformat.lower()
if fmt == WidevineCdm.urn:
pssh_b64 = key.uri.split(",")[-1]
drm = Widevine(pssh=WV_PSSH(pssh_b64))
drm_list.append(drm)
elif fmt == PlayReadyCdm or "com.microsoft.playready" in fmt:
pssh_b64 = key.uri.split(",")[-1]
drm = PlayReady(pssh=PR_PSSH(pssh_b64), pssh_b64=pssh_b64)
drm_list.append(drm)
if drm_list:
self.drm = drm_list
self.needs_drm_loading = False
return True
except Exception as e:
raise ValueError(f"Failed to load DRM from playlist for track {self.id}: {e}")
return False
def get_init_segment( def get_init_segment(
self, self,
maximum_size: int = 20000, maximum_size: int = 20000,
@@ -644,32 +567,15 @@ class Track:
output_path = original_path.with_stem(f"{original_path.stem}_repack") output_path = original_path.with_stem(f"{original_path.stem}_repack")
def _ffmpeg(extra_args: list[str] = None): def _ffmpeg(extra_args: list[str] = None):
args = [ subprocess.run(
binaries.FFMPEG,
"-hide_banner",
"-loglevel",
"error",
"-i",
original_path,
*(extra_args or []),
]
if hasattr(self, "data") and self.data.get("audio_language"):
audio_lang = self.data["audio_language"]
audio_name = self.data.get("audio_language_name", audio_lang)
args.extend(
[
"-metadata:s:a:0",
f"language={audio_lang}",
"-metadata:s:a:0",
f"title={audio_name}",
"-metadata:s:a:0",
f"handler_name={audio_name}",
]
)
args.extend(
[ [
binaries.FFMPEG,
"-hide_banner",
"-loglevel",
"error",
"-i",
original_path,
*(extra_args or []),
# Following are very important! # Following are very important!
"-map_metadata", "-map_metadata",
"-1", # don't transfer metadata to output file "-1", # don't transfer metadata to output file
@@ -678,11 +584,7 @@ class Track:
"-codec", "-codec",
"copy", "copy",
str(output_path), str(output_path),
] ],
)
subprocess.run(
args,
check=True, check=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,

View File

@@ -305,14 +305,7 @@ class Tracks:
) )
return selected return selected
def mux( def mux(self, title: str, delete: bool = True, progress: Optional[partial] = None) -> tuple[Path, int, list[str]]:
self,
title: str,
delete: bool = True,
progress: Optional[partial] = None,
audio_expected: bool = True,
title_language: Optional[Language] = None,
) -> tuple[Path, int, list[str]]:
""" """
Multiplex all the Tracks into a Matroska Container file. Multiplex all the Tracks into a Matroska Container file.
@@ -322,28 +315,7 @@ class Tracks:
delete: Delete all track files after multiplexing. delete: Delete all track files after multiplexing.
progress: Update a rich progress bar via `completed=...`. This must be the progress: Update a rich progress bar via `completed=...`. This must be the
progress object's update() func, pre-set with task id via functools.partial. progress object's update() func, pre-set with task id via functools.partial.
audio_expected: Whether audio is expected in the output. Used to determine
if embedded audio metadata should be added.
title_language: The title's intended language. Used to select the best video track
for audio metadata when multiple video tracks exist.
""" """
if self.videos and not self.audio and audio_expected:
video_track = None
if title_language:
video_track = next((v for v in self.videos if v.language == title_language), None)
if not video_track:
video_track = next((v for v in self.videos if v.is_original_lang), None)
video_track = video_track or self.videos[0]
if video_track.language.is_valid():
lang_code = str(video_track.language)
lang_name = video_track.language.display_name()
for video in self.videos:
video.needs_repack = True
video.data["audio_language"] = lang_code
video.data["audio_language_name"] = lang_name
if not binaries.MKVToolNix: if not binaries.MKVToolNix:
raise RuntimeError("MKVToolNix (mkvmerge) is required for muxing but was not found") raise RuntimeError("MKVToolNix (mkvmerge) is required for muxing but was not found")
@@ -360,20 +332,12 @@ class Tracks:
raise ValueError("Video Track must be downloaded before muxing...") raise ValueError("Video Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=vt) events.emit(events.Types.TRACK_MULTIPLEX, track=vt)
is_default = False
if title_language:
is_default = vt.language == title_language
if not any(v.language == title_language for v in self.videos):
is_default = vt.is_original_lang or i == 0
else:
is_default = i == 0
# Prepare base arguments # Prepare base arguments
video_args = [ video_args = [
"--language", "--language",
f"0:{vt.language}", f"0:{vt.language}",
"--default-track", "--default-track",
f"0:{is_default}", f"0:{i == 0}",
"--original-flag", "--original-flag",
f"0:{vt.is_original_lang}", f"0:{vt.is_original_lang}",
"--compression", "--compression",
@@ -399,18 +363,6 @@ class Tracks:
] ]
) )
if hasattr(vt, "data") and vt.data.get("audio_language"):
audio_lang = vt.data["audio_language"]
audio_name = vt.data.get("audio_language_name", audio_lang)
video_args.extend(
[
"--language",
f"1:{audio_lang}",
"--track-name",
f"1:{audio_name}",
]
)
cl.extend(video_args + ["(", str(vt.path), ")"]) cl.extend(video_args + ["(", str(vt.path), ")"])
for i, at in enumerate(self.audio): for i, at in enumerate(self.audio):

View File

@@ -0,0 +1,147 @@
import logging
import re
from typing import Any, Dict, List
from unshackle.core.utilities import sanitize_filename
class TemplateFormatter:
"""
Template formatter for custom filename patterns.
Supports variable substitution and conditional variables.
Example: '{title}.{year}.{quality?}.{source}-{tag}'
"""
def __init__(self, template: str):
"""Initialize the template formatter.
Args:
template: Template string with variables in {variable} format
"""
self.template = template
self.variables = self._extract_variables()
def _extract_variables(self) -> List[str]:
"""Extract all variables from the template."""
pattern = r"\{([^}]+)\}"
matches = re.findall(pattern, self.template)
return [match.strip() for match in matches]
def format(self, context: Dict[str, Any]) -> str:
"""Format the template with the provided context.
Args:
context: Dictionary containing variable values
Returns:
Formatted filename string
Raises:
ValueError: If required template variables are missing from context
"""
logger = logging.getLogger(__name__)
# Validate that all required variables are present
is_valid, missing_vars = self.validate(context)
if not is_valid:
error_msg = f"Missing required template variables: {', '.join(missing_vars)}"
logger.error(error_msg)
raise ValueError(error_msg)
try:
result = self.template
for variable in self.variables:
placeholder = "{" + variable + "}"
is_conditional = variable.endswith("?")
if is_conditional:
# Remove the ? for conditional variables
var_name = variable[:-1]
value = context.get(var_name, "")
if value:
# Replace with actual value, ensuring it's string and safe
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
else:
# Remove the placeholder entirely for empty conditional variables
result = result.replace(placeholder, "")
else:
# Regular variable
value = context.get(variable, "")
if value is None:
logger.warning(f"Template variable '{variable}' is None, using empty string")
value = ""
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
# Clean up multiple consecutive dots/separators and other artifacts
result = re.sub(r"\.{2,}", ".", result) # Multiple dots -> single dot
result = re.sub(r"\s{2,}", " ", result) # Multiple spaces -> single space
result = re.sub(r"^[\.\s]+|[\.\s]+$", "", result) # Remove leading/trailing dots and spaces
result = re.sub(r"\.-", "-", result) # Remove dots before dashes (for dot-based templates)
result = re.sub(r"[\.\s]+\)", ")", result) # Remove dots/spaces before closing parentheses
# Determine the appropriate separator based on template style
# If the template contains spaces (like Plex-friendly), preserve them
if " " in self.template and "." not in self.template:
# Space-based template (Plex-friendly) - use space separator
result = sanitize_filename(result, spacer=" ")
else:
# Dot-based template (scene-style) - use dot separator
result = sanitize_filename(result, spacer=".")
# Final validation - ensure we have a non-empty result
if not result or result.isspace():
logger.warning("Template formatting resulted in empty filename, using fallback")
return "untitled"
logger.debug(f"Template formatted successfully: '{self.template}' -> '{result}'")
return result
except Exception as e:
logger.error(f"Error formatting template '{self.template}': {e}")
# Return a safe fallback filename
fallback = f"error_formatting_{hash(self.template) % 10000}"
logger.warning(f"Using fallback filename: {fallback}")
return fallback
def validate(self, context: Dict[str, Any]) -> tuple[bool, List[str]]:
"""Validate that all required variables are present in context.
Args:
context: Dictionary containing variable values
Returns:
Tuple of (is_valid, missing_variables)
"""
missing = []
for variable in self.variables:
is_conditional = variable.endswith("?")
var_name = variable[:-1] if is_conditional else variable
# Only check non-conditional variables
if not is_conditional and var_name not in context:
missing.append(var_name)
return len(missing) == 0, missing
def get_required_variables(self) -> List[str]:
"""Get list of required (non-conditional) variables."""
required = []
for variable in self.variables:
if not variable.endswith("?"):
required.append(variable)
return required
def get_optional_variables(self) -> List[str]:
"""Get list of optional (conditional) variables."""
optional = []
for variable in self.variables:
if variable.endswith("?"):
optional.append(variable[:-1]) # Remove the ?
return optional

View File

@@ -74,9 +74,7 @@ class Vaults:
for vault in self.vaults: for vault in self.vaults:
if not vault.no_push: if not vault.no_push:
try: try:
# Count each vault that successfully processes the keys (whether new or existing) success += bool(vault.add_keys(self.service, kid_keys))
vault.add_keys(self.service, kid_keys)
success += 1
except (PermissionError, NotImplementedError): except (PermissionError, NotImplementedError):
pass pass
return success return success

View File

@@ -282,10 +282,6 @@ class EXAMPLE(Service):
return chapters return chapters
def get_widevine_service_certificate(self, **_: any) -> str:
"""Return the Widevine service certificate from config, if available."""
return self.config.get("certificate")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]: def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
"""Retrieve a PlayReady license for a given track.""" """Retrieve a PlayReady license for a given track."""

View File

@@ -10,15 +10,45 @@ tag_imdb_tmdb: true
# Set terminal background color (custom option not in CONFIG.md) # Set terminal background color (custom option not in CONFIG.md)
set_terminal_bg: false set_terminal_bg: false
# Set file naming convention # File naming is now controlled via output_template (see below)
# true for style - Prime.Suspect.S07E01.The.Final.Act.Part.One.1080p.ITV.WEB-DL.AAC2.0.H.264 # Default behavior provides scene-style naming similar to the old scene_naming: true
# false for style - Prime Suspect S07E01 The Final Act - Part One #
scene_naming: true # BACKWARD COMPATIBILITY: The old scene_naming option is still supported:
# scene_naming: true -> Equivalent to scene-style templates (dot-separated)
# scene_naming: false -> Equivalent to Plex-friendly templates (space-separated)
# Note: output_template takes precedence over scene_naming if both are defined
# Whether to include the year in series names for episodes and folders (default: true) # Custom output templates for filenames
# true for style - Show Name (2023) S01E01 Episode Name # When not defined, defaults to scene-style naming equivalent to the old scene_naming: true
# false for style - Show Name S01E01 Episode Name # Available variables: {title}, {year}, {season}, {episode}, {season_episode}, {episode_name},
series_year: true # {quality}, {resolution}, {source}, {audio}, {audio_channels}, {audio_full},
# {video}, {hdr}, {hfr}, {atmos}, {dual}, {multi}, {tag}
# Conditional variables (included only if present): Add ? suffix like {year?}, {episode_name?}, {hdr?}
# Uncomment and customize the templates below:
#
# output_template:
# # Scene-style naming (dot-separated) - Default behavior when no template is defined
# movies: '{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
# series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
#
# # Plex-friendly naming (space-separated, clean format)
# # movies: '{title} ({year}) {quality}'
# # series: '{title} {season_episode} {episode_name?}'
#
# # Minimal naming (basic info only)
# # movies: '{title}.{year}.{quality}'
# # series: '{title}.{season_episode}.{episode_name?}'
#
# # Custom scene-style with specific elements
# # movies: '{title}.{year}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{video}-{tag}'
# # series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{atmos?}.{video}-{tag}'
#
# Example outputs:
# Scene movies: 'The.Matrix.1999.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Scene movies (HDR): 'Dune.2021.2160p.HBO.WEB-DL.DDP5.1.HDR10.H.265-EXAMPLE'
# Scene series: 'Breaking.Bad.2008.S01E01.Pilot.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Plex movies: 'The Matrix (1999) 1080p'
# Plex series: 'Breaking Bad S01E01 Pilot'
# Check for updates from GitHub repository on startup (default: true) # Check for updates from GitHub repository on startup (default: true)
update_checks: true update_checks: true
@@ -88,26 +118,6 @@ cdm:
jane_uhd: nexus_5_l1 # Profile 'jane_uhd' uses Nexus 5 L1 jane_uhd: nexus_5_l1 # Profile 'jane_uhd' uses Nexus 5 L1
default: generic_android_l3 # Default CDM for this service default: generic_android_l3 # Default CDM for this service
# NEW: Quality-based CDM selection
# Use different CDMs based on video resolution
# Supports operators: >=, >, <=, <, or exact match
EXAMPLE_QUALITY:
"<=1080": generic_android_l3 # Use L3 for 1080p and below
">1080": nexus_5_l1 # Use L1 for above 1080p (1440p, 2160p)
default: generic_android_l3 # Optional: fallback if no quality match
# You can mix profiles and quality thresholds in the same service
NETFLIX:
# Profile-based selection (existing functionality)
john: netflix_l3_profile
jane: netflix_l1_profile
# Quality-based selection (new functionality)
"<=720": netflix_mobile_l3
"1080": netflix_standard_l3
">=1440": netflix_premium_l1
# Fallback
default: netflix_standard_l3
# Use pywidevine Serve-compliant Remote CDMs # Use pywidevine Serve-compliant Remote CDMs
remote_cdm: remote_cdm:
- name: "chrome" - name: "chrome"

View File

@@ -28,33 +28,26 @@ class MySQL(Vault):
raise PermissionError(f"MySQL vault {self.slug} has no SELECT permission.") raise PermissionError(f"MySQL vault {self.slug} has no SELECT permission.")
def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]: def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]:
if not self.has_table(service):
# no table, no key, simple
return None
if isinstance(kid, UUID): if isinstance(kid, UUID):
kid = kid.hex kid = kid.hex
service_variants = [service]
if service != service.lower():
service_variants.append(service.lower())
if service != service.upper():
service_variants.append(service.upper())
conn = self.conn_factory.get() conn = self.conn_factory.get()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
for service_name in service_variants: cursor.execute(
if not self.has_table(service_name): # TODO: SQL injection risk
continue f"SELECT `id`, `key_` FROM `{service}` WHERE `kid`=%s AND `key_`!=%s",
(kid, "0" * 32),
cursor.execute( )
# TODO: SQL injection risk cek = cursor.fetchone()
f"SELECT `id`, `key_` FROM `{service_name}` WHERE `kid`=%s AND `key_`!=%s", if not cek:
(kid, "0" * 32), return None
) return cek["key_"]
cek = cursor.fetchone()
if cek:
return cek["key_"]
return None
finally: finally:
cursor.close() cursor.close()
@@ -138,27 +131,16 @@ class MySQL(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()): if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()} kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get() conn = self.conn_factory.get()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
placeholders = ",".join(["%s"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row["kid"] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany( cursor.executemany(
f"INSERT INTO `{service}` (kid, key_) VALUES (%s, %s)", # TODO: SQL injection risk
new_keys.items(), f"INSERT IGNORE INTO `{service}` (kid, key_) VALUES (%s, %s)",
kid_keys.items(),
) )
return len(new_keys) return cursor.rowcount
finally: finally:
conn.commit() conn.commit()
cursor.close() cursor.close()

View File

@@ -19,30 +19,22 @@ class SQLite(Vault):
self.conn_factory = ConnectionFactory(self.path) self.conn_factory = ConnectionFactory(self.path)
def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]: def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]:
if not self.has_table(service):
# no table, no key, simple
return None
if isinstance(kid, UUID): if isinstance(kid, UUID):
kid = kid.hex kid = kid.hex
conn = self.conn_factory.get() conn = self.conn_factory.get()
cursor = conn.cursor() cursor = conn.cursor()
# Try both the original service name and lowercase version to handle case sensitivity issues
service_variants = [service]
if service != service.lower():
service_variants.append(service.lower())
if service != service.upper():
service_variants.append(service.upper())
try: try:
for service_name in service_variants: cursor.execute(f"SELECT `id`, `key_` FROM `{service}` WHERE `kid`=? AND `key_`!=?", (kid, "0" * 32))
if not self.has_table(service_name): cek = cursor.fetchone()
continue if not cek:
return None
cursor.execute(f"SELECT `id`, `key_` FROM `{service_name}` WHERE `kid`=? AND `key_`!=?", (kid, "0" * 32)) return cek[1]
cek = cursor.fetchone()
if cek:
return cek[1]
return None
finally: finally:
cursor.close() cursor.close()
@@ -110,27 +102,16 @@ class SQLite(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()): if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()} kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get() conn = self.conn_factory.get()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
placeholders = ",".join(["?"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row[0] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany( cursor.executemany(
f"INSERT INTO `{service}` (kid, key_) VALUES (?, ?)", # TODO: SQL injection risk
new_keys.items(), f"INSERT OR IGNORE INTO `{service}` (kid, key_) VALUES (?, ?)",
kid_keys.items(),
) )
return len(new_keys) return cursor.rowcount
finally: finally:
conn.commit() conn.commit()
cursor.close() cursor.close()

2
uv.lock generated
View File

@@ -1499,7 +1499,7 @@ wheels = [
[[package]] [[package]]
name = "unshackle" name = "unshackle"
version = "1.4.6" version = "1.4.4"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "appdirs" }, { name = "appdirs" },