4 Commits

14 changed files with 685 additions and 446 deletions

View File

@@ -5,36 +5,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.5] - 2025-09-09
## [Unreleased]
### Added
- **Enhanced CDM Key Caching**: Improved key caching and session management for L1/L2 devices
- Optimized `get_cached_keys_if_exists` functionality for better performance with L1/L2 devices
- Enhanced cached key retrieval logic with improved session handling
- **Widevine Common Certificate Fallback**: Added fallback to Widevine common certificate for L1 devices
- Improved compatibility for L1 devices when service certificates are unavailable
- **Enhanced Vault Loading**: Improved vault loading and key copying logic
- Better error handling and key management in vault operations
- **PSSH Display Optimization**: Truncated PSSH string display in non-debug mode for cleaner output
- **CDM Error Messaging**: Added error messages for missing service certificates in CDM sessions
- **Custom Output Templates**: Flexible filename customization system
- New `output_template` configuration in unshackle.yaml for movies, series, and songs
- Support for conditional variables using `?` suffix (e.g., `{year?}`, `{hdr?}`)
- Comprehensive template variables for title, quality, audio, video, and metadata
- Multiple naming styles: Scene-style (dot-separated), Plex-friendly (space-separated), minimal, custom
- Automatic template validation and enhanced error handling
- **Full backward compatibility**: Old `scene_naming` option still works and automatically converts to equivalent templates
- Folder naming now follows series template patterns (excluding episode-specific variables)
- Deprecation warnings guide users to migrate from `scene_naming` to `output_template`
### Changed
- **Dynamic Version Headers**: Updated User-Agent headers to use dynamic version strings
- DecryptLabsRemoteCDM now uses dynamic version import instead of hardcoded version
- **Intelligent CDM Caching**: Implemented intelligent caching system for CDM license requests
- Enhanced caching logic reduces redundant license requests and improves performance
- **Enhanced Tag Handling**: Improved tag handling for TV shows and movies from Simkl data
- Better metadata processing and formatting for improved media tagging
### Fixed
- **CDM Session Management**: Clean up session data when retrieving cached keys
- Remove decrypt_labs_session_id and challenge from session when cached keys exist but there are missing kids
- Ensures clean state for subsequent requests and prevents session conflicts
- **Tag Formatting**: Fixed formatting issues in tag processing
- **Import Order**: Fixed import order issues in tags module
- **Filename Generation**: Updated all title classes (Movie, Episode, Song) to use new template system
- Enhanced context building for template variable substitution
- Improved separator handling based on template style detection
- Better handling of conditional content like HDR, Atmos, and multi-language audio
## [1.4.4] - 2025-09-02
@@ -57,7 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- **Matroska Tag Compliance**: Enhanced media container compatibility
- **Matroska Tag Compliance**: Enhanced media container compatibility
- Fixed Matroska tag compliance with official specification
- **Application Branding**: Cleaned up version display
- Removed old devine version reference from banner to avoid developer confusion

View File

@@ -66,18 +66,6 @@ from unshackle.core.vaults import Vaults
class dl:
@staticmethod
def _truncate_pssh_for_display(pssh_string: str, drm_type: str) -> str:
"""Truncate PSSH string for display when not in debug mode."""
if logging.root.level == logging.DEBUG or not pssh_string:
return pssh_string
max_width = console.width - len(drm_type) - 12
if len(pssh_string) <= max_width:
return pssh_string
return pssh_string[: max_width - 3] + "..."
@click.command(
short_help="Download, Decrypt, and Mux tracks for titles from a Service.",
cls=Services,
@@ -1240,8 +1228,7 @@ class dl:
if isinstance(drm, Widevine):
with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh.dumps(), "Widevine")
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({pssh_display})", "text"), overflow="fold"))
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({drm.pssh.dumps()})", "text"), overflow="fold"))
pre_existing_tree = next(
(x for x in table.columns[0].cells if isinstance(x, Tree) and x.label == cek_tree.label), None
)
@@ -1333,11 +1320,10 @@ class dl:
elif isinstance(drm, PlayReady):
with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh_b64 or "", "PlayReady")
cek_tree = Tree(
Text.assemble(
("PlayReady", "cyan"),
(f"({pssh_display})", "text"),
(f"({drm.pssh_b64 or ''})", "text"),
overflow="fold",
)
)

