4 Commits

Author SHA1 Message Date
Andy
9fd0895128 feat(cdm): Refactor DecryptLabsRemoteCDM full support for Widevine/Playready and ChromeCDM 2025-09-02 04:02:52 +00:00
Andy
ed744205ad fix(tags): 🐛 Fix Matroska tag compliance with official specification
- Update IMDB tags to use ID only (tt123456) instead of URLs
  - Update TMDB tags to use prefix/id format (movie/123456, tv/123456)
  - Update TVDB tags to use numeric ID only
  - Add XML escaping for tag values
  - Fix XML declaration to use double quotes

Fixes #15
2025-09-01 21:02:08 +00:00
Andy
3ef43afeed feat(cdm): Add DecryptLabs CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate 2025-09-01 00:34:07 +00:00
Andy
26851cbe7c feat(cdm): Enhance DecryptLabsRemoteCDM with improved session management and caching support and better support for remote WV/PR 2025-09-01 00:31:00 +00:00
10 changed files with 736 additions and 197 deletions

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "unshackle" name = "unshackle"
version = "1.4.3" version = "1.4.4"
description = "Modular Movie, TV, and Music Archival Software." description = "Modular Movie, TV, and Music Archival Software."
authors = [{ name = "unshackle team" }] authors = [{ name = "unshackle team" }]
requires-python = ">=3.10,<3.13" requires-python = ">=3.10,<3.13"

View File

