18 Commits
1.4.4 ... 1.4.5

Author SHA1 Message Date
Andy
6137146705 chore: bump version to 1.4.5 and update changelog
- Update version from 1.4.4 to 1.4.5 in core/__init__.py
- Add comprehensive changelog entry for v1.4.5 with all changes since 1.4.4
- Include enhanced CDM support, caching improvements, and bug fixes
2025-09-09 03:53:42 +00:00
Andy
859d09693c feat(cdm): Update User-Agent to use dynamic version
- Replace hardcoded version "1.0" with dynamic version import in DecryptLabsRemoteCDM User-Agent header.
2025-09-09 03:49:01 +00:00
Andy
5f022635cb feat(cdm): Optimize get_cached_keys_if_exists for L1/L2 devices
- Always send get_cached_keys_if_exists=True for L1/L2 devices to leverage
- the API's automatic caching optimization. This reduces unnecessary license
- requests by prioritizing cached keys for these security levels.
2025-09-06 22:10:35 +00:00
Andy
ad66502c0c feat(cdm): Add fallback to Widevine common cert for L1 devices
- Use default Widevine common privacy certificate when no service certificate is provided for L1 devices
- Add get_widevine_service_certificate method to EXAMPLE service for config-based certificates
- Improve certificate handling with more descriptive return messages
2025-09-06 20:30:11 +00:00
Andy
e462f07b7a Merge branch 'main' of https://github.com/unshackle-dl/unshackle 2025-09-06 19:39:39 +00:00
Andy
83b600e999 fix(cdm): Clean up session data when retrieving cached keys
Remove decrypt_labs_session_id and challenge from session when cached keys exist but there are missing kids, ensuring clean state for subsequent requests.
2025-09-06 19:38:54 +00:00
Andy
ea8a7b00c9 fix(cdm): Clean up session data when retrieving cached keys
Remove decrypt_labs_session_id and challenge from session when cached keys exist but there are missing kids, ensuring clean state for subsequent requests.
2025-09-06 18:52:20 +00:00
Andy
16ee4175a4 feat(dl): Truncate PSSH string for display in non-debug mode
* Added `_truncate_pssh_for_display` method to limit the width of PSSH strings shown in the console.
* Ensures better readability of DRM information by truncating long strings.
2025-09-05 02:15:10 +00:00
Andy
f722ec69b6 fix(tags): 🐛 Fix formatting issues 2025-09-03 14:51:22 +00:00
Andy
2330297ea4 feat(kv): Enhance vault loading and key copying logic
* Implemented `_load_vaults` function to load and validate vaults by name.
* Improved `_copy_service_data` to handle key copying with better logging and error handling.
* Updated `copy` command to utilize the new vault loading function and streamline the process.
* Enhanced key insertion logic in MySQL and SQLite vaults to avoid inserting existing keys.
2025-09-03 14:50:51 +00:00
Andy
86bb162868 feat(tags): Enhance tag handling for TV shows and movies from Simkl data
Fixes #15
2025-09-02 22:01:44 +00:00
Andy
501cfd68e8 fix(cdm): Add error message for missing service certificate in CDM session 2025-09-02 19:16:34 +00:00
Andy
76fb2eea95 feat: implement intelligent caching system for CDM license requests 2025-09-02 18:48:34 +00:00
Andy
ea5ec40bcd Merge branch 'main' of https://github.com/unshackle-dl/unshackle 2025-09-02 17:34:12 +00:00
Andy
329850b043 feat(cdm): Enhance key retrieval logic and improve cached keys handling 2025-09-02 17:33:31 +00:00
Andy
73595f3b50 feat(cdm): Enhance key retrieval logic and improve cached keys handling 2025-09-02 17:23:02 +00:00
Andy
1e82283133 fix(tags): Fix import order. 2025-09-02 04:13:43 +00:00
Andy
ab13dde9d2 feat(changelog): Update changelog for version 1.4.4 with enhanced CDM support, configuration options, and various improvements 2025-09-02 04:10:28 +00:00
12 changed files with 482 additions and 229 deletions

View File

