23 Commits

Author SHA1 Message Date
Andy
4564be6204 feat: Implement custom output templates for flexible filename generation and backward compatibility 2025-09-03 00:18:21 +00:00
Andy
d9763184bd Merge branch 'main' into feature/custom-output-templates 2025-09-02 23:08:29 +00:00
Andy
86bb162868 feat(tags): Enhance tag handling for TV shows and movies from Simkl data
Fixes #15
2025-09-02 22:01:44 +00:00
Andy
501cfd68e8 fix(cdm): Add error message for missing service certificate in CDM session 2025-09-02 19:16:34 +00:00
Andy
76fb2eea95 feat: implement intelligent caching system for CDM license requests 2025-09-02 18:48:34 +00:00
Andy
ea5ec40bcd Merge branch 'main' of https://github.com/unshackle-dl/unshackle 2025-09-02 17:34:12 +00:00
Andy
329850b043 feat(cdm): Enhance key retrieval logic and improve cached keys handling 2025-09-02 17:33:31 +00:00
Andy
73595f3b50 feat(cdm): Enhance key retrieval logic and improve cached keys handling 2025-09-02 17:23:02 +00:00
Andy
1e82283133 fix(tags): Fix import order. 2025-09-02 04:13:43 +00:00
Andy
ab13dde9d2 feat(changelog): Update changelog for version 1.4.4 with enhanced CDM support, configuration options, and various improvements 2025-09-02 04:10:28 +00:00
Andy
9fd0895128 feat(cdm): Refactor DecryptLabsRemoteCDM full support for Widevine/Playready and ChromeCDM 2025-09-02 04:02:52 +00:00
Andy
ed744205ad fix(tags): 🐛 Fix Matroska tag compliance with official specification
- Update IMDB tags to use ID only (tt123456) instead of URLs
  - Update TMDB tags to use prefix/id format (movie/123456, tv/123456)
  - Update TVDB tags to use numeric ID only
  - Add XML escaping for tag values
  - Fix XML declaration to use double quotes

Fixes #15
2025-09-01 21:02:08 +00:00
Andy
3ef43afeed feat(cdm): Add DecryptLabs CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate 2025-09-01 00:34:07 +00:00
Andy
26851cbe7c feat(cdm): Enhance DecryptLabsRemoteCDM with improved session management and caching support and better support for remote WV/PR 2025-09-01 00:31:00 +00:00
Andy
b4efdf3f2c feat(cdm): Enhance DecryptLabsRemoteCDM to support cached keys and improve license handling 2025-08-28 17:09:55 +00:00
Andy
eb30620626 fix(main): As requested old devine version removed from banner to avoid any confusion the developer of this software. Original GNU is still applys. 2025-08-26 23:16:00 +00:00
Andy
7b71d6631c fix(main): As requested old devine version removed from banner to avoid any confusion the developer of this software. Original GNU is still applys. 2025-08-26 22:49:46 +00:00
Andy
fbada7ac4d feat(song): Enhance filename generation with custom template support and audio context 2025-08-22 05:32:22 +00:00
Andy
5949931b56 feat(config): Add new configuration options for device certificate status list and language preferences 2025-08-20 05:28:58 +00:00
Andy
ddfc0555c9 style(config): Clean up unshackle-example.yaml with correct accurate information. 2025-08-20 05:20:59 +00:00
Andy
3dda3290d3 feat(release): Bump version to 1.4.3 and update changelog with new features and improvements 2025-08-20 05:10:45 +00:00
Andy
19ff200617 refactor(drm): Simplify decrypt method by removing unused parameter and streamline logic 2025-08-20 05:10:38 +00:00
Andy
e30a3c71c7 feat(template): Implement custom filename template formatter with variable substitution 2025-08-15 23:37:12 +00:00
18 changed files with 1644 additions and 442 deletions

View File

@@ -5,6 +5,90 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- **Custom Output Templates**: Flexible filename customization system
- New `output_template` configuration in unshackle.yaml for movies, series, and songs
- Support for conditional variables using `?` suffix (e.g., `{year?}`, `{hdr?}`)
- Comprehensive template variables for title, quality, audio, video, and metadata
- Multiple naming styles: Scene-style (dot-separated), Plex-friendly (space-separated), minimal, custom
- Automatic template validation and enhanced error handling
- **Full backward compatibility**: Old `scene_naming` option still works and automatically converts to equivalent templates
- Folder naming now follows series template patterns (excluding episode-specific variables)
- Deprecation warnings guide users to migrate from `scene_naming` to `output_template`
### Changed
- **Filename Generation**: Updated all title classes (Movie, Episode, Song) to use new template system
- Enhanced context building for template variable substitution
- Improved separator handling based on template style detection
- Better handling of conditional content like HDR, Atmos, and multi-language audio
## [1.4.4] - 2025-09-02
### Added
- **Enhanced DecryptLabs CDM Support**: Comprehensive remote CDM functionality
- Full support for Widevine, PlayReady, and ChromeCDM through DecryptLabsRemoteCDM
- Enhanced session management and caching support for remote WV/PR operations
- Support for cached keys and improved license handling
- New CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate
- **Advanced Configuration Options**: New device and language preferences
- Added configuration options for device certificate status list
- Enhanced language preference settings
### Changed
- **DRM Decryption Enhancements**: Streamlined decryption process
- Simplified decrypt method by removing unused parameter and streamlined logic
- Improved DecryptLabs CDM configurations with better device support
### Fixed
- **Matroska Tag Compliance**: Enhanced media container compatibility
- Fixed Matroska tag compliance with official specification
- **Application Branding**: Cleaned up version display
- Removed old devine version reference from banner to avoid developer confusion
- Updated branding while maintaining original GNU license compliance
- **IP Information Handling**: Improved geolocation services
- Enhanced get_ip_info functionality with better failover handling
- Added support for 429 error handling and multiple API provider fallback
- Implemented cached IP info retrieval with fallback tester to avoid rate limiting
- **Dependencies**: Streamlined package requirements
- Removed unnecessary data extra requirement from langcodes
### Removed
- Deprecated version references in application banner for clarity
## [1.4.3] - 2025-08-20
### Added
- Cached IP info helper for region detection
- New `get_cached_ip_info()` with 24h cache and provider rotation (ipinfo/ipapi) with 429 handling.
- Reduces external calls and stabilizes non-proxy region lookups for caching/logging.
### Changed
- DRM decryption selection is fully configuration-driven
- Widevine and PlayReady now select the decrypter based solely on `decryption` in YAML (including per-service mapping).
- Shaka Packager remains the default decrypter when not specified.
- `dl.py` logs the chosen tool based on the resolved configuration.
- Geofencing and proxy verification improvements
- Safer geofence checks with error handling and clearer logs.
- Always verify proxy exit region via live IP lookup; fallback to proxy parsing on failure.
- Example config updated to default to Shaka
- `unshackle.yaml`/example now sets `decryption.default: shaka` (service overrides still supported).
### Removed
- Deprecated parameter `use_mp4decrypt`
- Removed from `Widevine.decrypt()` and `PlayReady.decrypt()` and all callsites.
- Internal naming switched from mp4decrypt-specific flags to generic `decrypter` selection.
## [1.4.2] - 2025-08-14
### Added

110
CONFIG.md
View File