@@ -299,21 +299,6 @@ class dl:
if getattr(config, "decryption_map", None): if getattr(config, "decryption_map", None):
config.decryption = config.decryption_map.get(self.service, config.decryption) config.decryption = config.decryption_map.get(self.service, config.decryption)
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
with console.status("Loading Key Vaults...", spinner="dots"): with console.status("Loading Key Vaults...", spinner="dots"):
self.vaults = Vaults(self.service) self.vaults = Vaults(self.service)
total_vaults = len(config.key_vaults) total_vaults = len(config.key_vaults)
@@ -352,6 +337,24 @@ class dl:
else: else:
self.log.debug("No vaults are currently active") self.log.debug("No vaults are currently active")
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if isinstance(self.cdm, DecryptLabsRemoteCDM):
drm_type = "PlayReady" if self.cdm.is_playready else "Widevine"
self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})")
elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
self.proxy_providers = [] self.proxy_providers = []
if no_proxy: if no_proxy:
ctx.params["proxy"] = None ctx.params["proxy"] = None
@@ -874,7 +877,12 @@ class dl:
), ),
licence=partial( licence=partial(
service.get_playready_license service.get_playready_license
if isinstance(self.cdm, PlayReadyCdm) if (
isinstance(self.cdm, PlayReadyCdm)
or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
)
)
and hasattr(service, "get_playready_license") and hasattr(service, "get_playready_license")
else service.get_widevine_license, else service.get_widevine_license,
title=title, title=title,
@@ -1201,10 +1209,22 @@ class dl:
if not drm: if not drm:
return return
if isinstance(drm, Widevine) and not isinstance(self.cdm, WidevineCdm): if isinstance(drm, Widevine):
self.cdm = self.get_cdm(self.service, self.profile, drm="widevine") if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or (
elif isinstance(drm, PlayReady) and not isinstance(self.cdm, PlayReadyCdm): isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
self.cdm = self.get_cdm(self.service, self.profile, drm="playready") ):
widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine")
if widevine_cdm:
self.log.info("Switching to Widevine CDM for Widevine content")
self.cdm = widevine_cdm
elif isinstance(drm, PlayReady):
if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready
):
playready_cdm = self.get_cdm(self.service, self.profile, drm="playready")
if playready_cdm:
self.log.info("Switching to PlayReady CDM for PlayReady content")
self.cdm = playready_cdm
if isinstance(drm, Widevine): if isinstance(drm, Widevine):
with self.DRM_TABLE_LOCK: with self.DRM_TABLE_LOCK:
@@ -1442,8 +1462,8 @@ class dl:
return Credential(*credentials) return Credential(*credentials)
return Credential.loads(credentials) # type: ignore return Credential.loads(credentials) # type: ignore
@staticmethod
def get_cdm( def get_cdm(
self,
service: str, service: str,
profile: Optional[str] = None, profile: Optional[str] = None,
drm: Optional[str] = None, drm: Optional[str] = None,
@@ -1477,10 +1497,18 @@ class dl:
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api: if cdm_api:
is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False
del cdm_api["name"] if is_decrypt_lab:
del cdm_api["type"] del cdm_api["name"]
return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api) del cdm_api["type"]
# All DecryptLabs CDMs use DecryptLabsRemoteCDM
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
else:
del cdm_api["name"]
if "type" in cdm_api:
del cdm_api["type"]
return RemoteCdm(**cdm_api)
prd_path = config.directories.prds / f"{cdm_name}.prd" prd_path = config.directories.prds / f"{cdm_name}.prd"
if not prd_path.is_file(): if not prd_path.is_file():

View File

@@ -1 +1 @@
__version__ = "1.4.3" __version__ = "1.4.4"

View File

@@ -1,189 +1,585 @@
from __future__ import annotations
import base64 import base64
import secrets import secrets
from typing import Optional, Type, Union from typing import Any, Dict, List, Optional, Union
from uuid import UUID from uuid import UUID
import requests import requests
from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm from pywidevine.device import DeviceTypes
from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage from requests import Session
# Copyright 2024 by DevYukine. from unshackle.core.vaults import Vaults
class DecryptLabsRemoteCDM(RemoteCdm): class MockCertificateChain:
"""Mock certificate chain for PlayReady compatibility."""
def __init__(self, name: str):
self._name = name
def get_name(self) -> str:
return self._name
class Key:
"""Key object compatible with pywidevine."""
def __init__(self, kid: str, key: str, type_: str = "CONTENT"):
if isinstance(kid, str):
clean_kid = kid.replace("-", "")
if len(clean_kid) == 32:
self.kid = UUID(hex=clean_kid)
else:
self.kid = UUID(hex=clean_kid.ljust(32, "0"))
else:
self.kid = kid
if isinstance(key, str):
self.key = bytes.fromhex(key)
else:
self.key = key
self.type = type_
class DecryptLabsRemoteCDMExceptions:
"""Exception classes for compatibility with pywidevine CDM."""
class InvalidSession(Exception):
"""Raised when session ID is invalid."""
class TooManySessions(Exception):
"""Raised when session limit is reached."""
class InvalidInitData(Exception):
"""Raised when PSSH/init data is invalid."""
class InvalidLicenseType(Exception):
"""Raised when license type is invalid."""
class InvalidLicenseMessage(Exception):
"""Raised when license message is invalid."""
class InvalidContext(Exception):
"""Raised when session has no context data."""
class SignatureMismatch(Exception):
"""Raised when signature verification fails."""
class DecryptLabsRemoteCDM:
"""
Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface.
This class provides a drop-in replacement for pywidevine's local CDM using
Decrypt Labs' KeyXtractor API service.
"""
service_certificate_challenge = b"\x08\x04"
def __init__( def __init__(
self, self,
device_type: Union[DeviceTypes, str],
system_id: int,
security_level: int,
host: str,
secret: str, secret: str,
device_name: str, host: str = "https://keyxtractor.decryptlabs.com",
service_name: str, device_name: str = "ChromeCDM",
service_name: Optional[str] = None,
vaults: Optional[Vaults] = None,
device_type: Optional[str] = None,
system_id: Optional[int] = None,
security_level: Optional[int] = None,
**kwargs,
): ):
self.response_counter = 0 """
self.pssh = None Initialize Decrypt Labs Remote CDM for Widevine and PlayReady schemes.
self.api_session_ids = {}
self.license_request = None Args:
self.service_name = service_name secret: Decrypt Labs API key (matches config format)
host: Decrypt Labs API host URL (matches config format)
device_name: DRM scheme (ChromeCDM, L1, L2 for Widevine; SL2, SL3 for PlayReady)
service_name: Service name for key caching and vault operations
vaults: Vaults instance for local key caching
device_type: Device type (CHROME, ANDROID, PLAYREADY) - for compatibility
system_id: System ID - for compatibility
security_level: Security level - for compatibility
"""
_ = kwargs
self.secret = secret
self.host = host.rstrip("/")
self.device_name = device_name self.device_name = device_name
self.keys = {} self.service_name = service_name or ""
self.scheme = "L1" if device_name == "L1" else "widevine" self.vaults = vaults
try:
super().__init__(device_type, system_id, security_level, host, secret, device_name)
except Exception:
pass
self.req_session = requests.Session()
self.req_session.headers.update({"decrypt-labs-api-key": secret})
@classmethod self._device_type_str = device_type
def from_device(cls, device: Device) -> Type["DecryptLabsRemoteCDM"]: if device_type:
raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.") self.device_type = self._get_device_type_enum(device_type)
def open(self) -> bytes: self._is_playready = (device_type and device_type.upper() == "PLAYREADY") or (device_name in ["SL2", "SL3"])
# We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one.
return bytes.fromhex(secrets.token_hex(16))
def close(self, session_id: bytes) -> None: if self._is_playready:
# We stub this method to do nothing. self.system_id = system_id or 0
pass self.security_level = security_level or (2000 if device_name == "SL2" else 3000)
else:
self.system_id = system_id or 26830
self.security_level = security_level or 3
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str: self._sessions: Dict[bytes, Dict[str, Any]] = {}
if isinstance(certificate, bytes): self._pssh_b64 = None
certificate = base64.b64encode(certificate).decode() self._http_session = Session()
self._http_session.headers.update(
# certificate needs to be base64 to be sent off to the API. {
# it needs to intentionally be kept as base64 encoded SignedMessage. "decrypt-labs-api-key": self.secret,
"Content-Type": "application/json",
self.req_session.signed_device_certificate = certificate "User-Agent": "unshackle-decrypt-labs-cdm/1.0",
self.req_session.privacy_mode = True }
return "success"
def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]:
raise NotImplementedError("This method is not implemented in this CDM")
def get_license_challenge(
self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
self.pssh = pssh
request_data = {
"init_data": self.pssh.dumps(),
"service_certificate": self.req_session.signed_device_certificate,
"scheme": self.scheme,
"service": self.service_name,
}
# Add required parameter for L1 scheme
if self.scheme == "L1":
request_data["get_cached_keys_if_exists"] = True
res = self.session(
self.host + "/get-request",
request_data,
) )
# Check if we got cached keys instead of a challenge def _get_device_type_enum(self, device_type: str):
if res.get("message_type") == "cached-keys": """Convert device type string to enum for compatibility."""
# Store cached keys directly device_type_upper = device_type.upper()
if session_id not in self.keys: if device_type_upper == "ANDROID":
self.keys[session_id] = [] return DeviceTypes.ANDROID
session_keys = self.keys[session_id] elif device_type_upper == "CHROME":
return DeviceTypes.CHROME
else:
return DeviceTypes.CHROME
for cached_key in res.get("cached_keys", []): @property
# Handle KID format - could be hex string or UUID string def is_playready(self) -> bool:
kid_str = cached_key["kid"] """Check if this CDM is in PlayReady mode."""
return self._is_playready
@property
def certificate_chain(self) -> MockCertificateChain:
"""Mock certificate chain for PlayReady compatibility."""
return MockCertificateChain(f"{self.device_name}_Remote")
def set_pssh_b64(self, pssh_b64: str) -> None:
"""Store base64-encoded PSSH data for PlayReady compatibility."""
self._pssh_b64 = pssh_b64
def _generate_session_id(self) -> bytes:
"""Generate a unique session ID."""
return secrets.token_bytes(16)
def _get_init_data_from_pssh(self, pssh: Any) -> str:
"""Extract init data from various PSSH formats."""
if self.is_playready and self._pssh_b64:
return self._pssh_b64
if hasattr(pssh, "dumps"):
dumps_result = pssh.dumps()
if isinstance(dumps_result, str):
try: try:
# Try as UUID string first base64.b64decode(dumps_result)
kid_uuid = UUID(kid_str) return dumps_result
except ValueError: except Exception:
try: return base64.b64encode(dumps_result.encode("utf-8")).decode("utf-8")
# Try as hex string (like the existing code) else:
kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) return base64.b64encode(dumps_result).decode("utf-8")
except ValueError: elif hasattr(pssh, "raw"):
# Fallback: use Key.kid_to_uuid raw_data = pssh.raw
kid_uuid = Key.kid_to_uuid(kid_str) if isinstance(raw_data, str):
raw_data = raw_data.encode("utf-8")
return base64.b64encode(raw_data).decode("utf-8")
elif hasattr(pssh, "__class__") and "WrmHeader" in pssh.__class__.__name__:
if self.is_playready:
raise ValueError("PlayReady WRM header received but no PSSH B64 was set via set_pssh_b64()")
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) if hasattr(pssh, "raw_bytes"):
return base64.b64encode(pssh.raw_bytes).decode("utf-8")
elif hasattr(pssh, "bytes"):
return base64.b64encode(pssh.bytes).decode("utf-8")
else:
raise ValueError(f"Cannot extract PSSH data from WRM header type: {type(pssh)}")
else:
raise ValueError(f"Unsupported PSSH type: {type(pssh)}")
# Return empty challenge since we already have the keys def open(self) -> bytes:
self.license_request = "" """
self.api_session_ids[session_id] = None Open a new CDM session.
Returns:
Session identifier as bytes
"""
session_id = self._generate_session_id()
self._sessions[session_id] = {
"service_certificate": None,
"keys": [],
"pssh": None,
"challenge": None,
"decrypt_labs_session_id": None,
}
return session_id
def close(self, session_id: bytes) -> None:
"""
Close a CDM session.
Args:
session_id: Session identifier
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
del self._sessions[session_id]
def get_service_certificate(self, session_id: bytes) -> Optional[bytes]:
"""
Get the service certificate for a session.
Args:
session_id: Session identifier
Returns:
Service certificate if set, None otherwise
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
return self._sessions[session_id]["service_certificate"]
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""
Set the service certificate for a session.
Args:
session_id: Session identifier
certificate: Service certificate (bytes or base64 string)
Returns:
Certificate status message
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
if certificate is None:
self._sessions[session_id]["service_certificate"] = None
return "Removed"
if isinstance(certificate, str):
certificate = base64.b64decode(certificate)
self._sessions[session_id]["service_certificate"] = certificate
return "Successfully set Service Certificate"
def has_cached_keys(self, session_id: bytes) -> bool:
"""
Check if cached keys are available for the session.
Args:
session_id: Session identifier
Returns:
True if cached keys are available
Raises:
ValueError: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
pssh = session.get("pssh")
if not pssh:
return False
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
return True
if self.service_name:
try:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = [kid.hex for kid in pssh.key_ids]
elif hasattr(pssh, "kids"):
key_ids = [kid.hex for kid in pssh.kids]
if key_ids:
response = self._http_session.post(
f"{self.host}/get-cached-keys",
json={"service": self.service_name, "kid": key_ids},
timeout=30,
)
if response.status_code == 200:
data = response.json()
return (
data.get("message") == "success"
and "cached_keys" in data
and isinstance(data["cached_keys"], list)
and len(data["cached_keys"]) > 0
)
except Exception:
pass
return False
def get_license_challenge(
self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
"""
Generate a license challenge using Decrypt Labs API.
Args:
session_id: Session identifier
pssh_or_wrm: PSSH object or WRM header (for PlayReady compatibility)
license_type: Type of license (STREAMING, OFFLINE, AUTOMATIC) - for compatibility only
privacy_mode: Whether to use privacy mode - for compatibility only
Returns:
License challenge as bytes
Raises:
InvalidSession: If session ID is invalid
requests.RequestException: If API request fails
"""
_ = license_type, privacy_mode
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
session["pssh"] = pssh_or_wrm
init_data = self._get_init_data_from_pssh(pssh_or_wrm)
if self.has_cached_keys(session_id):
self._load_cached_keys(session_id)
return b"" return b""
# Normal challenge response request_data = {"scheme": self.device_name, "init_data": init_data}
self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"]
return base64.b64decode(self.license_request) if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
request_data["service"] = self.service_name
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: if session["service_certificate"]:
session_id_api = self.api_session_ids[session_id] request_data["service_certificate"] = base64.b64encode(session["service_certificate"]).decode("utf-8")
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
# If we already have cached keys and no session_id_api, skip processing response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
if session_id_api is None and session_keys:
if response.status_code != 200:
raise requests.RequestException(f"API request failed: {response.status_code} {response.text}")
data = response.json()
if data.get("message") != "success":
error_msg = data.get("message", "Unknown error")
if "details" in data:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
raise requests.RequestException(f"API error: {error_msg}")
if data.get("message_type") == "cached-keys" or "cached_keys" in data:
cached_keys = data.get("cached_keys", [])
session["keys"] = self._parse_cached_keys(cached_keys)
return b""
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None:
"""
Parse license response using Decrypt Labs API.
Args:
session_id: Session identifier
license_message: License response from license server
Raises:
ValueError: If session ID is invalid or no challenge available
requests.RequestException: If API request fails
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id]
if session["keys"]:
return return
if isinstance(license_message, dict) and "keys" in license_message: if not session.get("challenge") or not session.get("decrypt_labs_session_id"):
session_keys.extend( raise ValueError("No challenge available - call get_license_challenge first")
[
Key(kid=Key.kid_to_uuid(x["kid"]), type_=x.get("type", "CONTENT"), key=bytes.fromhex(x["key"]))
for x in license_message["keys"]
]
)
else: if isinstance(license_message, str):
# Ensure license_message is base64 encoded if self.is_playready and license_message.strip().startswith("<?xml"):
if isinstance(license_message, bytes): license_message = license_message.encode("utf-8")
license_response_b64 = base64.b64encode(license_message).decode()
elif isinstance(license_message, str):
license_response_b64 = license_message
else: else:
license_response_b64 = str(license_message) try:
res = self.session( license_message = base64.b64decode(license_message)
self.host + "/decrypt-response", except Exception:
{ license_message = license_message.encode("utf-8")
"session_id": session_id_api,
"init_data": self.pssh.dumps(),
"license_request": self.license_request,
"license_response": license_response_b64,
"scheme": self.scheme,
},
)
original_keys = res["keys"].replace("\n", " ") pssh = session["pssh"]
keys_separated = original_keys.split("--key ") init_data = self._get_init_data_from_pssh(pssh)
formatted_keys = []
for k in keys_separated: license_request_b64 = base64.b64encode(session["challenge"]).decode("utf-8")
if ":" in k: license_response_b64 = base64.b64encode(license_message).decode("utf-8")
key = k.strip()
formatted_keys.append(key) request_data = {
for keys in formatted_keys: "scheme": self.device_name,
session_keys.append( "session_id": session["decrypt_labs_session_id"],
( "init_data": init_data,
Key( "license_request": license_request_b64,
kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])), "license_response": license_response_b64,
type_="CONTENT", }
key=bytes.fromhex(keys.split(":")[1]),
) response = self._http_session.post(f"{self.host}/decrypt-response", json=request_data, timeout=30)
if response.status_code != 200:
raise requests.RequestException(f"License decrypt failed: {response.status_code} {response.text}")
data = response.json()
if data.get("message") != "success":
error_msg = data.get("message", "Unknown error")
if "error" in data:
error_msg += f" - Error: {data['error']}"
if "details" in data:
error_msg += f" - Details: {data['details']}"
raise requests.RequestException(f"License decrypt error: {error_msg}")
session["keys"] = self._parse_keys_response(data)
if self.vaults and session["keys"]:
key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
self.vaults.add_keys(key_dict)
def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]:
"""
Get keys from the session.
Args:
session_id: Session identifier
type_: Optional key type filter (CONTENT, SIGNING, etc.)
Returns:
List of Key objects
Raises:
InvalidSession: If session ID is invalid
"""
if session_id not in self._sessions:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
key_dicts = self._sessions[session_id]["keys"]
keys = [Key(kid=k["kid"], key=k["key"], type_=k["type"]) for k in key_dicts]
if type_:
keys = [key for key in keys if key.type == type_]
return keys
def _load_cached_keys(self, session_id: bytes) -> None:
"""Load cached keys from vaults and Decrypt Labs API."""
session = self._sessions[session_id]
pssh = session["pssh"]
keys = []
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"})
if not keys and self.service_name:
try:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = [kid.hex for kid in pssh.key_ids]
elif hasattr(pssh, "kids"):
key_ids = [kid.hex for kid in pssh.kids]
if key_ids:
response = self._http_session.post(
f"{self.host}/get-cached-keys",
json={"service": self.service_name, "kid": key_ids},
timeout=30,
) )
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "cached_keys" in data:
keys = self._parse_cached_keys(data["cached_keys"])
except Exception:
pass
session["keys"] = keys
def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Parse cached keys from API response.
Args:
cached_keys_data: List of cached key objects from API
Returns:
List of key dictionaries
"""
keys = []
try:
if cached_keys_data and isinstance(cached_keys_data, list):
for key_data in cached_keys_data:
if "kid" in key_data and "key" in key_data:
keys.append({"kid": key_data["kid"], "key": key_data["key"], "type": "CONTENT"})
except Exception:
pass
return keys
def _parse_keys_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse keys from decrypt response."""
keys = []
if "keys" in data and isinstance(data["keys"], str):
keys_string = data["keys"]
for line in keys_string.split("\n"):
line = line.strip()
if line.startswith("--key "):
key_part = line[6:]
if ":" in key_part:
kid, key = key_part.split(":", 1)
keys.append({"kid": kid.strip(), "key": key.strip(), "type": "CONTENT"})
elif "keys" in data and isinstance(data["keys"], list):
for key_data in data["keys"]:
keys.append(
{"kid": key_data.get("kid"), "key": key_data.get("key"), "type": key_data.get("type", "CONTENT")}
) )
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]: return keys
return self.keys[session_id]
def session(self, url, data, retries=3):
res = self.req_session.post(url, json=data).json()
if res.get("message") != "success": __all__ = ["DecryptLabsRemoteCDM"]
if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""):
if retries > 0:
return self.session(url, data, retries=retries - 1)
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
return res