@@ -5,6 +5,74 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.5] - 2025-09-09
### Added
- **Enhanced CDM Key Caching**: Improved key caching and session management for L1/L2 devices
- Optimized `get_cached_keys_if_exists` functionality for better performance with L1/L2 devices
- Enhanced cached key retrieval logic with improved session handling
- **Widevine Common Certificate Fallback**: Added fallback to Widevine common certificate for L1 devices
- Improved compatibility for L1 devices when service certificates are unavailable
- **Enhanced Vault Loading**: Improved vault loading and key copying logic
- Better error handling and key management in vault operations
- **PSSH Display Optimization**: Truncated PSSH string display in non-debug mode for cleaner output
- **CDM Error Messaging**: Added error messages for missing service certificates in CDM sessions
### Changed
- **Dynamic Version Headers**: Updated User-Agent headers to use dynamic version strings
- DecryptLabsRemoteCDM now uses dynamic version import instead of hardcoded version
- **Intelligent CDM Caching**: Implemented intelligent caching system for CDM license requests
- Enhanced caching logic reduces redundant license requests and improves performance
- **Enhanced Tag Handling**: Improved tag handling for TV shows and movies from Simkl data
- Better metadata processing and formatting for improved media tagging
### Fixed
- **CDM Session Management**: Clean up session data when retrieving cached keys
- Remove decrypt_labs_session_id and challenge from session when cached keys exist but there are missing kids
- Ensures clean state for subsequent requests and prevents session conflicts
- **Tag Formatting**: Fixed formatting issues in tag processing
- **Import Order**: Fixed import order issues in tags module
## [1.4.4] - 2025-09-02
### Added
- **Enhanced DecryptLabs CDM Support**: Comprehensive remote CDM functionality
- Full support for Widevine, PlayReady, and ChromeCDM through DecryptLabsRemoteCDM
- Enhanced session management and caching support for remote WV/PR operations
- Support for cached keys and improved license handling
- New CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate
- **Advanced Configuration Options**: New device and language preferences
- Added configuration options for device certificate status list
- Enhanced language preference settings
### Changed
- **DRM Decryption Enhancements**: Streamlined decryption process
- Simplified decrypt method by removing unused parameter and streamlined logic
- Improved DecryptLabs CDM configurations with better device support
### Fixed
- **Matroska Tag Compliance**: Enhanced media container compatibility
- Fixed Matroska tag compliance with official specification
- **Application Branding**: Cleaned up version display
- Removed old devine version reference from banner to avoid developer confusion
- Updated branding while maintaining original GNU license compliance
- **IP Information Handling**: Improved geolocation services
- Enhanced get_ip_info functionality with better failover handling
- Added support for 429 error handling and multiple API provider fallback
- Implemented cached IP info retrieval with fallback tester to avoid rate limiting
- **Dependencies**: Streamlined package requirements
- Removed unnecessary data extra requirement from langcodes
### Removed
- Deprecated version references in application banner for clarity
## [1.4.3] - 2025-08-20
### Added

View File

@@ -66,6 +66,18 @@ from unshackle.core.vaults import Vaults
class dl:
@staticmethod
def _truncate_pssh_for_display(pssh_string: str, drm_type: str) -> str:
"""Truncate PSSH string for display when not in debug mode."""
if logging.root.level == logging.DEBUG or not pssh_string:
return pssh_string
max_width = console.width - len(drm_type) - 12
if len(pssh_string) <= max_width:
return pssh_string
return pssh_string[: max_width - 3] + "..."
@click.command(
short_help="Download, Decrypt, and Mux tracks for titles from a Service.",
cls=Services,
@@ -1228,7 +1240,8 @@ class dl:
if isinstance(drm, Widevine):
with self.DRM_TABLE_LOCK:
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({drm.pssh.dumps()})", "text"), overflow="fold"))
pssh_display = self._truncate_pssh_for_display(drm.pssh.dumps(), "Widevine")
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({pssh_display})", "text"), overflow="fold"))
pre_existing_tree = next(
(x for x in table.columns[0].cells if isinstance(x, Tree) and x.label == cek_tree.label), None
)
@@ -1320,10 +1333,11 @@ class dl:
elif isinstance(drm, PlayReady):
with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh_b64 or "", "PlayReady")
cek_tree = Tree(
Text.assemble(
("PlayReady", "cyan"),
(f"({drm.pssh_b64 or ''})", "text"),
(f"({pssh_display})", "text"),
overflow="fold",
)
)