View File

@@ -12,113 +12,84 @@ from unshackle.core.vault import Vault
from unshackle.core.vaults import Vaults
def _load_vaults(vault_names: list[str]) -> Vaults:
"""Load and validate vaults by name."""
vaults = Vaults()
for vault_name in vault_names:
vault_config = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault_config:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault_config["type"]
vault_args = vault_config.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
return vaults
def _process_service_keys(from_vault: Vault, service: str, log: logging.Logger) -> dict[str, str]:
"""Get and validate keys from a vault for a specific service."""
content_keys = list(from_vault.get_keys(service))
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Skipping NULL key: {kid}:{key}")
return {kid: key for kid, key in content_keys if kid not in bad_keys}
def _copy_service_data(to_vault: Vault, from_vault: Vault, service: str, log: logging.Logger) -> int:
"""Copy data for a single service between vaults."""
content_keys = _process_service_keys(from_vault, service, log)
total_count = len(content_keys)
if total_count == 0:
log.info(f"{service}: No keys found in {from_vault}")
return 0
try:
added = to_vault.add_keys(service, content_keys)
except PermissionError:
log.warning(f"{service}: No permission to create table in {to_vault}, skipped")
return 0
existed = total_count - added
if added > 0 and existed > 0:
log.info(f"{service}: {added} added, {existed} skipped ({total_count} total)")
elif added > 0:
log.info(f"{service}: {added} added ({total_count} total)")
else:
log.info(f"{service}: {existed} skipped (all existed)")
return added
@click.group(short_help="Manage and configure Key Vaults.", context_settings=context_settings)
def kv() -> None:
"""Manage and configure Key Vaults."""
@kv.command()
@click.argument("to_vault_name", type=str)
@click.argument("from_vault_names", nargs=-1, type=click.UNPROCESSED)
@click.argument("to_vault", type=str)
@click.argument("from_vaults", nargs=-1, type=click.UNPROCESSED)
@click.option("-s", "--service", type=str, default=None, help="Only copy data to and from a specific service.")
def copy(to_vault_name: str, from_vault_names: list[str], service: Optional[str] = None) -> None:
def copy(to_vault: str, from_vaults: list[str], service: Optional[str] = None) -> None:
"""
Copy data from multiple Key Vaults into a single Key Vault.
Rows with matching KIDs are skipped unless there's no KEY set.
Existing data is not deleted or altered.
The `to_vault_name` argument is the key vault you wish to copy data to.
The `to_vault` argument is the key vault you wish to copy data to.
It should be the name of a Key Vault defined in the config.
The `from_vault_names` argument is the key vault(s) you wish to take
The `from_vaults` argument is the key vault(s) you wish to take
data from. You may supply multiple key vaults.
"""
if not from_vault_names:
if not from_vaults:
raise click.ClickException("No Vaults were specified to copy data from.")
log = logging.getLogger("kv")
all_vault_names = [to_vault_name] + list(from_vault_names)
vaults = _load_vaults(all_vault_names)
vaults = Vaults()
for vault_name in [to_vault] + list(from_vaults):
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
to_vault = vaults.vaults[0]
from_vaults = vaults.vaults[1:]
vault_names = ", ".join([v.name for v in from_vaults])
log.info(f"Copying data from {vault_names}{to_vault.name}")
to_vault: Vault = vaults.vaults[0]
from_vaults: list[Vault] = vaults.vaults[1:]
log.info(f"Copying data from {', '.join([x.name for x in from_vaults])}, into {to_vault.name}")
if service:
service = Services.get_tag(service)
log.info(f"Filtering by service: {service}")
log.info(f"Only copying data for service {service}")
total_added = 0
for from_vault in from_vaults:
services_to_copy = [service] if service else from_vault.get_services()
if service:
services = [service]
else:
services = from_vault.get_services()
for service_ in services:
log.info(f"Getting data from {from_vault} for {service_}")
content_keys = list(from_vault.get_keys(service_)) # important as it's a generator we iterate twice
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Cannot add a NULL Content Key to a Vault, skipping: {kid}:{key}")
content_keys = {kid: key for kid, key in content_keys if kid not in bad_keys}
total_count = len(content_keys)
log.info(f"Adding {total_count} Content Keys to {to_vault} for {service_}")
try:
added = to_vault.add_keys(service_, content_keys)
except PermissionError:
log.warning(f" - No permission to create table ({service_}) in {to_vault}, skipping...")
continue
for service_tag in services_to_copy:
added = _copy_service_data(to_vault, from_vault, service_tag, log)
total_added += added
existed = total_count - added
if total_added > 0:
log.info(f"Successfully added {total_added} new keys to {to_vault}")
else:
log.info("Copy completed - no new keys to add")
log.info(f"{to_vault} ({service_}): {added} newly added, {existed} already existed (skipped)")
log.info(f"{to_vault}: {total_added} total newly added")
@kv.command()
@@ -135,9 +106,9 @@ def sync(ctx: click.Context, vaults: list[str], service: Optional[str] = None) -
if not len(vaults) > 1:
raise click.ClickException("You must provide more than one Vault to sync.")
ctx.invoke(copy, to_vault_name=vaults[0], from_vault_names=vaults[1:], service=service)
ctx.invoke(copy, to_vault=vaults[0], from_vaults=vaults[1:], service=service)
for i in range(1, len(vaults)):
ctx.invoke(copy, to_vault_name=vaults[i], from_vault_names=[vaults[i - 1]], service=service)
ctx.invoke(copy, to_vault=vaults[i], from_vaults=[vaults[i - 1]], service=service)
@kv.command()
@@ -164,7 +135,15 @@ def add(file: Path, service: str, vaults: list[str]) -> None:
log = logging.getLogger("kv")
service = Services.get_tag(service)
vaults_ = _load_vaults(list(vaults))
vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
data = file.read_text(encoding="utf8")
kid_keys: dict[str, str] = {}
@@ -194,7 +173,15 @@ def prepare(vaults: list[str]) -> None:
"""Create Service Tables on Vaults if not yet created."""
log = logging.getLogger("kv")
vaults_ = _load_vaults(vaults)
vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
for vault in vaults_:
if hasattr(vault, "has_table") and hasattr(vault, "create_table"):