View File

@@ -224,14 +224,59 @@ class PlayReady:
def kids(self) -> list[UUID]: def kids(self) -> list[UUID]:
return self._kids return self._kids
def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict:
"""Extract keys from CDM session with cross-library compatibility.
Args:
cdm: CDM instance
session_id: Session identifier
Returns:
Dictionary mapping KID UUIDs to hex keys
"""
keys = {}
for key in cdm.get_keys(session_id):
if hasattr(key, "key_id"):
kid = key.key_id
elif hasattr(key, "kid"):
kid = key.kid
else:
continue
if hasattr(key, "key") and hasattr(key.key, "hex"):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, bytes):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, str):
key_hex = key.key
else:
continue
keys[kid] = key_hex
return keys
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None: def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids: for kid in self.kids:
if kid in self.content_keys: if kid in self.content_keys:
continue continue
session_id = cdm.open() session_id = cdm.open()
try: try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0]) challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
license_res = licence(challenge=challenge)
try:
license_res = licence(challenge=challenge)
except Exception:
if hasattr(cdm, "use_cached_keys_as_fallback"):
if cdm.use_cached_keys_as_fallback(session_id):
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
continue
raise
if isinstance(license_res, bytes): if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore") license_str = license_res.decode(errors="ignore")
@@ -245,7 +290,7 @@ class PlayReady:
pass pass
cdm.parse_license(session_id, license_str) cdm.parse_license(session_id, license_str)
keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)} keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys) self.content_keys.update(keys)
finally: finally:
cdm.close(session_id) cdm.close(session_id)