View File

@@ -12,84 +12,113 @@ from unshackle.core.vault import Vault
from unshackle.core.vaults import Vaults
def _load_vaults(vault_names: list[str]) -> Vaults:
"""Load and validate vaults by name."""
vaults = Vaults()
for vault_name in vault_names:
vault_config = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault_config:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault_config["type"]
vault_args = vault_config.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
return vaults
def _process_service_keys(from_vault: Vault, service: str, log: logging.Logger) -> dict[str, str]:
"""Get and validate keys from a vault for a specific service."""
content_keys = list(from_vault.get_keys(service))
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Skipping NULL key: {kid}:{key}")
return {kid: key for kid, key in content_keys if kid not in bad_keys}
def _copy_service_data(to_vault: Vault, from_vault: Vault, service: str, log: logging.Logger) -> int:
"""Copy data for a single service between vaults."""
content_keys = _process_service_keys(from_vault, service, log)
total_count = len(content_keys)
if total_count == 0:
log.info(f"{service}: No keys found in {from_vault}")
return 0
try:
added = to_vault.add_keys(service, content_keys)
except PermissionError:
log.warning(f"{service}: No permission to create table in {to_vault}, skipped")
return 0
existed = total_count - added
if added > 0 and existed > 0:
log.info(f"{service}: {added} added, {existed} skipped ({total_count} total)")
elif added > 0:
log.info(f"{service}: {added} added ({total_count} total)")
else:
log.info(f"{service}: {existed} skipped (all existed)")
return added
@click.group(short_help="Manage and configure Key Vaults.", context_settings=context_settings)
def kv() -> None:
"""Manage and configure Key Vaults."""
@kv.command()
@click.argument("to_vault", type=str)
@click.argument("from_vaults", nargs=-1, type=click.UNPROCESSED)
@click.argument("to_vault_name", type=str)
@click.argument("from_vault_names", nargs=-1, type=click.UNPROCESSED)
@click.option("-s", "--service", type=str, default=None, help="Only copy data to and from a specific service.")
def copy(to_vault: str, from_vaults: list[str], service: Optional[str] = None) -> None:
def copy(to_vault_name: str, from_vault_names: list[str], service: Optional[str] = None) -> None:
"""
Copy data from multiple Key Vaults into a single Key Vault.
Rows with matching KIDs are skipped unless there's no KEY set.
Existing data is not deleted or altered.
The `to_vault` argument is the key vault you wish to copy data to.
The `to_vault_name` argument is the key vault you wish to copy data to.
It should be the name of a Key Vault defined in the config.
The `from_vaults` argument is the key vault(s) you wish to take
The `from_vault_names` argument is the key vault(s) you wish to take
data from. You may supply multiple key vaults.
"""
if not from_vaults:
if not from_vault_names:
raise click.ClickException("No Vaults were specified to copy data from.")
log = logging.getLogger("kv")
vaults = Vaults()
for vault_name in [to_vault] + list(from_vaults):
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
all_vault_names = [to_vault_name] + list(from_vault_names)
vaults = _load_vaults(all_vault_names)
to_vault: Vault = vaults.vaults[0]
from_vaults: list[Vault] = vaults.vaults[1:]
to_vault = vaults.vaults[0]
from_vaults = vaults.vaults[1:]
vault_names = ", ".join([v.name for v in from_vaults])
log.info(f"Copying data from {vault_names}{to_vault.name}")
log.info(f"Copying data from {', '.join([x.name for x in from_vaults])}, into {to_vault.name}")
if service:
service = Services.get_tag(service)
log.info(f"Only copying data for service {service}")
log.info(f"Filtering by service: {service}")
total_added = 0
for from_vault in from_vaults:
if service:
services = [service]
else:
services = from_vault.get_services()
for service_ in services:
log.info(f"Getting data from {from_vault} for {service_}")
content_keys = list(from_vault.get_keys(service_)) # important as it's a generator we iterate twice
bad_keys = {kid: key for kid, key in content_keys if not key or key.count("0") == len(key)}
for kid, key in bad_keys.items():
log.warning(f"Cannot add a NULL Content Key to a Vault, skipping: {kid}:{key}")
content_keys = {kid: key for kid, key in content_keys if kid not in bad_keys}
total_count = len(content_keys)
log.info(f"Adding {total_count} Content Keys to {to_vault} for {service_}")
try:
added = to_vault.add_keys(service_, content_keys)
except PermissionError:
log.warning(f" - No permission to create table ({service_}) in {to_vault}, skipping...")
continue
services_to_copy = [service] if service else from_vault.get_services()
for service_tag in services_to_copy:
added = _copy_service_data(to_vault, from_vault, service_tag, log)
total_added += added
existed = total_count - added
log.info(f"{to_vault} ({service_}): {added} newly added, {existed} already existed (skipped)")
log.info(f"{to_vault}: {total_added} total newly added")
if total_added > 0:
log.info(f"Successfully added {total_added} new keys to {to_vault}")
else:
log.info("Copy completed - no new keys to add")
@kv.command()
@@ -106,9 +135,9 @@ def sync(ctx: click.Context, vaults: list[str], service: Optional[str] = None) -
if not len(vaults) > 1:
raise click.ClickException("You must provide more than one Vault to sync.")
ctx.invoke(copy, to_vault=vaults[0], from_vaults=vaults[1:], service=service)
ctx.invoke(copy, to_vault_name=vaults[0], from_vault_names=vaults[1:], service=service)
for i in range(1, len(vaults)):
ctx.invoke(copy, to_vault=vaults[i], from_vaults=[vaults[i - 1]], service=service)
ctx.invoke(copy, to_vault_name=vaults[i], from_vault_names=[vaults[i - 1]], service=service)
@kv.command()
@@ -135,15 +164,7 @@ def add(file: Path, service: str, vaults: list[str]) -> None:
log = logging.getLogger("kv")
service = Services.get_tag(service)
vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
vaults_ = _load_vaults(list(vaults))
data = file.read_text(encoding="utf8")
kid_keys: dict[str, str] = {}
@@ -173,15 +194,7 @@ def prepare(vaults: list[str]) -> None:
"""Create Service Tables on Vaults if not yet created."""
log = logging.getLogger("kv")
vaults_ = Vaults()
for vault_name in vaults:
vault = next((x for x in config.key_vaults if x["name"] == vault_name), None)
if not vault:
raise click.ClickException(f"Vault ({vault_name}) is not defined in the config.")
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults_.load(vault_type, **vault_args)
vaults_ = _load_vaults(vaults)
for vault in vaults_:
if hasattr(vault, "has_table") and hasattr(vault, "create_table"):