View File

@@ -1 +1 @@
__version__ = "1.4.5"
__version__ = "1.4.4"

View File

@@ -6,12 +6,10 @@ from typing import Any, Dict, List, Optional, Union
from uuid import UUID
import requests
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.device import DeviceTypes
from requests import Session
from unshackle.core.vaults import Vaults
from unshackle.core import __version__
class MockCertificateChain:
@@ -81,17 +79,15 @@ class DecryptLabsRemoteCDM:
Key Features:
- Compatible with both Widevine and PlayReady DRM schemes
- Intelligent caching that compares required vs. available keys
- Optimized caching for L1/L2 devices (leverages API auto-optimization)
- Automatic key combination for mixed cache/license scenarios
- Seamless fallback to license requests when keys are missing
Intelligent Caching System:
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
2. get_license_challenge() first checks for cached keys
3. For L1/L2 devices, always attempts cached keys first (API optimized)
4. If cached keys satisfy requirements, returns empty challenge (no license needed)
5. If keys are missing, makes targeted license request for remaining keys
6. parse_license() combines cached and license keys intelligently
3. If cached keys satisfy requirements, returns empty challenge (no license needed)
4. If keys are missing, makes targeted license request for remaining keys
5. parse_license() combines cached and license keys intelligently
"""
service_certificate_challenge = b"\x08\x04"
@@ -151,7 +147,7 @@ class DecryptLabsRemoteCDM:
{
"decrypt-labs-api-key": self.secret,
"Content-Type": "application/json",
"User-Agent": f"unshackle-decrypt-labs-cdm/{__version__}",
"User-Agent": "unshackle-decrypt-labs-cdm/1.0",
}
)
@@ -254,14 +250,12 @@ class DecryptLabsRemoteCDM:
"pssh": None,
"challenge": None,
"decrypt_labs_session_id": None,
"tried_cache": False,
"cached_keys": None,
}
return session_id
def close(self, session_id: bytes) -> None:
"""
Close a CDM session and perform comprehensive cleanup.
Close a CDM session.
Args:
session_id: Session identifier
@@ -272,8 +266,6 @@ class DecryptLabsRemoteCDM:
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session.clear()
del self._sessions[session_id]
def get_service_certificate(self, session_id: bytes) -> Optional[bytes]:
@@ -312,13 +304,8 @@ class DecryptLabsRemoteCDM:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
if certificate is None:
if not self._is_playready and self.device_name == "L1":
certificate = WidevineCdm.common_privacy_cert
self._sessions[session_id]["service_certificate"] = base64.b64decode(certificate)
return "Using default Widevine common privacy certificate for L1"
else:
self._sessions[session_id]["service_certificate"] = None
return "No certificate set (not required for this device type)"
self._sessions[session_id]["service_certificate"] = None
return "Removed"
if isinstance(certificate, str):
certificate = base64.b64decode(certificate)
@@ -359,8 +346,6 @@ class DecryptLabsRemoteCDM:
4. Returns empty challenge if all required keys are cached
The intelligent caching works as follows:
- For L1/L2 devices: Always prioritizes cached keys (API automatically optimizes)
- For other devices: Uses cache retry logic based on session state
- With required KIDs set: Only requests license for missing keys
- Without required KIDs: Returns any available cached keys
- For PlayReady: Combines cached keys with license keys seamlessly
@@ -380,7 +365,6 @@ class DecryptLabsRemoteCDM:
Note:
Call set_required_kids() before this method for optimal caching behavior.
L1/L2 devices automatically use cached keys when available per API design.
"""
_ = license_type, privacy_mode
@@ -393,15 +377,10 @@ class DecryptLabsRemoteCDM:
init_data = self._get_init_data_from_pssh(pssh_or_wrm)
already_tried_cache = session.get("tried_cache", False)
if self.device_name in ["L1", "L2"]:
get_cached_keys = True
else:
get_cached_keys = not already_tried_cache
request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": get_cached_keys,
"get_cached_keys_if_exists": not already_tried_cache,
}
if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
@@ -455,30 +434,8 @@ class DecryptLabsRemoteCDM:
if missing_kids:
session["cached_keys"] = parsed_keys
if self.device_name in ["L1", "L2"]:
license_request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": False,
}
if self.service_name:
license_request_data["service"] = self.service_name
if session["service_certificate"]:
license_request_data["service_certificate"] = base64.b64encode(
session["service_certificate"]
).decode("utf-8")
else:
license_request_data = request_data.copy()
license_request_data["get_cached_keys_if_exists"] = False
session["decrypt_labs_session_id"] = None
session["challenge"] = None
session["tried_cache"] = False
response = self._http_session.post(
f"{self.host}/get-request", json=license_request_data, timeout=30
)
request_data["get_cached_keys_if_exists"] = False
response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "challenge" in data:
@@ -623,7 +580,6 @@ class DecryptLabsRemoteCDM:
all_keys.append(license_key)
session["keys"] = all_keys
session["cached_keys"] = None
else:
session["keys"] = license_keys

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import re
import warnings
from pathlib import Path
from typing import Any, Optional
@@ -90,13 +92,116 @@ class Config:
self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or ""
self.update_checks: bool = kwargs.get("update_checks", True)
self.update_check_interval: int = kwargs.get("update_check_interval", 24)
self.scene_naming: bool = kwargs.get("scene_naming", True)
self.series_year: bool = kwargs.get("series_year", True)
# Handle backward compatibility for scene_naming option
self.scene_naming: Optional[bool] = kwargs.get("scene_naming")
self.output_template: dict = kwargs.get("output_template") or {}
# Apply scene_naming compatibility if no output_template is defined
self._apply_scene_naming_compatibility()
# Validate output templates
self._validate_output_templates()
self.title_cache_time: int = kwargs.get("title_cache_time", 1800) # 30 minutes default
self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default
self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True)
def _apply_scene_naming_compatibility(self) -> None:
"""Apply backward compatibility for the old scene_naming option."""
if self.scene_naming is not None:
# Only apply if no output_template is already defined
if not self.output_template.get("movies") and not self.output_template.get("series"):
if self.scene_naming:
# scene_naming: true = scene-style templates
self.output_template.update(
{
"movies": "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"series": "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"songs": "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}",
}
)
else:
# scene_naming: false = Plex-friendly templates
self.output_template.update(
{
"movies": "{title} ({year}) {quality}",
"series": "{title} {season_episode} {episode_name?}",
"songs": "{track_number}. {title}",
}
)
# Warn about deprecated option
warnings.warn(
"The 'scene_naming' option is deprecated. Please use 'output_template' instead. "
"Your current setting has been converted to equivalent templates.",
DeprecationWarning,
stacklevel=2,
)
def _validate_output_templates(self) -> None:
"""Validate output template configurations and warn about potential issues."""
if not self.output_template:
return
# Known template variables for validation
valid_variables = {
# Basic variables
"title",
"year",
"season",
"episode",
"season_episode",
"episode_name",
"quality",
"resolution",
"source",
"tag",
"track_number",
"artist",
"album",
"disc",
# Audio variables
"audio",
"audio_channels",
"audio_full",
"atmos",
"dual",
"multi",
# Video variables
"video",
"hdr",
"hfr",
}
# Filesystem-unsafe characters that could cause issues
unsafe_chars = r'[<>:"/\\|?*]'
for template_type, template_str in self.output_template.items():
if not isinstance(template_str, str):
warnings.warn(f"Template '{template_type}' must be a string, got {type(template_str).__name__}")
continue
# Extract variables from template
variables = re.findall(r"\{([^}]+)\}", template_str)
# Check for unknown variables
for var in variables:
# Remove conditional suffix if present
var_clean = var.rstrip("?")
if var_clean not in valid_variables:
warnings.warn(f"Unknown template variable '{var}' in {template_type} template")
# Check for filesystem-unsafe characters outside of variables
# Replace variables with safe placeholders for testing
test_template = re.sub(r"\{[^}]+\}", "TEST", template_str)
if re.search(unsafe_chars, test_template):
warnings.warn(f"Template '{template_type}' may contain filesystem-unsafe characters")
# Check for empty template
if not template_str.strip():
warnings.warn(f"Template '{template_type}' is empty")
@classmethod
def from_yaml(cls, path: Path) -> Config:
if not path.exists():

View File

@@ -12,6 +12,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Episode(Title):
@@ -78,116 +79,154 @@ class Episode(Title):
self.year = year
self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.title.replace("$", "S"),
"year": self.year or "",
"season": f"S{self.season:02}",
"episode": f"E{self.number:02}",
"season_episode": f"S{self.season:02}E{self.number:02}",
"episode_name": self.name or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str:
return "{title}{year} S{season:02}E{number:02} {name}".format(
title=self.title,
year=f" {self.year}" if self.year and config.series_year else "",
year=f" {self.year}" if self.year else "",
season=self.season,
number=self.number,
name=self.name or "",
).strip()
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Title [Year] SXXEXX Name (or Title [Year] SXX if folder)
if folder:
name = f"{self.title}"
if self.year and config.series_year:
name += f" {self.year}"
name += f" S{self.season:02}"
else:
name = "{title}{year} S{season:02}E{number:02} {name}".format(
title=self.title.replace("$", "S"), # e.g., Arli$$
year=f" {self.year}" if self.year and config.series_year else "",
season=self.season,
number=self.number,
name=self.name or "",
).strip()
# For folders, use the series template but exclude episode-specific variables
series_template = config.output_template.get("series")
if series_template:
# Create a folder-friendly version by removing episode-specific variables
folder_template = series_template
# Remove episode number and episode name from template for folders
folder_template = re.sub(r'\{episode\}', '', folder_template)
folder_template = re.sub(r'\{episode_name\?\}', '', folder_template)
folder_template = re.sub(r'\{episode_name\}', '', folder_template)
folder_template = re.sub(r'\{season_episode\}', '{season}', folder_template)
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Clean up any double separators that might result
folder_template = re.sub(r'\.{2,}', '.', folder_template)
folder_template = re.sub(r'\s{2,}', ' ', folder_template)
folder_template = re.sub(r'^[\.\s]+|[\.\s]+$', '', folder_template)
# Service
if show_service:
name += f" {self.service.__name__}"
formatter = TemplateFormatter(folder_template)
context = self._build_template_context(media_info, show_service)
# Override season_episode with just season for folders
context['season'] = f"S{self.season:02}"
# 'WEB-DL'
name += " WEB-DL"
folder_name = formatter.format(context)
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
# Keep the same separator style as the series template
if '.' in series_template and ' ' not in series_template:
# Dot-based template - use dot separator for folders too
return sanitize_filename(folder_name, ".")
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
# Space-based template - use space separator
return sanitize_filename(folder_name, " ")
else:
# Fallback to simple naming if no template defined
name = f"{self.title}"
if self.year:
name += f" {self.year}"
name += f" S{self.season:02}"
return sanitize_filename(name, " ")
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("series")
or "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hfr?}.{video}-{tag}"
)
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Series(SortedKeyList, ABC):
@@ -197,7 +236,7 @@ class Series(SortedKeyList, ABC):
def __str__(self) -> str:
if not self:
return super().__str__()
return self[0].title + (f" ({self[0].year})" if self[0].year and config.series_year else "")
return self[0].title + (f" ({self[0].year})" if self[0].year else "")
def tree(self, verbose: bool = False) -> Tree:
seasons = Counter(x.season for x in self)

View File

@@ -9,7 +9,7 @@ from sortedcontainers import SortedKeyList
from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Movie(Title):
@@ -45,100 +45,107 @@ class Movie(Title):
self.year = year
self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.name.replace("$", "S"),
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str:
if self.year:
return f"{self.name} ({self.year})"
return self.name
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("movies")
or "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}"
)
# Name (Year)
name = str(self).replace("$", "S") # e.g., Arli$$
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Movies(SortedKeyList, ABC):

View File

@@ -10,6 +10,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Song(Title):
@@ -81,46 +82,63 @@ class Song(Title):
artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name
).strip()
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_audio_track = next(iter(media_info.audio_tracks), None)
context = {
"artist": self.artist.replace("$", "S"),
"album": self.album.replace("$", "S"),
"title": self.name.replace("$", "S"),
"track_number": f"{self.track:02}",
"disc": f"{self.disc:02}" if self.disc > 1 else "",
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
return context
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
audio_track = next(iter(media_info.audio_tracks), None)
codec = audio_track.format
channel_layout = audio_track.channel_layout or audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = audio_track.channel_s or audio_track.channels or 0
channels = float(channel_count)
features = audio_track.format_additionalfeatures or ""
if folder:
# Artist - Album (Year)
name = str(self).split(" / ")[0]
else:
# NN. Song Name
name = str(self).split(" / ")[1]
if config.scene_naming:
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# Audio Codec + Channels (+ feature)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or audio_track.joc:
name += " Atmos"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name, " ")
else:
# Simple naming style without technical details
# For folders, use simple naming: "Artist - Album (Year)"
name = f"{self.artist} - {self.album}"
if self.year:
name += f" ({self.year})"
return sanitize_filename(name, " ")
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("songs") or "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}"
)
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Album(SortedKeyList, ABC):
def __init__(self, iterable: Optional[Iterable] = None):

View File

@@ -0,0 +1,147 @@
import logging
import re
from typing import Any, Dict, List
from unshackle.core.utilities import sanitize_filename
class TemplateFormatter:
"""
Template formatter for custom filename patterns.
Supports variable substitution and conditional variables.
Example: '{title}.{year}.{quality?}.{source}-{tag}'
"""
def __init__(self, template: str):
"""Initialize the template formatter.
Args:
template: Template string with variables in {variable} format
"""
self.template = template
self.variables = self._extract_variables()
def _extract_variables(self) -> List[str]:
"""Extract all variables from the template."""
pattern = r"\{([^}]+)\}"
matches = re.findall(pattern, self.template)
return [match.strip() for match in matches]
def format(self, context: Dict[str, Any]) -> str:
"""Format the template with the provided context.
Args:
context: Dictionary containing variable values
Returns:
Formatted filename string
Raises:
ValueError: If required template variables are missing from context
"""
logger = logging.getLogger(__name__)
# Validate that all required variables are present
is_valid, missing_vars = self.validate(context)
if not is_valid:
error_msg = f"Missing required template variables: {', '.join(missing_vars)}"
logger.error(error_msg)
raise ValueError(error_msg)
try:
result = self.template
for variable in self.variables:
placeholder = "{" + variable + "}"
is_conditional = variable.endswith("?")
if is_conditional:
# Remove the ? for conditional variables
var_name = variable[:-1]
value = context.get(var_name, "")
if value:
# Replace with actual value, ensuring it's string and safe
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
else:
# Remove the placeholder entirely for empty conditional variables
result = result.replace(placeholder, "")
else:
# Regular variable
value = context.get(variable, "")
if value is None:
logger.warning(f"Template variable '{variable}' is None, using empty string")
value = ""
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
# Clean up multiple consecutive dots/separators and other artifacts
result = re.sub(r"\.{2,}", ".", result) # Multiple dots -> single dot
result = re.sub(r"\s{2,}", " ", result) # Multiple spaces -> single space
result = re.sub(r"^[\.\s]+|[\.\s]+$", "", result) # Remove leading/trailing dots and spaces
result = re.sub(r"\.-", "-", result) # Remove dots before dashes (for dot-based templates)
result = re.sub(r"[\.\s]+\)", ")", result) # Remove dots/spaces before closing parentheses
# Determine the appropriate separator based on template style
# If the template contains spaces (like Plex-friendly), preserve them
if " " in self.template and "." not in self.template:
# Space-based template (Plex-friendly) - use space separator
result = sanitize_filename(result, spacer=" ")
else:
# Dot-based template (scene-style) - use dot separator
result = sanitize_filename(result, spacer=".")
# Final validation - ensure we have a non-empty result
if not result or result.isspace():
logger.warning("Template formatting resulted in empty filename, using fallback")
return "untitled"
logger.debug(f"Template formatted successfully: '{self.template}' -> '{result}'")
return result
except Exception as e:
logger.error(f"Error formatting template '{self.template}': {e}")
# Return a safe fallback filename
fallback = f"error_formatting_{hash(self.template) % 10000}"
logger.warning(f"Using fallback filename: {fallback}")
return fallback
def validate(self, context: Dict[str, Any]) -> tuple[bool, List[str]]:
"""Validate that all required variables are present in context.
Args:
context: Dictionary containing variable values
Returns:
Tuple of (is_valid, missing_variables)
"""
missing = []
for variable in self.variables:
is_conditional = variable.endswith("?")
var_name = variable[:-1] if is_conditional else variable
# Only check non-conditional variables
if not is_conditional and var_name not in context:
missing.append(var_name)
return len(missing) == 0, missing
def get_required_variables(self) -> List[str]:
"""Get list of required (non-conditional) variables."""
required = []
for variable in self.variables:
if not variable.endswith("?"):
required.append(variable)
return required
def get_optional_variables(self) -> List[str]:
"""Get list of optional (conditional) variables."""
optional = []
for variable in self.variables:
if variable.endswith("?"):
optional.append(variable[:-1]) # Remove the ?
return optional

View File

@@ -282,10 +282,6 @@ class EXAMPLE(Service):
return chapters
def get_widevine_service_certificate(self, **_: any) -> str:
"""Return the Widevine service certificate from config, if available."""
return self.config.get("certificate")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
"""Retrieve a PlayReady license for a given track."""

View File

@@ -10,15 +10,45 @@ tag_imdb_tmdb: true
# Set terminal background color (custom option not in CONFIG.md)
set_terminal_bg: false
# Set file naming convention
# true for style - Prime.Suspect.S07E01.The.Final.Act.Part.One.1080p.ITV.WEB-DL.AAC2.0.H.264
# false for style - Prime Suspect S07E01 The Final Act - Part One
scene_naming: true
# File naming is now controlled via output_template (see below)
# Default behavior provides scene-style naming similar to the old scene_naming: true
#
# BACKWARD COMPATIBILITY: The old scene_naming option is still supported:
# scene_naming: true -> Equivalent to scene-style templates (dot-separated)
# scene_naming: false -> Equivalent to Plex-friendly templates (space-separated)
# Note: output_template takes precedence over scene_naming if both are defined
# Whether to include the year in series names for episodes and folders (default: true)
# true for style - Show Name (2023) S01E01 Episode Name
# false for style - Show Name S01E01 Episode Name
series_year: true
# Custom output templates for filenames
# When not defined, defaults to scene-style naming equivalent to the old scene_naming: true
# Available variables: {title}, {year}, {season}, {episode}, {season_episode}, {episode_name},
# {quality}, {resolution}, {source}, {audio}, {audio_channels}, {audio_full},
# {video}, {hdr}, {hfr}, {atmos}, {dual}, {multi}, {tag}
# Conditional variables (included only if present): Add ? suffix like {year?}, {episode_name?}, {hdr?}
# Uncomment and customize the templates below:
#
# output_template:
# # Scene-style naming (dot-separated) - Default behavior when no template is defined
# movies: '{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
# series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
#
# # Plex-friendly naming (space-separated, clean format)
# # movies: '{title} ({year}) {quality}'
# # series: '{title} {season_episode} {episode_name?}'
#
# # Minimal naming (basic info only)
# # movies: '{title}.{year}.{quality}'
# # series: '{title}.{season_episode}.{episode_name?}'
#
# # Custom scene-style with specific elements
# # movies: '{title}.{year}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{video}-{tag}'
# # series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{atmos?}.{video}-{tag}'
#
# Example outputs:
# Scene movies: 'The.Matrix.1999.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Scene movies (HDR): 'Dune.2021.2160p.HBO.WEB-DL.DDP5.1.HDR10.H.265-EXAMPLE'
# Scene series: 'Breaking.Bad.2008.S01E01.Pilot.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Plex movies: 'The Matrix (1999) 1080p'
# Plex series: 'Breaking Bad S01E01 Pilot'
# Check for updates from GitHub repository on startup (default: true)
update_checks: true
@@ -106,16 +136,16 @@ remote_cdm:
secret: secret_key
- name: "decrypt_labs_chrome"
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
device_type: CHROME
system_id: 4464 # Doesn't matter
security_level: 3
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
- name: "decrypt_labs_l1"
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 1
@@ -124,7 +154,7 @@ remote_cdm:
- name: "decrypt_labs_l2"
type: "decrypt_labs"
device_name: "L2" # Scheme identifier - must match exactly
device_name: "L2" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 2
@@ -133,7 +163,7 @@ remote_cdm:
- name: "decrypt_labs_playready_sl2"
type: "decrypt_labs"
device_name: "SL2" # Scheme identifier - must match exactly
device_name: "SL2" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 2000
@@ -142,7 +172,7 @@ remote_cdm:
- name: "decrypt_labs_playready_sl3"
type: "decrypt_labs"
device_name: "SL3" # Scheme identifier - must match exactly
device_name: "SL3" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 3000

View File

@@ -131,27 +131,16 @@ class MySQL(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get()
cursor = conn.cursor()
try:
placeholders = ",".join(["%s"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row["kid"] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany(
f"INSERT INTO `{service}` (kid, key_) VALUES (%s, %s)",
new_keys.items(),
# TODO: SQL injection risk
f"INSERT IGNORE INTO `{service}` (kid, key_) VALUES (%s, %s)",
kid_keys.items(),
)
return len(new_keys)
return cursor.rowcount
finally:
conn.commit()
cursor.close()

View File

@@ -102,27 +102,16 @@ class SQLite(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get()
cursor = conn.cursor()
try:
placeholders = ",".join(["?"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row[0] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany(
f"INSERT INTO `{service}` (kid, key_) VALUES (?, ?)",
new_keys.items(),
# TODO: SQL injection risk
f"INSERT OR IGNORE INTO `{service}` (kid, key_) VALUES (?, ?)",
kid_keys.items(),
)
return len(new_keys)
return cursor.rowcount
finally:
conn.commit()
cursor.close()