View File

@@ -185,7 +185,12 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh))) challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(session_id, licence(challenge=challenge))
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys: if not self.content_keys:
@@ -213,10 +218,15 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
cdm.parse_license( challenge = cdm.get_license_challenge(session_id, self.pssh)
session_id,
licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)), if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
) pass
else:
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=challenge),
)
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys: if not self.content_keys:

View File

@@ -420,6 +420,15 @@ class Track:
for drm in self.drm: for drm in self.drm:
if isinstance(drm, PlayReady): if isinstance(drm, PlayReady):
return drm return drm
elif hasattr(cdm, 'is_playready'):
if cdm.is_playready:
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
else:
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
return self.drm[0] return self.drm[0]

View File

@@ -11,6 +11,7 @@ from typing import Optional, Tuple
import requests import requests
from requests.adapters import HTTPAdapter, Retry from requests.adapters import HTTPAdapter, Retry
from xml.sax.saxutils import escape
from unshackle.core import binaries from unshackle.core import binaries
from unshackle.core.config import config from unshackle.core.config import config
@@ -289,9 +290,9 @@ def _apply_tags(path: Path, tags: dict[str, str]) -> None:
log.debug("mkvpropedit not found on PATH; skipping tags") log.debug("mkvpropedit not found on PATH; skipping tags")
return return
log.debug("Applying tags to %s: %s", path, tags) log.debug("Applying tags to %s: %s", path, tags)
xml_lines = ["<?xml version='1.0' encoding='UTF-8'?>", "<Tags>", " <Tag>", " <Targets/>"] xml_lines = ['<?xml version="1.0" encoding="UTF-8"?>', "<Tags>", " <Tag>", " <Targets/>"]
for name, value in tags.items(): for name, value in tags.items():
xml_lines.append(f" <Simple><Name>{name}</Name><String>{value}</String></Simple>") xml_lines.append(f" <Simple><Name>{escape(name)}</Name><String>{escape(value)}</String></Simple>")
xml_lines.extend([" </Tag>", "</Tags>"]) xml_lines.extend([" </Tag>", "</Tags>"])
with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as f: with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as f:
f.write("\n".join(xml_lines)) f.write("\n".join(xml_lines))
@@ -351,11 +352,11 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
show_ids = simkl_data.get("show", {}).get("ids", {}) show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"): if show_ids.get("imdb"):
standard_tags["IMDB"] = f"https://www.imdb.com/title/{show_ids['imdb']}" standard_tags["IMDB"] = show_ids["imdb"]
if show_ids.get("tvdb"): if show_ids.get("tvdb"):
standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/series/{show_ids['tvdb']}" standard_tags["TVDB"] = str(show_ids["tvdb"])
if show_ids.get("tmdbtv"): if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"https://www.themoviedb.org/tv/{show_ids['tmdbtv']}" standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}"
# Use TMDB API for additional metadata (either from provided ID or Simkl lookup) # Use TMDB API for additional metadata (either from provided ID or Simkl lookup)
api_key = _api_key() api_key = _api_key()
@@ -373,8 +374,8 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
_apply_tags(path, custom_tags) _apply_tags(path, custom_tags)
return return
tmdb_url = f"https://www.themoviedb.org/{'movie' if kind == 'movie' else 'tv'}/{tmdb_id}" prefix = "movie" if kind == "movie" else "tv"
standard_tags["TMDB"] = tmdb_url standard_tags["TMDB"] = f"{prefix}/{tmdb_id}"
try: try:
ids = external_ids(tmdb_id, kind) ids = external_ids(tmdb_id, kind)
except requests.RequestException as exc: except requests.RequestException as exc:
@@ -385,11 +386,10 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
imdb_id = ids.get("imdb_id") imdb_id = ids.get("imdb_id")
if imdb_id: if imdb_id:
standard_tags["IMDB"] = f"https://www.imdb.com/title/{imdb_id}" standard_tags["IMDB"] = imdb_id
tvdb_id = ids.get("tvdb_id") tvdb_id = ids.get("tvdb_id")
if tvdb_id: if tvdb_id:
tvdb_prefix = "movies" if kind == "movie" else "series" standard_tags["TVDB"] = str(tvdb_id)
standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/{tvdb_prefix}/{tvdb_id}"
merged_tags = { merged_tags = {
**custom_tags, **custom_tags,

View File

@@ -105,6 +105,50 @@ remote_cdm:
host: https://domain-2.com/api host: https://domain-2.com/api
secret: secret_key secret: secret_key
- name: "decrypt_labs_chrome"
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
device_type: CHROME
system_id: 4464 # Doesn't matter
security_level: 3
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
- name: "decrypt_labs_l1"
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 1
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_l2"
type: "decrypt_labs"
device_name: "L2" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 2
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl2"
type: "decrypt_labs"
device_name: "SL2" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 2000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl3"
type: "decrypt_labs"
device_name: "SL3" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 3000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
# Key Vaults store your obtained Content Encryption Keys (CEKs) # Key Vaults store your obtained Content Encryption Keys (CEKs)
# Use 'no_push: true' to prevent a vault from receiving pushed keys # Use 'no_push: true' to prevent a vault from receiving pushed keys
# while still allowing it to provide keys when requested # while still allowing it to provide keys when requested
@@ -171,7 +215,7 @@ chapter_fallback_name: "Chapter {j:02}"
# Case-Insensitive dictionary of headers for all Services # Case-Insensitive dictionary of headers for all Services
headers: headers:
Accept-Language: "en-US,en;q=0.8" Accept-Language: "en-US,en;q=0.8"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36" User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
# Override default filenames used across unshackle # Override default filenames used across unshackle
filenames: filenames:
@@ -213,6 +257,13 @@ services:
# Global service config # Global service config
api_key: "service_api_key" api_key: "service_api_key"
# Service certificate for Widevine L1/L2 (base64 encoded)
# This certificate is automatically used when L1/L2 schemes are selected
# Services obtain this from their DRM provider or license server
certificate: |
CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo
# ... (full base64 certificate here)
# Profile-specific device configurations # Profile-specific device configurations
profiles: profiles:
john_sd: john_sd:

2
uv.lock generated
View File

@@ -1499,7 +1499,7 @@ wheels = [
[[package]] [[package]]
name = "unshackle" name = "unshackle"
version = "1.4.3" version = "1.4.4"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "appdirs" }, { name = "appdirs" },