View File

@@ -1 +1 @@
__version__ = "1.4.4"
__version__ = "1.4.5"

View File

@@ -6,10 +6,12 @@ from typing import Any, Dict, List, Optional, Union
from uuid import UUID
import requests
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.device import DeviceTypes
from requests import Session
from unshackle.core.vaults import Vaults
from unshackle.core import __version__
class MockCertificateChain:
@@ -70,10 +72,26 @@ class DecryptLabsRemoteCDMExceptions:
class DecryptLabsRemoteCDM:
"""
Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface.
Decrypt Labs Remote CDM implementation with intelligent caching system.
This class provides a drop-in replacement for pywidevine's local CDM using
Decrypt Labs' KeyXtractor API service.
Decrypt Labs' KeyXtractor API service, enhanced with smart caching logic
that minimizes unnecessary license requests.
Key Features:
- Compatible with both Widevine and PlayReady DRM schemes
- Intelligent caching that compares required vs. available keys
- Optimized caching for L1/L2 devices (leverages API auto-optimization)
- Automatic key combination for mixed cache/license scenarios
- Seamless fallback to license requests when keys are missing
Intelligent Caching System:
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
2. get_license_challenge() first checks for cached keys
3. For L1/L2 devices, always attempts cached keys first (API optimized)
4. If cached keys satisfy requirements, returns empty challenge (no license needed)
5. If keys are missing, makes targeted license request for remaining keys
6. parse_license() combines cached and license keys intelligently
"""
service_certificate_challenge = b"\x08\x04"
@@ -110,6 +128,7 @@ class DecryptLabsRemoteCDM:
self.device_name = device_name
self.service_name = service_name or ""
self.vaults = vaults
self.uch = self.host != "https://keyxtractor.decryptlabs.com"
self._device_type_str = device_type
if device_type:
@@ -126,12 +145,13 @@ class DecryptLabsRemoteCDM:
self._sessions: Dict[bytes, Dict[str, Any]] = {}
self._pssh_b64 = None
self._required_kids: Optional[List[str]] = None
self._http_session = Session()
self._http_session.headers.update(
{
"decrypt-labs-api-key": self.secret,
"Content-Type": "application/json",
"User-Agent": "unshackle-decrypt-labs-cdm/1.0",
"User-Agent": f"unshackle-decrypt-labs-cdm/{__version__}",
}
)
@@ -159,6 +179,29 @@ class DecryptLabsRemoteCDM:
"""Store base64-encoded PSSH data for PlayReady compatibility."""
self._pssh_b64 = pssh_b64
def set_required_kids(self, kids: List[Union[str, UUID]]) -> None:
"""
Set the required Key IDs for intelligent caching decisions.
This method enables the CDM to make smart decisions about when to request
additional keys via license challenges. When cached keys are available,
the CDM will compare them against the required KIDs to determine if a
license request is still needed for missing keys.
Args:
kids: List of required Key IDs as UUIDs or hex strings
Note:
Should be called by DRM classes (PlayReady/Widevine) before making
license challenge requests to enable optimal caching behavior.
"""
self._required_kids = []
for kid in kids:
if isinstance(kid, UUID):
self._required_kids.append(str(kid).replace("-", "").lower())
else:
self._required_kids.append(str(kid).replace("-", "").lower())
def _generate_session_id(self) -> bytes:
"""Generate a unique session ID."""
return secrets.token_bytes(16)
@@ -211,12 +254,14 @@ class DecryptLabsRemoteCDM:
"pssh": None,
"challenge": None,
"decrypt_labs_session_id": None,
"tried_cache": False,
"cached_keys": None,
}
return session_id
def close(self, session_id: bytes) -> None:
"""
Close a CDM session.
Close a CDM session and perform comprehensive cleanup.
Args:
session_id: Session identifier
@@ -227,6 +272,8 @@ class DecryptLabsRemoteCDM:
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session.clear()
del self._sessions[session_id]
def get_service_certificate(self, session_id: bytes) -> Optional[bytes]:
@@ -265,8 +312,13 @@ class DecryptLabsRemoteCDM:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
if certificate is None:
self._sessions[session_id]["service_certificate"] = None
return "Removed"
if not self._is_playready and self.device_name == "L1":
certificate = WidevineCdm.common_privacy_cert
self._sessions[session_id]["service_certificate"] = base64.b64decode(certificate)
return "Using default Widevine common privacy certificate for L1"
else:
self._sessions[session_id]["service_certificate"] = None
return "No certificate set (not required for this device type)"
if isinstance(certificate, str):
certificate = base64.b64decode(certificate)
@@ -291,57 +343,27 @@ class DecryptLabsRemoteCDM:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
pssh = session.get("pssh")
if not pssh:
return False
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
return True
if self.service_name:
try:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = [kid.hex for kid in pssh.key_ids]
elif hasattr(pssh, "kids"):
key_ids = [kid.hex for kid in pssh.kids]
if key_ids:
response = self._http_session.post(
f"{self.host}/get-cached-keys",
json={"service": self.service_name, "kid": key_ids},
timeout=30,
)
if response.status_code == 200:
data = response.json()
return (
data.get("message") == "success"
and "cached_keys" in data
and isinstance(data["cached_keys"], list)
and len(data["cached_keys"]) > 0
)
except Exception:
pass
return False
session_keys = session.get("keys", [])
return len(session_keys) > 0
def get_license_challenge(
self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
"""
Generate a license challenge using Decrypt Labs API.
Generate a license challenge using Decrypt Labs API with intelligent caching.
This method implements smart caching logic that:
1. First attempts to retrieve cached keys from the API
2. If required KIDs are set, compares cached keys against requirements
3. Only makes a license request if keys are missing
4. Returns empty challenge if all required keys are cached
The intelligent caching works as follows:
- For L1/L2 devices: Always prioritizes cached keys (API automatically optimizes)
- For other devices: Uses cache retry logic based on session state
- With required KIDs set: Only requests license for missing keys
- Without required KIDs: Returns any available cached keys
- For PlayReady: Combines cached keys with license keys seamlessly
Args:
session_id: Session identifier
@@ -350,11 +372,15 @@ class DecryptLabsRemoteCDM:
privacy_mode: Whether to use privacy mode - for compatibility only
Returns:
License challenge as bytes
License challenge as bytes, or empty bytes if cached keys satisfy requirements
Raises:
InvalidSession: If session ID is invalid
requests.RequestException: If API request fails
Note:
Call set_required_kids() before this method for optimal caching behavior.
L1/L2 devices automatically use cached keys when available per API design.
"""
_ = license_type, privacy_mode
@@ -365,12 +391,18 @@ class DecryptLabsRemoteCDM:
session["pssh"] = pssh_or_wrm
init_data = self._get_init_data_from_pssh(pssh_or_wrm)
already_tried_cache = session.get("tried_cache", False)
if self.has_cached_keys(session_id):
self._load_cached_keys(session_id)
return b""
if self.device_name in ["L1", "L2"]:
get_cached_keys = True
else:
get_cached_keys = not already_tried_cache
request_data = {"scheme": self.device_name, "init_data": init_data}
request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": get_cached_keys,
}
if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
request_data["service"] = self.service_name
@@ -391,22 +423,108 @@ class DecryptLabsRemoteCDM:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
if "service_certificate is required" in str(data) and not session["service_certificate"]:
error_msg += " (No service certificate was provided to the CDM session)"
raise requests.RequestException(f"API error: {error_msg}")
if data.get("message_type") == "cached-keys" or "cached_keys" in data:
message_type = data.get("message_type")
if message_type == "cached-keys" or "cached_keys" in data:
"""
Handle cached keys response from API.
When the API returns cached keys, we need to determine if they satisfy
our requirements or if we need to make an additional license request
for missing keys.
"""
cached_keys = data.get("cached_keys", [])
session["keys"] = self._parse_cached_keys(cached_keys)
parsed_keys = self._parse_cached_keys(cached_keys)
session["keys"] = parsed_keys
session["tried_cache"] = True
if self._required_kids:
cached_kids = set()
for key in parsed_keys:
if isinstance(key, dict) and "kid" in key:
cached_kids.add(key["kid"].replace("-", "").lower())
required_kids = set(self._required_kids)
missing_kids = required_kids - cached_kids
if missing_kids:
session["cached_keys"] = parsed_keys
if self.device_name in ["L1", "L2"]:
license_request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": False,
}
if self.service_name:
license_request_data["service"] = self.service_name
if session["service_certificate"]:
license_request_data["service_certificate"] = base64.b64encode(
session["service_certificate"]
).decode("utf-8")
else:
license_request_data = request_data.copy()
license_request_data["get_cached_keys_if_exists"] = False
session["decrypt_labs_session_id"] = None
session["challenge"] = None
session["tried_cache"] = False
response = self._http_session.post(
f"{self.host}/get-request", json=license_request_data, timeout=30
)
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
return b""
else:
return b""
else:
return b""
if message_type == "license-request" or "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
error_msg = f"Unexpected API response format. message_type={message_type}, available_fields={list(data.keys())}"
if data.get("message"):
error_msg = f"API response: {data['message']} - {error_msg}"
if "details" in data:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
if already_tried_cache and data.get("message") == "success":
return b""
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
raise requests.RequestException(error_msg)
def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None:
"""
Parse license response using Decrypt Labs API.
Parse license response using Decrypt Labs API with intelligent key combination.
For PlayReady content with partial cached keys, this method intelligently
combines the cached keys with newly obtained license keys, avoiding
duplicates while ensuring all required keys are available.
The key combination process:
1. Extracts keys from the license response
2. If cached keys exist (PlayReady), combines them with license keys
3. Removes duplicate keys by comparing normalized KIDs
4. Updates the session with the complete key set
Args:
session_id: Session identifier
@@ -421,7 +539,7 @@ class DecryptLabsRemoteCDM:
session = self._sessions[session_id]
if session["keys"]:
if session["keys"] and not (self.is_playready and "cached_keys" in session):
return
if not session.get("challenge") or not session.get("decrypt_labs_session_id"):
@@ -465,7 +583,49 @@ class DecryptLabsRemoteCDM:
error_msg += f" - Details: {data['details']}"
raise requests.RequestException(f"License decrypt error: {error_msg}")
session["keys"] = self._parse_keys_response(data)
license_keys = self._parse_keys_response(data)
if self.is_playready and "cached_keys" in session:
"""
Combine cached keys with license keys for PlayReady content.
This ensures we have both the cached keys (obtained earlier) and
any additional keys from the license response, without duplicates.
"""
cached_keys = session.get("cached_keys", [])
all_keys = list(cached_keys)
for license_key in license_keys:
already_exists = False
license_kid = None
if isinstance(license_key, dict) and "kid" in license_key:
license_kid = license_key["kid"].replace("-", "").lower()
elif hasattr(license_key, "kid"):
license_kid = str(license_key.kid).replace("-", "").lower()
elif hasattr(license_key, "key_id"):
license_kid = str(license_key.key_id).replace("-", "").lower()
if license_kid:
for cached_key in cached_keys:
cached_kid = None
if isinstance(cached_key, dict) and "kid" in cached_key:
cached_kid = cached_key["kid"].replace("-", "").lower()
elif hasattr(cached_key, "kid"):
cached_kid = str(cached_key.kid).replace("-", "").lower()
elif hasattr(cached_key, "key_id"):
cached_kid = str(cached_key.key_id).replace("-", "").lower()
if cached_kid == license_kid:
already_exists = True
break
if not already_exists:
all_keys.append(license_key)
session["keys"] = all_keys
session["cached_keys"] = None
else:
session["keys"] = license_keys
if self.vaults and session["keys"]:
key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
@@ -496,49 +656,6 @@ class DecryptLabsRemoteCDM:
return keys
def _load_cached_keys(self, session_id: bytes) -> None:
"""Load cached keys from vaults and Decrypt Labs API."""
session = self._sessions[session_id]
pssh = session["pssh"]
keys = []
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"})
if not keys and self.service_name:
try:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = [kid.hex for kid in pssh.key_ids]
elif hasattr(pssh, "kids"):
key_ids = [kid.hex for kid in pssh.kids]
if key_ids:
response = self._http_session.post(
f"{self.host}/get-cached-keys",
json={"service": self.service_name, "kid": key_ids},
timeout=30,
)
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "cached_keys" in data:
keys = self._parse_cached_keys(data["cached_keys"])
except Exception:
pass
session["keys"] = keys
def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Parse cached keys from API response.