@@ -141,6 +141,11 @@ The following directories are available and may be overridden,
- `logs` - Logs.
- `wvds` - Widevine Devices.
- `prds` - PlayReady Devices.
- `dcsl` - Device Certificate Status List.
Notes:
- `services` accepts either a single directory or a list of directories to search for service modules.
For example,
@@ -165,6 +170,14 @@ For example to set the default primary language to download to German,
lang: de
```
You can also set multiple preferred languages using a list, e.g.,
```yaml
lang:
- en
- fr
```
to set how many tracks to download concurrently to 4 and download threads to 16,
```yaml
@@ -302,6 +315,11 @@ Note: SQLite and MySQL vaults have to connect directly to the Host/IP. It cannot
Beware that some Hosting Providers do not let you access the MySQL server outside their intranet and may not be
accessible outside their hosting platform.
Additional behavior:
- `no_push` (bool): Optional per-vault flag. When `true`, the vault will not receive pushed keys (writes) but
will still be queried and can provide keys for lookups. Useful for read-only/backup vaults.
### Using an API Vault
API vaults use a specific HTTP request format, therefore API or HTTP Key Vault APIs from other projects or services may
@@ -314,6 +332,7 @@ not work in unshackle. The API format can be seen in the [API Vault Code](unshac
# uri: "127.0.0.1:80/key-vault"
# uri: "https://api.example.com/key-vault"
token: "random secret key" # authorization token
# no_push: true # optional; make this API vault read-only (lookups only)
```
### Using a MySQL Vault
@@ -329,6 +348,7 @@ A MySQL Vault can be on a local or remote network, but I recommend SQLite for lo
database: vault # database used for unshackle
username: jane11
password: Doe123
# no_push: false # optional; defaults to false
```
I recommend giving only a trustable user (or yourself) CREATE permission and then use unshackle to cache at least one CEK
@@ -352,6 +372,7 @@ case something happens to your MySQL Vault.
- type: SQLite
name: "My Local Vault" # arbitrary vault name
path: "C:/Users/Jane11/Documents/unshackle/data/key_vault.db"
# no_push: true # optional; commonly true for local backup vaults
```
**Note**: You do not need to create the file at the specified path.
@@ -394,7 +415,7 @@ n_m3u8dl_re:
Set your NordVPN Service credentials with `username` and `password` keys to automate the use of NordVPN as a Proxy
system where required.
You can also specify specific servers to use per-region with the `servers` key.
You can also specify specific servers to use per-region with the `server_map` key.
Sometimes a specific server works best for a service than others, so hard-coding one for a day or two helps.
For example,
@@ -403,8 +424,8 @@ For example,
nordvpn:
username: zxqsR7C5CyGwmGb6KSvk8qsZ # example of the login format
password: wXVHmht22hhRKUEQ32PQVjCZ
servers:
- us: 12 # force US server #12 for US proxies
server_map:
us: 12 # force US server #12 for US proxies
```
The username and password should NOT be your normal NordVPN Account Credentials.
@@ -443,7 +464,7 @@ second proxy of the US list.
Set your NordVPN Service credentials with `username` and `password` keys to automate the use of NordVPN as a Proxy
system where required.
You can also specify specific servers to use per-region with the `servers` key.
You can also specify specific servers to use per-region with the `server_map` key.
Sometimes a specific server works best for a service than others, so hard-coding one for a day or two helps.
For example,
@@ -451,8 +472,8 @@ For example,
```yaml
username: zxqsR7C5CyGwmGb6KSvk8qsZ # example of the login format
password: wXVHmht22hhRKUEQ32PQVjCZ
servers:
- us: 12 # force US server #12 for US proxies
server_map:
us: 12 # force US server #12 for US proxies
```
The username and password should NOT be your normal NordVPN Account Credentials.
@@ -463,6 +484,20 @@ You can even set a specific server number this way, e.g., `--proxy=gb2366`.
Note that `gb` is used instead of `uk` to be more consistent across regional systems.
### surfsharkvpn (dict)
Enable Surfshark VPN proxy service using Surfshark Service credentials (not your login password).
You may pin specific server IDs per region using `server_map`.
```yaml
username: your_surfshark_service_username # https://my.surfshark.com/vpn/manual-setup/main/openvpn
password: your_surfshark_service_password # service credentials, not account password
server_map:
us: 3844 # force US server #3844
gb: 2697 # force GB server #2697
au: 4621 # force AU server #4621
```
### hola (dict)
Enable Hola VPN proxy service. This is a simple provider that doesn't require configuration.
@@ -497,6 +532,15 @@ For example,
[pywidevine]: https://github.com/rlaphoenix/pywidevine
## scene_naming (bool)
Set scene-style naming for titles. When `true` uses scene naming patterns (e.g., `Prime.Suspect.S07E01...`), when
`false` uses a more human-readable style (e.g., `Prime Suspect S07E01 ...`). Default: `true`.
## series_year (bool)
Whether to include the series year in series names for episodes and folders. Default: `true`.
## serve (dict)
Configuration data for pywidevine's serve functionality run through unshackle.
@@ -561,6 +605,27 @@ set_terminal_bg: true
Group or Username to postfix to the end of all download filenames following a dash.
For example, `tag: "J0HN"` will have `-J0HN` at the end of all download filenames.
## tag_group_name (bool)
Enable/disable tagging downloads with your group name when `tag` is set. Default: `true`.
## tag_imdb_tmdb (bool)
Enable/disable tagging downloaded files with IMDB/TMDB/TVDB identifiers (when available). Default: `true`.
## title_cache_enabled (bool)
Enable/disable caching of title metadata to reduce redundant API calls. Default: `true`.
## title_cache_time (int)
Cache duration in seconds for title metadata. Default: `1800` (30 minutes).
## title_cache_max_retention (int)
Maximum retention time in seconds for serving slightly stale cached title metadata when API calls fail.
Default: `86400` (24 hours). Effective retention is `min(title_cache_time + grace, title_cache_max_retention)`.
## tmdb_api_key (str)
API key for The Movie Database (TMDB). This is used for tagging downloaded files with TMDB,
@@ -580,3 +645,36 @@ tmdb_api_key: cf66bf18956kca5311ada3bebb84eb9a # Not a real key
```
**Note**: Keep your API key secure and do not share it publicly. This key is used by the core/utils/tags.py module to fetch metadata from TMDB for proper file tagging.
## subtitle (dict)
Control subtitle conversion and SDH (hearing-impaired) stripping behavior.
- `conversion_method`: How to convert subtitles between formats. Default: `auto`.
- `auto`: Use subby for WebVTT/SAMI, standard for others.
- `subby`: Always use subby with CommonIssuesFixer.
- `subtitleedit`: Prefer SubtitleEdit when available; otherwise fallback to standard conversion.
- `pycaption`: Use only the pycaption library (no SubtitleEdit, no subby).
- `sdh_method`: How to strip SDH cues. Default: `auto`.
- `auto`: Try subby for SRT first, then SubtitleEdit, then filter-subs.
- `subby`: Use subbys SDHStripper (SRT only).
- `subtitleedit`: Use SubtitleEdits RemoveTextForHI when available.
- `filter-subs`: Use the subtitle-filter library.
Example:
```yaml
subtitle:
conversion_method: auto
sdh_method: auto
```
## update_checks (bool)
Check for updates from the GitHub repository on startup. Default: `true`.
## update_check_interval (int)
How often to check for updates, in hours. Default: `24`.

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "unshackle"
version = "1.4.2"
version = "1.4.4"
description = "Modular Movie, TV, and Music Archival Software."
authors = [{ name = "unshackle team" }]
requires-python = ">=3.10,<3.13"

View File

@@ -248,7 +248,9 @@ class dl:
)
@click.option("--downloads", type=int, default=1, help="Amount of tracks to download concurrently.")
@click.option("--no-cache", "no_cache", is_flag=True, default=False, help="Bypass title cache for this download.")
@click.option("--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching.")
@click.option(
"--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching."
)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> dl:
return dl(ctx, **kwargs)
@@ -294,20 +296,8 @@ class dl:
if getattr(config, "downloader_map", None):
config.downloader = config.downloader_map.get(self.service, config.downloader)
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
if getattr(config, "decryption_map", None):
config.decryption = config.decryption_map.get(self.service, config.decryption)
with console.status("Loading Key Vaults...", spinner="dots"):
self.vaults = Vaults(self.service)
@@ -347,6 +337,24 @@ class dl:
else:
self.log.debug("No vaults are currently active")
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if isinstance(self.cdm, DecryptLabsRemoteCDM):
drm_type = "PlayReady" if self.cdm.is_playready else "Widevine"
self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})")
elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
self.proxy_providers = []
if no_proxy:
ctx.params["proxy"] = None
@@ -531,7 +539,7 @@ class dl:
else:
console.print(Padding("Search -> [bright_black]No match found[/]", (0, 5)))
if self.tmdb_id and getattr(self, 'search_source', None) != 'simkl':
if self.tmdb_id and getattr(self, "search_source", None) != "simkl":
kind = "tv" if isinstance(title, Episode) else "movie"
tags.external_ids(self.tmdb_id, kind)
if self.tmdb_year:
@@ -869,7 +877,12 @@ class dl:
),
licence=partial(
service.get_playready_license
if isinstance(self.cdm, PlayReadyCdm)
if (
isinstance(self.cdm, PlayReadyCdm)
or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
)
)
and hasattr(service, "get_playready_license")
else service.get_widevine_license,
title=title,
@@ -1001,12 +1014,7 @@ class dl:
# Handle DRM decryption BEFORE repacking (must decrypt first!)
service_name = service.__class__.__name__.upper()
decryption_method = config.decryption_map.get(service_name, config.decryption)
use_mp4decrypt = decryption_method.lower() == "mp4decrypt"
if use_mp4decrypt:
decrypt_tool = "mp4decrypt"
else:
decrypt_tool = "Shaka Packager"
decrypt_tool = "mp4decrypt" if decryption_method.lower() == "mp4decrypt" else "Shaka Packager"
drm_tracks = [track for track in title.tracks if track.drm]
if drm_tracks:
@@ -1015,7 +1023,7 @@ class dl:
for track in drm_tracks:
drm = track.get_drm_for_cdm(self.cdm)
if drm and hasattr(drm, "decrypt"):
drm.decrypt(track.path, use_mp4decrypt=use_mp4decrypt)
drm.decrypt(track.path)
has_decrypted = True
events.emit(events.Types.TRACK_REPACKED, track=track)
else:
@@ -1201,10 +1209,22 @@ class dl:
if not drm:
return
if isinstance(drm, Widevine) and not isinstance(self.cdm, WidevineCdm):
self.cdm = self.get_cdm(self.service, self.profile, drm="widevine")
elif isinstance(drm, PlayReady) and not isinstance(self.cdm, PlayReadyCdm):
self.cdm = self.get_cdm(self.service, self.profile, drm="playready")
if isinstance(drm, Widevine):
if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine")
if widevine_cdm:
self.log.info("Switching to Widevine CDM for Widevine content")
self.cdm = widevine_cdm
elif isinstance(drm, PlayReady):
if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready
):
playready_cdm = self.get_cdm(self.service, self.profile, drm="playready")
if playready_cdm:
self.log.info("Switching to PlayReady CDM for PlayReady content")
self.cdm = playready_cdm
if isinstance(drm, Widevine):
with self.DRM_TABLE_LOCK:
@@ -1442,8 +1462,8 @@ class dl:
return Credential(*credentials)
return Credential.loads(credentials) # type: ignore
@staticmethod
def get_cdm(
self,
service: str,
profile: Optional[str] = None,
drm: Optional[str] = None,
@@ -1477,10 +1497,18 @@ class dl:
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api:
is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False
del cdm_api["name"]
del cdm_api["type"]
return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api)
is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False
if is_decrypt_lab:
del cdm_api["name"]
del cdm_api["type"]
# All DecryptLabs CDMs use DecryptLabsRemoteCDM
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
else:
del cdm_api["name"]
if "type" in cdm_api:
del cdm_api["type"]
return RemoteCdm(**cdm_api)
prd_path = config.directories.prds / f"{cdm_name}.prd"
if not prd_path.is_file():

View File

@@ -1 +1 @@
__version__ = "1.4.2"
__version__ = "1.4.4"

View File

@@ -69,7 +69,7 @@ def main(version: bool, debug: bool, log_path: Path) -> None:
r" ▀▀▀ ▀▀ █▪ ▀▀▀▀ ▀▀▀ · ▀ ▀ ·▀▀▀ ·▀ ▀.▀▀▀ ▀▀▀ ",
style="ascii.art",
),
"v 3.3.3 Copyright © 2019-2025 rlaphoenix" + f"\nv [repr.number]{__version__}[/] - unshackle",
f"v [repr.number]{__version__}[/] - © 2025 - github.com/unshackle-dl/unshackle",
),
(1, 11, 1, 10),
expand=True,

View File

@@ -1,143 +1,658 @@
from __future__ import annotations
import base64
import secrets
from typing import Optional, Type, Union
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
import requests
from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm
from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage
from pywidevine.device import DeviceTypes
from requests import Session
# Copyright 2024 by DevYukine.
from unshackle.core.vaults import Vaults
class DecryptLabsRemoteCDM(RemoteCdm):
class MockCertificateChain:
"""Mock certificate chain for PlayReady compatibility."""
def __init__(self, name: str):
self._name = name
def get_name(self) -> str:
return self._name
class Key:
"""Key object compatible with pywidevine."""
def __init__(self, kid: str, key: str, type_: str = "CONTENT"):
if isinstance(kid, str):
clean_kid = kid.replace("-", "")
if len(clean_kid) == 32:
self.kid = UUID(hex=clean_kid)
else:
self.kid = UUID(hex=clean_kid.ljust(32, "0"))
else:
self.kid = kid
if isinstance(key, str):
self.key = bytes.fromhex(key)
else:
self.key = key
self.type = type_
class DecryptLabsRemoteCDMExceptions:
"""Exception classes for compatibility with pywidevine CDM."""
class InvalidSession(Exception):
"""Raised when session ID is invalid."""
class TooManySessions(Exception):
"""Raised when session limit is reached."""
class InvalidInitData(Exception):
"""Raised when PSSH/init data is invalid."""
class InvalidLicenseType(Exception):
"""Raised when license type is invalid."""
class InvalidLicenseMessage(Exception):
"""Raised when license message is invalid."""
class InvalidContext(Exception):
"""Raised when session has no context data."""
class SignatureMismatch(Exception):
"""Raised when signature verification fails."""
class DecryptLabsRemoteCDM:
"""
Decrypt Labs Remote CDM implementation with intelligent caching system.
This class provides a drop-in replacement for pywidevine's local CDM using
Decrypt Labs' KeyXtractor API service, enhanced with smart caching logic
that minimizes unnecessary license requests.
Key Features:
- Compatible with both Widevine and PlayReady DRM schemes
- Intelligent caching that compares required vs. available keys
- Automatic key combination for mixed cache/license scenarios
- Seamless fallback to license requests when keys are missing
Intelligent Caching System:
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
2. get_license_challenge() first checks for cached keys
3. If cached keys satisfy requirements, returns empty challenge (no license needed)
4. If keys are missing, makes targeted license request for remaining keys
5. parse_license() combines cached and license keys intelligently
"""
service_certificate_challenge = b"\x08\x04"
def __init__(
self,
device_type: Union[DeviceTypes, str],
system_id: int,
security_level: int,
host: str,
secret: str,
device_name: str,
service_name: str,
host: str = "https://keyxtractor.decryptlabs.com",
device_name: str = "ChromeCDM",
service_name: Optional[str] = None,
vaults: Optional[Vaults] = None,
device_type: Optional[str] = None,
system_id: Optional[int] = None,
security_level: Optional[int] = None,
**kwargs,
):
self.response_counter = 0
self.pssh = None
self.api_session_ids = {}
self.license_request = None
self.service_name = service_name
self.keys = {}
try:
super().__init__(device_type, system_id, security_level, host, secret, device_name)
except Exception:
pass
self.req_session = requests.Session()
self.req_session.headers.update({"decrypt-labs-api-key": secret})
"""
Initialize Decrypt Labs Remote CDM for Widevine and PlayReady schemes.
@classmethod
def from_device(cls, device: Device) -> Type["DecryptLabsRemoteCDM"]:
raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.")
Args:
secret: Decrypt Labs API key (matches config format)
host: Decrypt Labs API host URL (matches config format)
device_name: DRM scheme (ChromeCDM, L1, L2 for Widevine; SL2, SL3 for PlayReady)
service_name: Service name for key caching and vault operations
vaults: Vaults instance for local key caching
device_type: Device type (CHROME, ANDROID, PLAYREADY) - for compatibility
system_id: System ID - for compatibility
security_level: Security level - for compatibility
"""
_ = kwargs
def open(self) -> bytes:
# We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one.
return bytes.fromhex(secrets.token_hex(16))
self.secret = secret
self.host = host.rstrip("/")
self.device_name = device_name
self.service_name = service_name or ""
self.vaults = vaults
self.uch = self.host != "https://keyxtractor.decryptlabs.com"
def close(self, session_id: bytes) -> None:
# We stub this method to do nothing.
pass
self._device_type_str = device_type
if device_type:
self.device_type = self._get_device_type_enum(device_type)
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
if isinstance(certificate, bytes):
certificate = base64.b64encode(certificate).decode()
self._is_playready = (device_type and device_type.upper() == "PLAYREADY") or (device_name in ["SL2", "SL3"])
# certificate needs to be base64 to be sent off to the API.
# it needs to intentionally be kept as base64 encoded SignedMessage.
if self._is_playready:
self.system_id = system_id or 0
self.security_level = security_level or (2000 if device_name == "SL2" else 3000)
else:
self.system_id = system_id or 26830
self.security_level = security_level or 3
self.req_session.signed_device_certificate = certificate
self.req_session.privacy_mode = True
return "success"
def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]:
raise NotImplementedError("This method is not implemented in this CDM")
def get_license_challenge(
self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
self.pssh = pssh
res = self.session(
self.host + "/get-request",
self._sessions: Dict[bytes, Dict[str, Any]] = {}
self._pssh_b64 = None
self._required_kids: Optional[List[str]] = None
self._http_session = Session()
self._http_session.headers.update(
{
"init_data": self.pssh.dumps(),
"service_certificate": self.req_session.signed_device_certificate,
"scheme": "widevine",
"service": self.service_name,
},
"decrypt-labs-api-key": self.secret,
"Content-Type": "application/json",
"User-Agent": "unshackle-decrypt-labs-cdm/1.0",
}
)
self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"]
return base64.b64decode(self.license_request)
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
session_id_api = self.api_session_ids[session_id]
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
if isinstance(license_message, dict) and "keys" in license_message:
session_keys.extend(
[
Key(kid=Key.kid_to_uuid(x["kid"]), type_=x.get("type", "CONTENT"), key=bytes.fromhex(x["key"]))
for x in license_message["keys"]
]
)
def _get_device_type_enum(self, device_type: str):
"""Convert device type string to enum for compatibility."""
device_type_upper = device_type.upper()
if device_type_upper == "ANDROID":
return DeviceTypes.ANDROID
elif device_type_upper == "CHROME":
return DeviceTypes.CHROME
else:
res = self.session(
self.host + "/decrypt-response",
{
"session_id": session_id_api,
"init_data": self.pssh.dumps(),
"license_request": self.license_request,
"license_response": license_message,
"scheme": "widevine",
},
)
return DeviceTypes.CHROME
original_keys = res["keys"].replace("\n", " ")
keys_separated = original_keys.split("--key ")
formatted_keys = []
for k in keys_separated:
if ":" in k:
key = k.strip()
formatted_keys.append(key)
for keys in formatted_keys:
session_keys.append(
(
Key(
kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])),
type_="CONTENT",
key=bytes.fromhex(keys.split(":")[1]),
)
)
@property
def is_playready(self) -> bool:
"""Check if this CDM is in PlayReady mode."""
return self._is_playready
@property
def certificate_chain(self) -> MockCertificateChain:
"""Mock certificate chain for PlayReady compatibility."""
return MockCertificateChain(f"{self.device_name}_Remote")
def set_pssh_b64(self, pssh_b64: str) -> None:
"""Store base64-encoded PSSH data for PlayReady compatibility."""
self._pssh_b64 = pssh_b64
def set_required_kids(self, kids: List[Union[str, UUID]]) -> None:
"""
Set the required Key IDs for intelligent caching decisions.
This method enables the CDM to make smart decisions about when to request
additional keys via license challenges. When cached keys are available,
the CDM will compare them against the required KIDs to determine if a
license request is still needed for missing keys.
Args:
kids: List of required Key IDs as UUIDs or hex strings
Note:
Should be called by DRM classes (PlayReady/Widevine) before making
license challenge requests to enable optimal caching behavior.
"""
self._required_kids = []
for kid in kids:
if isinstance(kid, UUID):
self._required_kids.append(str(kid).replace("-", "").lower())
else:
self._required_kids.append(str(kid).replace("-", "").lower())
def _generate_session_id(self) -> bytes:
"""Generate a unique session ID."""
return secrets.token_bytes(16)
def _get_init_data_from_pssh(self, pssh: Any) -> str:
"""Extract init data from various PSSH formats."""
if self.is_playready and self._pssh_b64:
return self._pssh_b64
if hasattr(pssh, "dumps"):
dumps_result = pssh.dumps()
if isinstance(dumps_result, str):
try:
base64.b64decode(dumps_result)
return dumps_result
except Exception:
return base64.b64encode(dumps_result.encode("utf-8")).decode("utf-8")
else:
return base64.b64encode(dumps_result).decode("utf-8")
elif hasattr(pssh, "raw"):
raw_data = pssh.raw
if isinstance(raw_data, str):
raw_data = raw_data.encode("utf-8")
return base64.b64encode(raw_data).decode("utf-8")
elif hasattr(pssh, "__class__") and "WrmHeader" in pssh.__class__.__name__:
if self.is_playready:
raise ValueError("PlayReady WRM header received but no PSSH B64 was set via set_pssh_b64()")
if hasattr(pssh, "raw_bytes"):
return base64.b64encode(pssh.raw_bytes).decode("utf-8")
elif hasattr(pssh, "bytes"):
return base64.b64encode(pssh.bytes).decode("utf-8")
else:
raise ValueError(f"Cannot extract PSSH data from WRM header type: {type(pssh)}")
else:
raise ValueError(f"Unsupported PSSH type: {type(pssh)}")
def open(self) -> bytes:
"""
Open a new CDM session.
Returns:
Session identifier as bytes
"""
session_id = self._generate_session_id()
self._sessions[session_id] = {
"service_certificate": None,
"keys": [],
"pssh": None,
"challenge": None,
"decrypt_labs_session_id": None,
}
return session_id
def close(self, session_id: bytes) -> None:
"""
Close a CDM session.
Args:
session_id: Session identifier
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
del self._sessions[session_id]
def get_service_certificate(self, session_id: bytes) -> Optional[bytes]:
"""
Get the service certificate for a session.
Args:
session_id: Session identifier
Returns:
Service certificate if set, None otherwise
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
return self._sessions[session_id]["service_certificate"]
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""
Set the service certificate for a session.
Args:
session_id: Session identifier
certificate: Service certificate (bytes or base64 string)
Returns:
Certificate status message
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
if certificate is None:
self._sessions[session_id]["service_certificate"] = None
return "Removed"
if isinstance(certificate, str):
certificate = base64.b64decode(certificate)
self._sessions[session_id]["service_certificate"] = certificate
return "Successfully set Service Certificate"
def has_cached_keys(self, session_id: bytes) -> bool:
"""
Check if cached keys are available for the session.
Args:
session_id: Session identifier
Returns:
True if cached keys are available
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session_keys = session.get("keys", [])
return len(session_keys) > 0
def get_license_challenge(
self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
"""
Generate a license challenge using Decrypt Labs API with intelligent caching.
This method implements smart caching logic that:
1. First attempts to retrieve cached keys from the API
2. If required KIDs are set, compares cached keys against requirements
3. Only makes a license request if keys are missing
4. Returns empty challenge if all required keys are cached
The intelligent caching works as follows:
- With required KIDs set: Only requests license for missing keys
- Without required KIDs: Returns any available cached keys
- For PlayReady: Combines cached keys with license keys seamlessly
Args:
session_id: Session identifier
pssh_or_wrm: PSSH object or WRM header (for PlayReady compatibility)
license_type: Type of license (STREAMING, OFFLINE, AUTOMATIC) - for compatibility only
privacy_mode: Whether to use privacy mode - for compatibility only
Returns:
License challenge as bytes, or empty bytes if cached keys satisfy requirements
Raises:
InvalidSession: If session ID is invalid
requests.RequestException: If API request fails
Note:
Call set_required_kids() before this method for optimal caching behavior.
"""
_ = license_type, privacy_mode
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session["pssh"] = pssh_or_wrm
init_data = self._get_init_data_from_pssh(pssh_or_wrm)
already_tried_cache = session.get("tried_cache", False)
request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": not already_tried_cache,
}
if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
request_data["service"] = self.service_name
if session["service_certificate"]:
request_data["service_certificate"] = base64.b64encode(session["service_certificate"]).decode("utf-8")
response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
if response.status_code != 200:
raise requests.RequestException(f"API request failed: {response.status_code} {response.text}")
data = response.json()
if data.get("message") != "success":
error_msg = data.get("message", "Unknown error")
if "details" in data:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
if "service_certificate is required" in str(data) and not session["service_certificate"]:
error_msg += " (No service certificate was provided to the CDM session)"
raise requests.RequestException(f"API error: {error_msg}")
message_type = data.get("message_type")
if message_type == "cached-keys" or "cached_keys" in data:
"""
Handle cached keys response from API.
When the API returns cached keys, we need to determine if they satisfy
our requirements or if we need to make an additional license request
for missing keys.
"""
cached_keys = data.get("cached_keys", [])
parsed_keys = self._parse_cached_keys(cached_keys)
session["keys"] = parsed_keys
session["tried_cache"] = True
if self._required_kids:
cached_kids = set()
for key in parsed_keys:
if isinstance(key, dict) and "kid" in key:
cached_kids.add(key["kid"].replace("-", "").lower())
required_kids = set(self._required_kids)
missing_kids = required_kids - cached_kids
if missing_kids:
session["cached_keys"] = parsed_keys
request_data["get_cached_keys_if_exists"] = False
response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
return b""
else:
return b""
else:
return b""
if message_type == "license-request" or "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
error_msg = f"Unexpected API response format. message_type={message_type}, available_fields={list(data.keys())}"
if data.get("message"):
error_msg = f"API response: {data['message']} - {error_msg}"
if "details" in data:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
if already_tried_cache and data.get("message") == "success":
return b""
raise requests.RequestException(error_msg)
def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None:
"""
Parse license response using Decrypt Labs API with intelligent key combination.
For PlayReady content with partial cached keys, this method intelligently
combines the cached keys with newly obtained license keys, avoiding
duplicates while ensuring all required keys are available.
The key combination process:
1. Extracts keys from the license response
2. If cached keys exist (PlayReady), combines them with license keys
3. Removes duplicate keys by comparing normalized KIDs
4. Updates the session with the complete key set
Args:
session_id: Session identifier
license_message: License response from license server
Raises:
ValueError: If session ID is invalid or no challenge available
requests.RequestException: If API request fails
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
if session["keys"] and not (self.is_playready and "cached_keys" in session):
return
if not session.get("challenge") or not session.get("decrypt_labs_session_id"):
raise ValueError("No challenge available - call get_license_challenge first")
if isinstance(license_message, str):
if self.is_playready and license_message.strip().startswith("<?xml"):
license_message = license_message.encode("utf-8")
else:
try:
license_message = base64.b64decode(license_message)
except Exception:
license_message = license_message.encode("utf-8")
pssh = session["pssh"]
init_data = self._get_init_data_from_pssh(pssh)
license_request_b64 = base64.b64encode(session["challenge"]).decode("utf-8")
license_response_b64 = base64.b64encode(license_message).decode("utf-8")
request_data = {
"scheme": self.device_name,
"session_id": session["decrypt_labs_session_id"],
"init_data": init_data,
"license_request": license_request_b64,
"license_response": license_response_b64,
}
response = self._http_session.post(f"{self.host}/decrypt-response", json=request_data, timeout=30)
if response.status_code != 200:
raise requests.RequestException(f"License decrypt failed: {response.status_code} {response.text}")
data = response.json()
if data.get("message") != "success":
error_msg = data.get("message", "Unknown error")
if "error" in data:
error_msg += f" - Error: {data['error']}"
if "details" in data:
error_msg += f" - Details: {data['details']}"
raise requests.RequestException(f"License decrypt error: {error_msg}")
license_keys = self._parse_keys_response(data)
if self.is_playready and "cached_keys" in session:
"""
Combine cached keys with license keys for PlayReady content.
This ensures we have both the cached keys (obtained earlier) and
any additional keys from the license response, without duplicates.
"""
cached_keys = session.get("cached_keys", [])
all_keys = list(cached_keys)
for license_key in license_keys:
already_exists = False
license_kid = None
if isinstance(license_key, dict) and "kid" in license_key:
license_kid = license_key["kid"].replace("-", "").lower()
elif hasattr(license_key, "kid"):
license_kid = str(license_key.kid).replace("-", "").lower()
elif hasattr(license_key, "key_id"):
license_kid = str(license_key.key_id).replace("-", "").lower()
if license_kid:
for cached_key in cached_keys:
cached_kid = None
if isinstance(cached_key, dict) and "kid" in cached_key:
cached_kid = cached_key["kid"].replace("-", "").lower()
elif hasattr(cached_key, "kid"):
cached_kid = str(cached_key.kid).replace("-", "").lower()
elif hasattr(cached_key, "key_id"):
cached_kid = str(cached_key.key_id).replace("-", "").lower()
if cached_kid == license_kid:
already_exists = True
break
if not already_exists:
all_keys.append(license_key)
session["keys"] = all_keys
else:
session["keys"] = license_keys
if self.vaults and session["keys"]:
key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
self.vaults.add_keys(key_dict)
def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]:
"""
Get keys from the session.
Args:
session_id: Session identifier
type_: Optional key type filter (CONTENT, SIGNING, etc.)
Returns:
List of Key objects
Raises:
InvalidSession: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
key_dicts = self._sessions[session_id]["keys"]
keys = [Key(kid=k["kid"], key=k["key"], type_=k["type"]) for k in key_dicts]
if type_:
keys = [key for key in keys if key.type == type_]
return keys
def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Parse cached keys from API response.
Args:
cached_keys_data: List of cached key objects from API
Returns:
List of key dictionaries
"""
keys = []
try:
if cached_keys_data and isinstance(cached_keys_data, list):
for key_data in cached_keys_data:
if "kid" in key_data and "key" in key_data:
keys.append({"kid": key_data["kid"], "key": key_data["key"], "type": "CONTENT"})
except Exception:
pass
return keys
def _parse_keys_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse keys from decrypt response."""
keys = []
if "keys" in data and isinstance(data["keys"], str):
keys_string = data["keys"]
for line in keys_string.split("\n"):
line = line.strip()
if line.startswith("--key "):
key_part = line[6:]
if ":" in key_part:
kid, key = key_part.split(":", 1)
keys.append({"kid": kid.strip(), "key": key.strip(), "type": "CONTENT"})
elif "keys" in data and isinstance(data["keys"], list):
for key_data in data["keys"]:
keys.append(
{"kid": key_data.get("kid"), "key": key_data.get("key"), "type": key_data.get("type", "CONTENT")}
)
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
return self.keys[session_id]
return keys
def session(self, url, data, retries=3):
res = self.req_session.post(url, json=data).json()
if res.get("message") != "success":
if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""):
if retries > 0:
return self.session(url, data, retries=retries - 1)
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
return res
__all__ = ["DecryptLabsRemoteCDM"]

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import re
import warnings
from pathlib import Path
from typing import Any, Optional
@@ -90,13 +92,116 @@ class Config:
self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or ""
self.update_checks: bool = kwargs.get("update_checks", True)
self.update_check_interval: int = kwargs.get("update_check_interval", 24)
self.scene_naming: bool = kwargs.get("scene_naming", True)
self.series_year: bool = kwargs.get("series_year", True)
# Handle backward compatibility for scene_naming option
self.scene_naming: Optional[bool] = kwargs.get("scene_naming")
self.output_template: dict = kwargs.get("output_template") or {}
# Apply scene_naming compatibility if no output_template is defined
self._apply_scene_naming_compatibility()
# Validate output templates
self._validate_output_templates()
self.title_cache_time: int = kwargs.get("title_cache_time", 1800) # 30 minutes default
self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default
self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True)
def _apply_scene_naming_compatibility(self) -> None:
"""Apply backward compatibility for the old scene_naming option."""
if self.scene_naming is not None:
# Only apply if no output_template is already defined
if not self.output_template.get("movies") and not self.output_template.get("series"):
if self.scene_naming:
# scene_naming: true = scene-style templates
self.output_template.update(
{
"movies": "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"series": "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}",
"songs": "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}",
}
)
else:
# scene_naming: false = Plex-friendly templates
self.output_template.update(
{
"movies": "{title} ({year}) {quality}",
"series": "{title} {season_episode} {episode_name?}",
"songs": "{track_number}. {title}",
}
)
# Warn about deprecated option
warnings.warn(
"The 'scene_naming' option is deprecated. Please use 'output_template' instead. "
"Your current setting has been converted to equivalent templates.",
DeprecationWarning,
stacklevel=2,
)
def _validate_output_templates(self) -> None:
"""Validate output template configurations and warn about potential issues."""
if not self.output_template:
return
# Known template variables for validation
valid_variables = {
# Basic variables
"title",
"year",
"season",
"episode",
"season_episode",
"episode_name",
"quality",
"resolution",
"source",
"tag",
"track_number",
"artist",
"album",
"disc",
# Audio variables
"audio",
"audio_channels",
"audio_full",
"atmos",
"dual",
"multi",
# Video variables
"video",
"hdr",
"hfr",
}
# Filesystem-unsafe characters that could cause issues
unsafe_chars = r'[<>:"/\\|?*]'
for template_type, template_str in self.output_template.items():
if not isinstance(template_str, str):
warnings.warn(f"Template '{template_type}' must be a string, got {type(template_str).__name__}")
continue
# Extract variables from template
variables = re.findall(r"\{([^}]+)\}", template_str)
# Check for unknown variables
for var in variables:
# Remove conditional suffix if present
var_clean = var.rstrip("?")
if var_clean not in valid_variables:
warnings.warn(f"Unknown template variable '{var}' in {template_type} template")
# Check for filesystem-unsafe characters outside of variables
# Replace variables with safe placeholders for testing
test_template = re.sub(r"\{[^}]+\}", "TEST", template_str)
if re.search(unsafe_chars, test_template):
warnings.warn(f"Template '{template_type}' may contain filesystem-unsafe characters")
# Check for empty template
if not template_str.strip():
warnings.warn(f"Template '{template_type}' is empty")
@classmethod
def from_yaml(cls, path: Path) -> Config:
if not path.exists():

View File

@@ -224,41 +224,79 @@ class PlayReady:
def kids(self) -> list[UUID]:
return self._kids
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids:
if kid in self.content_keys:
def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict:
"""Extract keys from CDM session with cross-library compatibility.
Args:
cdm: CDM instance
session_id: Session identifier
Returns:
Dictionary mapping KID UUIDs to hex keys
"""
keys = {}
for key in cdm.get_keys(session_id):
if hasattr(key, "key_id"):
kid = key.key_id
elif hasattr(key, "kid"):
kid = key.kid
else:
continue
session_id = cdm.open()
try:
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
license_res = licence(challenge=challenge)
if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore")
else:
license_str = str(license_res)
if hasattr(key, "key") and hasattr(key.key, "hex"):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, bytes):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, str):
key_hex = key.key
else:
continue
if "<License>" not in license_str:
try:
license_str = base64.b64decode(license_str + "===").decode()
except Exception:
pass
keys[kid] = key_hex
return keys
cdm.parse_license(session_id, license_str)
keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)}
self.content_keys.update(keys)
finally:
cdm.close(session_id)
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
session_id = cdm.open()
try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
if challenge:
try:
license_res = licence(challenge=challenge)
if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore")
else:
license_str = str(license_res)
if "<License>" not in license_str:
try:
license_str = base64.b64decode(license_str + "===").decode()
except Exception:
pass
cdm.parse_license(session_id, license_str)
except Exception:
raise
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
finally:
cdm.close(session_id)
if not self.content_keys:
raise PlayReady.Exceptions.EmptyLicense("No Content Keys were within the License")
def decrypt(self, path: Path, use_mp4decrypt: bool = False) -> None:
def decrypt(self, path: Path) -> None:
"""
Decrypt a Track with PlayReady DRM.
Args:
path: Path to the encrypted file to decrypt
use_mp4decrypt: If True, use mp4decrypt instead of Shaka Packager
Raises:
EnvironmentError if the required decryption executable could not be found.
ValueError if the track has not yet been downloaded.
@@ -270,7 +308,9 @@ class PlayReady:
if not path or not path.exists():
raise ValueError("Tried to decrypt a file that does not exist.")
if use_mp4decrypt:
decrypter = str(getattr(config, "decryption", "")).lower()
if decrypter == "mp4decrypt":
return self._decrypt_with_mp4decrypt(path)
else:
return self._decrypt_with_shaka_packager(path)

View File

@@ -185,7 +185,15 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh)))
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(session_id, licence(challenge=challenge))
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys:
@@ -213,10 +221,18 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)),
)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=challenge),
)
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys:
@@ -227,12 +243,11 @@ class Widevine:
finally:
cdm.close(session_id)
def decrypt(self, path: Path, use_mp4decrypt: bool = False) -> None:
def decrypt(self, path: Path) -> None:
"""
Decrypt a Track with Widevine DRM.
Args:
path: Path to the encrypted file to decrypt
use_mp4decrypt: If True, use mp4decrypt instead of Shaka Packager
Raises:
EnvironmentError if the required decryption executable could not be found.
ValueError if the track has not yet been downloaded.
@@ -244,7 +259,9 @@ class Widevine:
if not path or not path.exists():
raise ValueError("Tried to decrypt a file that does not exist.")
if use_mp4decrypt:
decrypter = str(getattr(config, "decryption", "")).lower()
if decrypter == "mp4decrypt":
return self._decrypt_with_mp4decrypt(path)
else:
return self._decrypt_with_shaka_packager(path)

View File

@@ -12,6 +12,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Episode(Title):
@@ -78,116 +79,154 @@ class Episode(Title):
self.year = year
self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.title.replace("$", "S"),
"year": self.year or "",
"season": f"S{self.season:02}",
"episode": f"E{self.number:02}",
"season_episode": f"S{self.season:02}E{self.number:02}",
"episode_name": self.name or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str:
return "{title}{year} S{season:02}E{number:02} {name}".format(
title=self.title,
year=f" {self.year}" if self.year and config.series_year else "",
year=f" {self.year}" if self.year else "",
season=self.season,
number=self.number,
name=self.name or "",
).strip()
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Title [Year] SXXEXX Name (or Title [Year] SXX if folder)
if folder:
name = f"{self.title}"
if self.year and config.series_year:
name += f" {self.year}"
name += f" S{self.season:02}"
else:
name = "{title}{year} S{season:02}E{number:02} {name}".format(
title=self.title.replace("$", "S"), # e.g., Arli$$
year=f" {self.year}" if self.year and config.series_year else "",
season=self.season,
number=self.number,
name=self.name or "",
).strip()
# For folders, use the series template but exclude episode-specific variables
series_template = config.output_template.get("series")
if series_template:
# Create a folder-friendly version by removing episode-specific variables
folder_template = series_template
# Remove episode number and episode name from template for folders
folder_template = re.sub(r'\{episode\}', '', folder_template)
folder_template = re.sub(r'\{episode_name\?\}', '', folder_template)
folder_template = re.sub(r'\{episode_name\}', '', folder_template)
folder_template = re.sub(r'\{season_episode\}', '{season}', folder_template)
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Clean up any double separators that might result
folder_template = re.sub(r'\.{2,}', '.', folder_template)
folder_template = re.sub(r'\s{2,}', ' ', folder_template)
folder_template = re.sub(r'^[\.\s]+|[\.\s]+$', '', folder_template)
# Service
if show_service:
name += f" {self.service.__name__}"
formatter = TemplateFormatter(folder_template)
context = self._build_template_context(media_info, show_service)
# Override season_episode with just season for folders
context['season'] = f"S{self.season:02}"
# 'WEB-DL'
name += " WEB-DL"
folder_name = formatter.format(context)
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
# Keep the same separator style as the series template
if '.' in series_template and ' ' not in series_template:
# Dot-based template - use dot separator for folders too
return sanitize_filename(folder_name, ".")
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
# Space-based template - use space separator
return sanitize_filename(folder_name, " ")
else:
# Fallback to simple naming if no template defined
name = f"{self.title}"
if self.year:
name += f" {self.year}"
name += f" S{self.season:02}"
return sanitize_filename(name, " ")
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("series")
or "{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hfr?}.{video}-{tag}"
)
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Series(SortedKeyList, ABC):
@@ -197,7 +236,7 @@ class Series(SortedKeyList, ABC):
def __str__(self) -> str:
if not self:
return super().__str__()
return self[0].title + (f" ({self[0].year})" if self[0].year and config.series_year else "")
return self[0].title + (f" ({self[0].year})" if self[0].year else "")
def tree(self, verbose: bool = False) -> Tree:
seasons = Counter(x.season for x in self)

View File

@@ -9,7 +9,7 @@ from sortedcontainers import SortedKeyList
from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Movie(Title):
@@ -45,100 +45,107 @@ class Movie(Title):
self.year = year
self.description = description
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
context = {
"title": self.name.replace("$", "S"),
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Video information
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
resolution = int(primary_video_track.width * (9 / 16))
context.update(
{
"quality": f"{resolution}p",
"resolution": str(resolution),
"video": VIDEO_CODEC_MAP.get(primary_video_track.format, primary_video_track.format),
}
)
# HDR information
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
context["hdr"] = "DV"
base_layer = DYNAMIC_RANGE_MAP.get(hdr_format)
if base_layer and base_layer != "DV":
context["hdr"] += f".{base_layer}"
else:
context["hdr"] = DYNAMIC_RANGE_MAP.get(hdr_format, "")
elif trc and "HLG" in trc:
context["hdr"] = "HLG"
else:
context["hdr"] = ""
# High frame rate
frame_rate = float(primary_video_track.frame_rate)
context["hfr"] = "HFR" if frame_rate > 30 else ""
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
# Multi-language audio
if unique_audio_languages == 2:
context["dual"] = "DUAL"
context["multi"] = ""
elif unique_audio_languages > 2:
context["dual"] = ""
context["multi"] = "MULTi"
else:
context["dual"] = ""
context["multi"] = ""
return context
def __str__(self) -> str:
if self.year:
return f"{self.name} ({self.year})"
return self.name
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("movies")
or "{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}"
)
# Name (Year)
name = str(self).replace("$", "S") # e.g., Arli$$
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
name += " DV"
if DYNAMIC_RANGE_MAP.get(hdr_format) and DYNAMIC_RANGE_MAP.get(hdr_format) != "DV":
name += " HDR"
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Movies(SortedKeyList, ABC):

View File

@@ -10,6 +10,7 @@ from unshackle.core.config import config
from unshackle.core.constants import AUDIO_CODEC_MAP
from unshackle.core.titles.title import Title
from unshackle.core.utilities import sanitize_filename
from unshackle.core.utils.template_formatter import TemplateFormatter
class Song(Title):
@@ -81,46 +82,63 @@ class Song(Title):
artist=self.artist, album=self.album, year=self.year, track=self.track, name=self.name
).strip()
def _build_template_context(self, media_info: MediaInfo, show_service: bool = True) -> dict:
"""Build template context dictionary from MediaInfo."""
primary_audio_track = next(iter(media_info.audio_tracks), None)
context = {
"artist": self.artist.replace("$", "S"),
"album": self.album.replace("$", "S"),
"title": self.name.replace("$", "S"),
"track_number": f"{self.track:02}",
"disc": f"{self.disc:02}" if self.disc > 1 else "",
"year": self.year or "",
"tag": config.tag or "",
"source": self.service.__name__ if show_service else "",
}
# Audio information
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
context.update(
{
"audio": AUDIO_CODEC_MAP.get(codec, codec),
"audio_channels": f"{channels:.1f}",
"audio_full": f"{AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}",
"atmos": "Atmos" if ("JOC" in features or primary_audio_track.joc) else "",
}
)
return context
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
audio_track = next(iter(media_info.audio_tracks), None)
codec = audio_track.format
channel_layout = audio_track.channel_layout or audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = audio_track.channel_s or audio_track.channels or 0
channels = float(channel_count)
features = audio_track.format_additionalfeatures or ""
if folder:
# Artist - Album (Year)
name = str(self).split(" / ")[0]
else:
# NN. Song Name
name = str(self).split(" / ")[1]
if config.scene_naming:
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# Audio Codec + Channels (+ feature)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or audio_track.joc:
name += " Atmos"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name, " ")
else:
# Simple naming style without technical details
# For folders, use simple naming: "Artist - Album (Year)"
name = f"{self.artist} - {self.album}"
if self.year:
name += f" ({self.year})"
return sanitize_filename(name, " ")
# Use template from output_template (which includes scene_naming compatibility)
# or fallback to default scene-style template
template = (
config.output_template.get("songs") or "{track_number}.{title}.{source?}.WEB-DL.{audio_full}.{atmos?}-{tag}"
)
formatter = TemplateFormatter(template)
context = self._build_template_context(media_info, show_service)
return formatter.format(context)
class Album(SortedKeyList, ABC):
def __init__(self, iterable: Optional[Iterable] = None):

View File

@@ -420,6 +420,15 @@ class Track:
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
elif hasattr(cdm, 'is_playready'):
if cdm.is_playready:
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
else:
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
return self.drm[0]

View File

@@ -8,6 +8,7 @@ import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from typing import Optional, Tuple
from xml.sax.saxutils import escape
import requests
from requests.adapters import HTTPAdapter, Retry
@@ -289,9 +290,9 @@ def _apply_tags(path: Path, tags: dict[str, str]) -> None:
log.debug("mkvpropedit not found on PATH; skipping tags")
return
log.debug("Applying tags to %s: %s", path, tags)
xml_lines = ["<?xml version='1.0' encoding='UTF-8'?>", "<Tags>", " <Tag>", " <Targets/>"]
xml_lines = ['<?xml version="1.0" encoding="UTF-8"?>', "<Tags>", " <Tag>", " <Targets/>"]
for name, value in tags.items():
xml_lines.append(f" <Simple><Name>{name}</Name><String>{value}</String></Simple>")
xml_lines.append(f" <Simple><Name>{escape(name)}</Name><String>{escape(value)}</String></Simple>")
xml_lines.extend([" </Tag>", "</Tags>"])
with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as f:
f.write("\n".join(xml_lines))
@@ -349,13 +350,25 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
if simkl_tmdb_id:
tmdb_id = simkl_tmdb_id
show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"):
standard_tags["IMDB"] = f"https://www.imdb.com/title/{show_ids['imdb']}"
if show_ids.get("tvdb"):
standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/series/{show_ids['tvdb']}"
if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"https://www.themoviedb.org/tv/{show_ids['tmdbtv']}"
# Handle TV show data from Simkl
if simkl_data.get("type") == "episode" and "show" in simkl_data:
show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"):
standard_tags["IMDB"] = show_ids["imdb"]
if show_ids.get("tvdb"):
standard_tags["TVDB2"] = f"series/{show_ids['tvdb']}"
if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}"
# Handle movie data from Simkl
elif simkl_data.get("type") == "movie" and "movie" in simkl_data:
movie_ids = simkl_data.get("movie", {}).get("ids", {})
if movie_ids.get("imdb"):
standard_tags["IMDB"] = movie_ids["imdb"]
if movie_ids.get("tvdb"):
standard_tags["TVDB2"] = f"movies/{movie_ids['tvdb']}"
if movie_ids.get("tmdb"):
standard_tags["TMDB"] = f"movie/{movie_ids['tmdb']}"
# Use TMDB API for additional metadata (either from provided ID or Simkl lookup)
api_key = _api_key()
@@ -373,8 +386,8 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
_apply_tags(path, custom_tags)
return
tmdb_url = f"https://www.themoviedb.org/{'movie' if kind == 'movie' else 'tv'}/{tmdb_id}"
standard_tags["TMDB"] = tmdb_url
prefix = "movie" if kind == "movie" else "tv"
standard_tags["TMDB"] = f"{prefix}/{tmdb_id}"
try:
ids = external_ids(tmdb_id, kind)
except requests.RequestException as exc:
@@ -385,11 +398,13 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
imdb_id = ids.get("imdb_id")
if imdb_id:
standard_tags["IMDB"] = f"https://www.imdb.com/title/{imdb_id}"
standard_tags["IMDB"] = imdb_id
tvdb_id = ids.get("tvdb_id")
if tvdb_id:
tvdb_prefix = "movies" if kind == "movie" else "series"
standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/{tvdb_prefix}/{tvdb_id}"
if kind == "movie":
standard_tags["TVDB2"] = f"movies/{tvdb_id}"
else:
standard_tags["TVDB2"] = f"series/{tvdb_id}"
merged_tags = {
**custom_tags,

View File

@@ -0,0 +1,147 @@
import logging
import re
from typing import Any, Dict, List
from unshackle.core.utilities import sanitize_filename
class TemplateFormatter:
"""
Template formatter for custom filename patterns.
Supports variable substitution and conditional variables.
Example: '{title}.{year}.{quality?}.{source}-{tag}'
"""
def __init__(self, template: str):
"""Initialize the template formatter.
Args:
template: Template string with variables in {variable} format
"""
self.template = template
self.variables = self._extract_variables()
def _extract_variables(self) -> List[str]:
"""Extract all variables from the template."""
pattern = r"\{([^}]+)\}"
matches = re.findall(pattern, self.template)
return [match.strip() for match in matches]
def format(self, context: Dict[str, Any]) -> str:
"""Format the template with the provided context.
Args:
context: Dictionary containing variable values
Returns:
Formatted filename string
Raises:
ValueError: If required template variables are missing from context
"""
logger = logging.getLogger(__name__)
# Validate that all required variables are present
is_valid, missing_vars = self.validate(context)
if not is_valid:
error_msg = f"Missing required template variables: {', '.join(missing_vars)}"
logger.error(error_msg)
raise ValueError(error_msg)
try:
result = self.template
for variable in self.variables:
placeholder = "{" + variable + "}"
is_conditional = variable.endswith("?")
if is_conditional:
# Remove the ? for conditional variables
var_name = variable[:-1]
value = context.get(var_name, "")
if value:
# Replace with actual value, ensuring it's string and safe
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
else:
# Remove the placeholder entirely for empty conditional variables
result = result.replace(placeholder, "")
else:
# Regular variable
value = context.get(variable, "")
if value is None:
logger.warning(f"Template variable '{variable}' is None, using empty string")
value = ""
safe_value = str(value).strip()
result = result.replace(placeholder, safe_value)
# Clean up multiple consecutive dots/separators and other artifacts
result = re.sub(r"\.{2,}", ".", result) # Multiple dots -> single dot
result = re.sub(r"\s{2,}", " ", result) # Multiple spaces -> single space
result = re.sub(r"^[\.\s]+|[\.\s]+$", "", result) # Remove leading/trailing dots and spaces
result = re.sub(r"\.-", "-", result) # Remove dots before dashes (for dot-based templates)
result = re.sub(r"[\.\s]+\)", ")", result) # Remove dots/spaces before closing parentheses
# Determine the appropriate separator based on template style
# If the template contains spaces (like Plex-friendly), preserve them
if " " in self.template and "." not in self.template:
# Space-based template (Plex-friendly) - use space separator
result = sanitize_filename(result, spacer=" ")
else:
# Dot-based template (scene-style) - use dot separator
result = sanitize_filename(result, spacer=".")
# Final validation - ensure we have a non-empty result
if not result or result.isspace():
logger.warning("Template formatting resulted in empty filename, using fallback")
return "untitled"
logger.debug(f"Template formatted successfully: '{self.template}' -> '{result}'")
return result
except Exception as e:
logger.error(f"Error formatting template '{self.template}': {e}")
# Return a safe fallback filename
fallback = f"error_formatting_{hash(self.template) % 10000}"
logger.warning(f"Using fallback filename: {fallback}")
return fallback
def validate(self, context: Dict[str, Any]) -> tuple[bool, List[str]]:
"""Validate that all required variables are present in context.
Args:
context: Dictionary containing variable values
Returns:
Tuple of (is_valid, missing_variables)
"""
missing = []
for variable in self.variables:
is_conditional = variable.endswith("?")
var_name = variable[:-1] if is_conditional else variable
# Only check non-conditional variables
if not is_conditional and var_name not in context:
missing.append(var_name)
return len(missing) == 0, missing
def get_required_variables(self) -> List[str]:
"""Get list of required (non-conditional) variables."""
required = []
for variable in self.variables:
if not variable.endswith("?"):
required.append(variable)
return required
def get_optional_variables(self) -> List[str]:
"""Get list of optional (conditional) variables."""
optional = []
for variable in self.variables:
if variable.endswith("?"):
optional.append(variable[:-1]) # Remove the ?
return optional

View File

@@ -10,15 +10,45 @@ tag_imdb_tmdb: true
# Set terminal background color (custom option not in CONFIG.md)
set_terminal_bg: false
# Set file naming convention
# true for style - Prime.Suspect.S07E01.The.Final.Act.Part.One.1080p.ITV.WEB-DL.AAC2.0.H.264
# false for style - Prime Suspect S07E01 The Final Act - Part One
scene_naming: true
# File naming is now controlled via output_template (see below)
# Default behavior provides scene-style naming similar to the old scene_naming: true
#
# BACKWARD COMPATIBILITY: The old scene_naming option is still supported:
# scene_naming: true -> Equivalent to scene-style templates (dot-separated)
# scene_naming: false -> Equivalent to Plex-friendly templates (space-separated)
# Note: output_template takes precedence over scene_naming if both are defined
# Whether to include the year in series names for episodes and folders (default: true)
# true for style - Show Name (2023) S01E01 Episode Name
# false for style - Show Name S01E01 Episode Name
series_year: true
# Custom output templates for filenames
# When not defined, defaults to scene-style naming equivalent to the old scene_naming: true
# Available variables: {title}, {year}, {season}, {episode}, {season_episode}, {episode_name},
# {quality}, {resolution}, {source}, {audio}, {audio_channels}, {audio_full},
# {video}, {hdr}, {hfr}, {atmos}, {dual}, {multi}, {tag}
# Conditional variables (included only if present): Add ? suffix like {year?}, {episode_name?}, {hdr?}
# Uncomment and customize the templates below:
#
# output_template:
# # Scene-style naming (dot-separated) - Default behavior when no template is defined
# movies: '{title}.{year}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
# series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{source}.WEB-DL.{dual?}.{multi?}.{audio_full}.{atmos?}.{hdr?}.{hfr?}.{video}-{tag}'
#
# # Plex-friendly naming (space-separated, clean format)
# # movies: '{title} ({year}) {quality}'
# # series: '{title} {season_episode} {episode_name?}'
#
# # Minimal naming (basic info only)
# # movies: '{title}.{year}.{quality}'
# # series: '{title}.{season_episode}.{episode_name?}'
#
# # Custom scene-style with specific elements
# # movies: '{title}.{year}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{video}-{tag}'
# # series: '{title}.{year?}.{season_episode}.{episode_name?}.{quality}.{hdr?}.{source}.WEB-DL.{audio_full}.{atmos?}.{video}-{tag}'
#
# Example outputs:
# Scene movies: 'The.Matrix.1999.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Scene movies (HDR): 'Dune.2021.2160p.HBO.WEB-DL.DDP5.1.HDR10.H.265-EXAMPLE'
# Scene series: 'Breaking.Bad.2008.S01E01.Pilot.1080p.NF.WEB-DL.DDP5.1.H.264-EXAMPLE'
# Plex movies: 'The Matrix (1999) 1080p'
# Plex series: 'Breaking Bad S01E01 Pilot'
# Check for updates from GitHub repository on startup (default: true)
update_checks: true
@@ -28,9 +58,9 @@ update_check_interval: 24
# Title caching configuration
# Cache title metadata to reduce redundant API calls
title_cache_enabled: true # Enable/disable title caching globally (default: true)
title_cache_time: 1800 # Cache duration in seconds (default: 1800 = 30 minutes)
title_cache_max_retention: 86400 # Maximum cache retention for fallback when API fails (default: 86400 = 24 hours)
title_cache_enabled: true # Enable/disable title caching globally (default: true)
title_cache_time: 1800 # Cache duration in seconds (default: 1800 = 30 minutes)
title_cache_max_retention: 86400 # Maximum cache retention for fallback when API fails (default: 86400 = 24 hours)
# Muxing configuration
muxing:
@@ -105,6 +135,50 @@ remote_cdm:
host: https://domain-2.com/api
secret: secret_key
- name: "decrypt_labs_chrome"
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
device_type: CHROME
system_id: 4464 # Doesn't matter
security_level: 3
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
- name: "decrypt_labs_l1"
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 1
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_l2"
type: "decrypt_labs"
device_name: "L2" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 2
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl2"
type: "decrypt_labs"
device_name: "SL2" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 2000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl3"
type: "decrypt_labs"
device_name: "SL3" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 3000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
# Key Vaults store your obtained Content Encryption Keys (CEKs)
# Use 'no_push: true' to prevent a vault from receiving pushed keys
# while still allowing it to provide keys when requested
@@ -156,7 +230,6 @@ curl_impersonate:
# Pre-define default options and switches of the dl command
dl:
best: true
sub_format: srt
downloads: 4
workers: 16
@@ -172,7 +245,7 @@ chapter_fallback_name: "Chapter {j:02}"
# Case-Insensitive dictionary of headers for all Services
headers:
Accept-Language: "en-US,en;q=0.8"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
# Override default filenames used across unshackle
filenames:
@@ -214,6 +287,13 @@ services:
# Global service config
api_key: "service_api_key"
# Service certificate for Widevine L1/L2 (base64 encoded)
# This certificate is automatically used when L1/L2 schemes are selected
# Services obtain this from their DRM provider or license server
certificate: |
CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo
# ... (full base64 certificate here)
# Profile-specific device configurations
profiles:
john_sd:
@@ -241,14 +321,14 @@ proxy_providers:
username: username_from_service_credentials
password: password_from_service_credentials
server_map:
- us: 12 # force US server #12 for US proxies
us: 12 # force US server #12 for US proxies
surfsharkvpn:
username: your_surfshark_service_username # Service credentials from https://my.surfshark.com/vpn/manual-setup/main/openvpn
password: your_surfshark_service_password # Service credentials (not your login password)
server_map:
- us: 3844 # force US server #3844 for US proxies
- gb: 2697 # force GB server #2697 for GB proxies
- au: 4621 # force AU server #4621 for AU proxies
us: 3844 # force US server #3844 for US proxies
gb: 2697 # force GB server #2697 for GB proxies
au: 4621 # force AU server #4621 for AU proxies
basic:
GB:
- "socks5://username:password@bhx.socks.ipvanish.com:1080" # 1 (Birmingham)

2
uv.lock generated
View File

@@ -1499,7 +1499,7 @@ wheels = [
[[package]]
name = "unshackle"
version = "1.4.2"
version = "1.4.4"
source = { editable = "." }
dependencies = [
{ name = "appdirs" },