View File

@@ -256,44 +256,38 @@ class PlayReady:
return keys
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids:
if kid in self.content_keys:
continue
session_id = cdm.open()
try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
session_id = cdm.open()
try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
if challenge:
try:
license_res = licence(challenge=challenge)
except Exception:
if hasattr(cdm, "use_cached_keys_as_fallback"):
if cdm.use_cached_keys_as_fallback(session_id):
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
continue
if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore")
else:
license_str = str(license_res)
if "<License>" not in license_str:
try:
license_str = base64.b64decode(license_str + "===").decode()
except Exception:
pass
cdm.parse_license(session_id, license_str)
except Exception:
raise
if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore")
else:
license_str = str(license_res)
if "<License>" not in license_str:
try:
license_str = base64.b64decode(license_str + "===").decode()
except Exception:
pass
cdm.parse_license(session_id, license_str)
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
finally:
cdm.close(session_id)
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
finally:
cdm.close(session_id)
if not self.content_keys:
raise PlayReady.Exceptions.EmptyLicense("No Content Keys were within the License")

View File

@@ -185,6 +185,9 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
@@ -218,6 +221,9 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):

View File

@@ -8,10 +8,10 @@ import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from typing import Optional, Tuple
from xml.sax.saxutils import escape
import requests
from requests.adapters import HTTPAdapter, Retry
from xml.sax.saxutils import escape
from unshackle.core import binaries
from unshackle.core.config import config
@@ -350,13 +350,25 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
if simkl_tmdb_id:
tmdb_id = simkl_tmdb_id
show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"):
standard_tags["IMDB"] = show_ids["imdb"]
if show_ids.get("tvdb"):
standard_tags["TVDB"] = str(show_ids["tvdb"])
if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}"
# Handle TV show data from Simkl
if simkl_data.get("type") == "episode" and "show" in simkl_data:
show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"):
standard_tags["IMDB"] = show_ids["imdb"]
if show_ids.get("tvdb"):
standard_tags["TVDB2"] = f"series/{show_ids['tvdb']}"
if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}"
# Handle movie data from Simkl
elif simkl_data.get("type") == "movie" and "movie" in simkl_data:
movie_ids = simkl_data.get("movie", {}).get("ids", {})
if movie_ids.get("imdb"):
standard_tags["IMDB"] = movie_ids["imdb"]
if movie_ids.get("tvdb"):
standard_tags["TVDB2"] = f"movies/{movie_ids['tvdb']}"
if movie_ids.get("tmdb"):
standard_tags["TMDB"] = f"movie/{movie_ids['tmdb']}"
# Use TMDB API for additional metadata (either from provided ID or Simkl lookup)
api_key = _api_key()
@@ -389,7 +401,10 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
standard_tags["IMDB"] = imdb_id
tvdb_id = ids.get("tvdb_id")
if tvdb_id:
standard_tags["TVDB"] = str(tvdb_id)
if kind == "movie":
standard_tags["TVDB2"] = f"movies/{tvdb_id}"
else:
standard_tags["TVDB2"] = f"series/{tvdb_id}"
merged_tags = {
**custom_tags,

View File

@@ -282,6 +282,10 @@ class EXAMPLE(Service):
return chapters
def get_widevine_service_certificate(self, **_: any) -> str:
"""Return the Widevine service certificate from config, if available."""
return self.config.get("certificate")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
"""Retrieve a PlayReady license for a given track."""

View File

@@ -117,7 +117,7 @@ remote_cdm:
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
system_id: 4464
security_level: 1
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"

View File

@@ -131,16 +131,27 @@ class MySQL(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get()
cursor = conn.cursor()
try:
placeholders = ",".join(["%s"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row["kid"] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany(
# TODO: SQL injection risk
f"INSERT IGNORE INTO `{service}` (kid, key_) VALUES (%s, %s)",
kid_keys.items(),
f"INSERT INTO `{service}` (kid, key_) VALUES (%s, %s)",
new_keys.items(),
)
return cursor.rowcount
return len(new_keys)
finally:
conn.commit()
cursor.close()

View File

@@ -102,16 +102,27 @@ class SQLite(Vault):
if any(isinstance(kid, UUID) for kid, key_ in kid_keys.items()):
kid_keys = {kid.hex if isinstance(kid, UUID) else kid: key_ for kid, key_ in kid_keys.items()}
if not kid_keys:
return 0
conn = self.conn_factory.get()
cursor = conn.cursor()
try:
placeholders = ",".join(["?"] * len(kid_keys))
cursor.execute(f"SELECT kid FROM `{service}` WHERE kid IN ({placeholders})", list(kid_keys.keys()))
existing_kids = {row[0] for row in cursor.fetchall()}
new_keys = {kid: key for kid, key in kid_keys.items() if kid not in existing_kids}
if not new_keys:
return 0
cursor.executemany(
# TODO: SQL injection risk
f"INSERT OR IGNORE INTO `{service}` (kid, key_) VALUES (?, ?)",
kid_keys.items(),
f"INSERT INTO `{service}` (kid, key_) VALUES (?, ?)",
new_keys.items(),
)
return cursor.rowcount
return len(new_keys)
finally:
conn.commit()
cursor.close()