diff --git a/pyproject.toml b/pyproject.toml index 2a0b84f..2c4c67c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "unshackle" -version = "1.4.3" +version = "1.4.4" description = "Modular Movie, TV, and Music Archival Software." authors = [{ name = "unshackle team" }] requires-python = ">=3.10,<3.13" diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index 254b807..eac5e7d 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -345,7 +345,10 @@ class dl: sys.exit(1) if self.cdm: - if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: + if isinstance(self.cdm, DecryptLabsRemoteCDM): + drm_type = "PlayReady" if self.cdm.is_playready else "Widevine" + self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})") + elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})") else: self.log.info( @@ -874,7 +877,12 @@ class dl: ), licence=partial( service.get_playready_license - if isinstance(self.cdm, PlayReadyCdm) + if ( + isinstance(self.cdm, PlayReadyCdm) + or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready + ) + ) and hasattr(service, "get_playready_license") else service.get_widevine_license, title=title, @@ -1201,10 +1209,22 @@ class dl: if not drm: return - if isinstance(drm, Widevine) and not isinstance(self.cdm, WidevineCdm): - self.cdm = self.get_cdm(self.service, self.profile, drm="widevine") - elif isinstance(drm, PlayReady) and not isinstance(self.cdm, PlayReadyCdm): - self.cdm = self.get_cdm(self.service, self.profile, drm="playready") + if isinstance(drm, Widevine): + if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready + ): + widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine") + if widevine_cdm: + self.log.info("Switching to Widevine CDM for Widevine content") + self.cdm = widevine_cdm + elif isinstance(drm, PlayReady): + if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready + ): + playready_cdm = self.get_cdm(self.service, self.profile, drm="playready") + if playready_cdm: + self.log.info("Switching to PlayReady CDM for PlayReady content") + self.cdm = playready_cdm if isinstance(drm, Widevine): with self.DRM_TABLE_LOCK: @@ -1477,26 +1497,17 @@ class dl: cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) if cdm_api: - is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False + is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False if is_decrypt_lab: - device_type = cdm_api.get("device_type") del cdm_api["name"] del cdm_api["type"] - # Use the appropriate DecryptLabs CDM class based on device type - if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]: - from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM - - # Remove unused parameters for PlayReady CDM - cdm_params = cdm_api.copy() - cdm_params.pop("device_type", None) - cdm_params.pop("system_id", None) - return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params) - else: - return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) + # All DecryptLabs CDMs use DecryptLabsRemoteCDM + return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) else: del cdm_api["name"] - del cdm_api["type"] + if "type" in cdm_api: + del cdm_api["type"] return RemoteCdm(**cdm_api) prd_path = config.directories.prds / f"{cdm_name}.prd" diff --git a/unshackle/core/__init__.py b/unshackle/core/__init__.py index aa56ed4..c0f285b 100644 --- a/unshackle/core/__init__.py +++ b/unshackle/core/__init__.py @@ -1 +1 @@ -__version__ = "1.4.3" +__version__ = "1.4.4" diff --git a/unshackle/core/cdm/decrypt_labs_remote_cdm.py b/unshackle/core/cdm/decrypt_labs_remote_cdm.py index f356e40..b9f4250 100644 --- a/unshackle/core/cdm/decrypt_labs_remote_cdm.py +++ b/unshackle/core/cdm/decrypt_labs_remote_cdm.py @@ -1,687 +1,585 @@ +from __future__ import annotations + import base64 import secrets -from typing import Optional, Type, Union +from typing import Any, Dict, List, Optional, Union from uuid import UUID import requests -from pyplayready.cdm import Cdm as PlayReadyCdm -from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm -from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage +from pywidevine.device import DeviceTypes +from requests import Session -# Copyright 2024 by DevYukine. -# Copyright 2025 by sp4rk.y. +from unshackle.core.vaults import Vaults -class DecryptLabsRemoteCDM(RemoteCdm): - """Remote CDM implementation for DecryptLabs KeyXtractor API. +class MockCertificateChain: + """Mock certificate chain for PlayReady compatibility.""" - Provides CDM functionality through DecryptLabs' remote API service, - supporting multiple DRM schemes including Widevine and PlayReady. + def __init__(self, name: str): + self._name = name + + def get_name(self) -> str: + return self._name + + +class Key: + """Key object compatible with pywidevine.""" + + def __init__(self, kid: str, key: str, type_: str = "CONTENT"): + if isinstance(kid, str): + clean_kid = kid.replace("-", "") + if len(clean_kid) == 32: + self.kid = UUID(hex=clean_kid) + else: + self.kid = UUID(hex=clean_kid.ljust(32, "0")) + else: + self.kid = kid + + if isinstance(key, str): + self.key = bytes.fromhex(key) + else: + self.key = key + + self.type = type_ + + +class DecryptLabsRemoteCDMExceptions: + """Exception classes for compatibility with pywidevine CDM.""" + + class InvalidSession(Exception): + """Raised when session ID is invalid.""" + + class TooManySessions(Exception): + """Raised when session limit is reached.""" + + class InvalidInitData(Exception): + """Raised when PSSH/init data is invalid.""" + + class InvalidLicenseType(Exception): + """Raised when license type is invalid.""" + + class InvalidLicenseMessage(Exception): + """Raised when license message is invalid.""" + + class InvalidContext(Exception): + """Raised when session has no context data.""" + + class SignatureMismatch(Exception): + """Raised when signature verification fails.""" + + +class DecryptLabsRemoteCDM: """ + Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface. + + This class provides a drop-in replacement for pywidevine's local CDM using + Decrypt Labs' KeyXtractor API service. + """ + + service_certificate_challenge = b"\x08\x04" def __init__( self, - device_type: Union[DeviceTypes, str], - system_id: int, - security_level: int, - host: str, secret: str, - device_name: str, - service_name: str, - vaults=None, + host: str = "https://keyxtractor.decryptlabs.com", + device_name: str = "ChromeCDM", + service_name: Optional[str] = None, + vaults: Optional[Vaults] = None, + device_type: Optional[str] = None, + system_id: Optional[int] = None, + security_level: Optional[int] = None, + **kwargs, ): - """Initialize DecryptLabs Remote CDM. + """ + Initialize Decrypt Labs Remote CDM for Widevine and PlayReady schemes. Args: - device_type: Type of device to emulate - system_id: System identifier - security_level: DRM security level - host: DecryptLabs API host URL - secret: DecryptLabs API key for authentication - device_name: Device/scheme name (used as scheme identifier) - service_name: Service/platform name - vaults: Optional vaults reference for caching keys + secret: Decrypt Labs API key (matches config format) + host: Decrypt Labs API host URL (matches config format) + device_name: DRM scheme (ChromeCDM, L1, L2 for Widevine; SL2, SL3 for PlayReady) + service_name: Service name for key caching and vault operations + vaults: Vaults instance for local key caching + device_type: Device type (CHROME, ANDROID, PLAYREADY) - for compatibility + system_id: System ID - for compatibility + security_level: Security level - for compatibility """ - self.response_counter = 0 - self.pssh = None - self.api_session_ids = {} - self.license_request = None - self.service_name = service_name + _ = kwargs + + self.secret = secret + self.host = host.rstrip("/") self.device_name = device_name - self.keys = {} - self.scheme = device_name - self._has_cached_keys = False + self.service_name = service_name or "" self.vaults = vaults - self.security_level = security_level - self.host = host - class MockCertificateChain: - """Mock certificate chain for DecryptLabs remote CDM compatibility.""" + self._device_type_str = device_type + if device_type: + self.device_type = self._get_device_type_enum(device_type) - def __init__(self, scheme: str, security_level: int): - self.scheme = scheme - self.security_level = security_level + self._is_playready = (device_type and device_type.upper() == "PLAYREADY") or (device_name in ["SL2", "SL3"]) - def get_name(self) -> str: - """Return the certificate chain name for logging.""" - return f"DecryptLabs-{self.scheme}" + if self._is_playready: + self.system_id = system_id or 0 + self.security_level = security_level or (2000 if device_name == "SL2" else 3000) + else: + self.system_id = system_id or 26830 + self.security_level = security_level or 3 - def get_security_level(self) -> int: - """Return the security level.""" - return self.security_level + self._sessions: Dict[bytes, Dict[str, Any]] = {} + self._pssh_b64 = None + self._http_session = Session() + self._http_session.headers.update( + { + "decrypt-labs-api-key": self.secret, + "Content-Type": "application/json", + "User-Agent": "unshackle-decrypt-labs-cdm/1.0", + } + ) - self.certificate_chain = MockCertificateChain(self.scheme, security_level) - try: - super().__init__(device_type, system_id, security_level, host, secret, device_name) - except Exception: - pass - self.req_session = requests.Session() - self.req_session.headers.update({"decrypt-labs-api-key": secret}) + def _get_device_type_enum(self, device_type: str): + """Convert device type string to enum for compatibility.""" + device_type_upper = device_type.upper() + if device_type_upper == "ANDROID": + return DeviceTypes.ANDROID + elif device_type_upper == "CHROME": + return DeviceTypes.CHROME + else: + return DeviceTypes.CHROME - @classmethod - def from_device(cls, device: Device) -> Type["DecryptLabsRemoteCDM"]: - raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.") + @property + def is_playready(self) -> bool: + """Check if this CDM is in PlayReady mode.""" + return self._is_playready + + @property + def certificate_chain(self) -> MockCertificateChain: + """Mock certificate chain for PlayReady compatibility.""" + return MockCertificateChain(f"{self.device_name}_Remote") + + def set_pssh_b64(self, pssh_b64: str) -> None: + """Store base64-encoded PSSH data for PlayReady compatibility.""" + self._pssh_b64 = pssh_b64 + + def _generate_session_id(self) -> bytes: + """Generate a unique session ID.""" + return secrets.token_bytes(16) + + def _get_init_data_from_pssh(self, pssh: Any) -> str: + """Extract init data from various PSSH formats.""" + if self.is_playready and self._pssh_b64: + return self._pssh_b64 + + if hasattr(pssh, "dumps"): + dumps_result = pssh.dumps() + + if isinstance(dumps_result, str): + try: + base64.b64decode(dumps_result) + return dumps_result + except Exception: + return base64.b64encode(dumps_result.encode("utf-8")).decode("utf-8") + else: + return base64.b64encode(dumps_result).decode("utf-8") + elif hasattr(pssh, "raw"): + raw_data = pssh.raw + if isinstance(raw_data, str): + raw_data = raw_data.encode("utf-8") + return base64.b64encode(raw_data).decode("utf-8") + elif hasattr(pssh, "__class__") and "WrmHeader" in pssh.__class__.__name__: + if self.is_playready: + raise ValueError("PlayReady WRM header received but no PSSH B64 was set via set_pssh_b64()") + + if hasattr(pssh, "raw_bytes"): + return base64.b64encode(pssh.raw_bytes).decode("utf-8") + elif hasattr(pssh, "bytes"): + return base64.b64encode(pssh.bytes).decode("utf-8") + else: + raise ValueError(f"Cannot extract PSSH data from WRM header type: {type(pssh)}") + else: + raise ValueError(f"Unsupported PSSH type: {type(pssh)}") def open(self) -> bytes: - """Open a new CDM session. + """ + Open a new CDM session. Returns: - Random session ID bytes for internal tracking + Session identifier as bytes """ - return bytes.fromhex(secrets.token_hex(16)) + session_id = self._generate_session_id() + self._sessions[session_id] = { + "service_certificate": None, + "keys": [], + "pssh": None, + "challenge": None, + "decrypt_labs_session_id": None, + } + return session_id def close(self, session_id: bytes) -> None: - """Close a CDM session. + """ + Close a CDM session. Args: - session_id: Session identifier to close + session_id: Session identifier + + Raises: + ValueError: If session ID is invalid """ - pass + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + del self._sessions[session_id] + + def get_service_certificate(self, session_id: bytes) -> Optional[bytes]: + """ + Get the service certificate for a session. + + Args: + session_id: Session identifier + + Returns: + Service certificate if set, None otherwise + + Raises: + ValueError: If session ID is invalid + """ + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + return self._sessions[session_id]["service_certificate"] def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str: - """Set service certificate for L1/L2 schemes. + """ + Set the service certificate for a session. Args: session_id: Session identifier certificate: Service certificate (bytes or base64 string) Returns: - Success status string + Certificate status message + + Raises: + ValueError: If session ID is invalid """ - if isinstance(certificate, bytes): - certificate = base64.b64encode(certificate).decode() + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - self.service_certificate = certificate - self.privacy_mode = True + if certificate is None: + self._sessions[session_id]["service_certificate"] = None + return "Removed" - return "success" + if isinstance(certificate, str): + certificate = base64.b64decode(certificate) - def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]: - raise NotImplementedError("This method is not implemented in this CDM") + self._sessions[session_id]["service_certificate"] = certificate + return "Successfully set Service Certificate" - def get_license_challenge( - self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True - ) -> bytes: - """Generate license challenge using DecryptLabs API. + def has_cached_keys(self, session_id: bytes) -> bool: + """ + Check if cached keys are available for the session. Args: session_id: Session identifier - pssh: PSSH initialization data - license_type: Type of license (default: "STREAMING") - privacy_mode: Enable privacy mode Returns: - License challenge bytes or empty bytes if using cached keys + True if cached keys are available + + Raises: + ValueError: If session ID is invalid """ - self.pssh = pssh + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - scheme_to_use = self.scheme - try: - pssh_data = pssh.dumps() - if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data: - if self.scheme in ["SL2", "SL3"]: - scheme_to_use = "L1" if self.scheme == "SL2" else "L1" - else: - scheme_to_use = self.scheme - except Exception: - scheme_to_use = self.scheme + session = self._sessions[session_id] + pssh = session.get("pssh") - request_data = { - "init_data": self.pssh.dumps(), - "scheme": scheme_to_use, - "service": self.service_name, - } + if not pssh: + return False - if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"): - request_data["service_certificate"] = self.service_certificate - elif scheme_to_use in ["L1", "L2"]: - pass + if self.vaults: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = pssh.key_ids + elif hasattr(pssh, "kids"): + key_ids = pssh.kids - request_data["get_cached_keys_if_exists"] = True + for kid in key_ids: + key, _ = self.vaults.get_key(kid) + if key and key.count("0") != len(key): + return True - if not hasattr(self, "session_schemes"): - self.session_schemes = {} - self.session_schemes[session_id] = scheme_to_use - res = self.session( - self.host + "/get-request", - request_data, - ) + if self.service_name: + try: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = [kid.hex for kid in pssh.key_ids] + elif hasattr(pssh, "kids"): + key_ids = [kid.hex for kid in pssh.kids] - if res.get("message_type") == "cached-keys": - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - cached_keys_for_vault = {} - - for cached_key in res.get("cached_keys", []): - kid_str = cached_key["kid"] - try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) - - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] - - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) - except Exception: - pass - - if self.service_name == "NF" or "netflix" in self.service_name.lower(): - request_data_no_cache = request_data.copy() - request_data_no_cache["get_cached_keys_if_exists"] = False - - res_challenge = self.session( - self.host + "/get-request", - request_data_no_cache, - ) - - if res_challenge.get("challenge"): - self.license_request = res_challenge["challenge"] - self.api_session_ids[session_id] = res_challenge.get("session_id") - return base64.b64decode(self.license_request) - - self.license_request = "" - self.api_session_ids[session_id] = None - self._has_cached_keys = True - return b"" - - self.license_request = res["challenge"] - self.api_session_ids[session_id] = res["session_id"] - self._has_cached_keys = False - - return base64.b64decode(self.license_request) - - def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: - """Parse license response and extract decryption keys. - - Args: - session_id: Session identifier - license_message: License response from DRM server - """ - session_id_api = self.api_session_ids[session_id] - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - if session_id_api is None and session_keys: - return - - if isinstance(license_message, dict) and "keys" in license_message: - session_keys.extend( - [ - Key(kid=Key.kid_to_uuid(x["kid"]), type_=x.get("type", "CONTENT"), key=bytes.fromhex(x["key"])) - for x in license_message["keys"] - ] - ) - - else: - if isinstance(license_message, bytes): - license_response_b64 = base64.b64encode(license_message).decode() - elif isinstance(license_message, str): - license_response_b64 = license_message - else: - license_response_b64 = str(license_message) - scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme) - - res = self.session( - self.host + "/decrypt-response", - { - "session_id": session_id_api, - "init_data": self.pssh.dumps(), - "license_request": self.license_request, - "license_response": license_response_b64, - "scheme": scheme_for_session, - }, - ) - - if scheme_for_session in ["SL2", "SL3"]: - if "keys" in res and res["keys"]: - keys_data = res["keys"] - if isinstance(keys_data, str): - original_keys = keys_data.replace("\n", " ") - keys_separated = original_keys.split("--key ") - for k in keys_separated: - if ":" in k: - key_parts = k.strip().split(":") - if len(key_parts) == 2: - try: - kid_hex, key_hex = key_parts - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(kid_hex)), - type_="CONTENT", - key=bytes.fromhex(key_hex), - ) - ) - except (ValueError, TypeError): - continue - elif isinstance(keys_data, list): - for key_info in keys_data: - if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: - session_keys.append( - Key( - kid=Key.kid_to_uuid(key_info["kid"]), - type_=key_info.get("type", "CONTENT"), - key=bytes.fromhex(key_info["key"]), - ) - ) - else: - original_keys = res["keys"].replace("\n", " ") - keys_separated = original_keys.split("--key ") - formatted_keys = [] - for k in keys_separated: - if ":" in k: - key = k.strip() - formatted_keys.append(key) - for keys in formatted_keys: - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])), - type_="CONTENT", - key=bytes.fromhex(keys.split(":")[1]), - ) + if key_ids: + response = self._http_session.post( + f"{self.host}/get-cached-keys", + json={"service": self.service_name, "kid": key_ids}, + timeout=30, ) - def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]: - """Get decryption keys for a session. + if response.status_code == 200: + data = response.json() + return ( + data.get("message") == "success" + and "cached_keys" in data + and isinstance(data["cached_keys"], list) + and len(data["cached_keys"]) > 0 + ) - Args: - session_id: Session identifier - type_: Key type filter (optional) - - Returns: - List of decryption keys for the session - """ - return self.keys[session_id] - - def has_cached_keys(self, session_id: bytes) -> bool: - """Check if this session has cached keys and doesn't need license request. - - Args: - session_id: Session identifier to check - - Returns: - True if session has cached keys, False otherwise - """ - return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 - - def session(self, url, data, retries=3): - """Make authenticated request to DecryptLabs API. - - Args: - url: API endpoint URL - data: Request payload data - retries: Number of retry attempts for failed requests - - Returns: - API response JSON data - - Raises: - ValueError: If API returns an error after retries - """ - res = self.req_session.post(url, json=data).json() - - if res.get("message") != "success": - if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""): - if retries > 0: - return self.session(url, data, retries=retries - 1) - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - - return res - - def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: - """Use cached keys from DecryptLabs as a fallback when license server fails. - - Args: - session_id: Session identifier - - Returns: - True if cached keys were successfully applied, False otherwise - """ - if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: - return False - - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - cached_keys_for_vault = {} - - for cached_key in self._cached_keys_available: - kid_str = cached_key["kid"] - try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) - - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] - - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) except Exception: pass - self._has_cached_keys = True - return True + return False - -class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm): - """PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API. - - Provides PlayReady CDM functionality through DecryptLabs' remote API service, - supporting PlayReady DRM schemes like SL2 and SL3. - """ - - def __init__( - self, - security_level: int, - host: str, - secret: str, - device_name: str, - service_name: str, - vaults=None, - client_version: str = "10.0.16384.10011", - ): - """Initialize DecryptLabs Remote PlayReady CDM. - - Args: - security_level: DRM security level - host: DecryptLabs API host URL - secret: DecryptLabs API key for authentication - device_name: Device/scheme name (used as scheme identifier) - service_name: Service/platform name - vaults: Optional vaults reference for caching keys - client_version: PlayReady client version + def get_license_challenge( + self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True + ) -> bytes: """ - super().__init__( - security_level=security_level, - certificate_chain=None, - encryption_key=None, - signing_key=None, - client_version=client_version, - ) - - self.host = host - self.service_name = service_name - self.device_name = device_name - self.scheme = device_name - self.vaults = vaults - self.keys = {} - self.api_session_ids = {} - self.pssh_b64 = None - self.license_request = None - self._has_cached_keys = False - - self.req_session = requests.Session() - self.req_session.headers.update({"decrypt-labs-api-key": secret}) - - class MockCertificateChain: - """Mock certificate chain for DecryptLabs remote CDM compatibility.""" - - def __init__(self, scheme: str, security_level: int): - self.scheme = scheme - self.security_level = security_level - - def get_name(self) -> str: - """Return the certificate chain name for logging.""" - return f"DecryptLabs-{self.scheme}" - - def get_security_level(self) -> int: - """Return the security level.""" - return self.security_level - - self.certificate_chain = MockCertificateChain(self.scheme, security_level) - - def set_pssh_b64(self, pssh_b64: str): - """Set the original base64-encoded PSSH box for DecryptLabs API. - - Args: - pssh_b64: Base64-encoded PSSH box from the manifest - """ - self.pssh_b64 = pssh_b64 - - def open(self) -> bytes: - """Open a new CDM session. - - Returns: - Random session ID bytes for internal tracking - """ - return bytes.fromhex(secrets.token_hex(16)) - - def close(self, session_id: bytes) -> None: - """Close a CDM session. - - Args: - session_id: Session identifier to close - """ - pass - - def get_license_challenge(self, session_id: bytes, _) -> str: - """Generate license challenge using DecryptLabs API for PlayReady. + Generate a license challenge using Decrypt Labs API. Args: session_id: Session identifier + pssh_or_wrm: PSSH object or WRM header (for PlayReady compatibility) + license_type: Type of license (STREAMING, OFFLINE, AUTOMATIC) - for compatibility only + privacy_mode: Whether to use privacy mode - for compatibility only Returns: - License challenge as XML string - """ - if not (hasattr(self, "pssh_b64") and self.pssh_b64): - raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") + License challenge as bytes - init_data = self.pssh_b64 + Raises: + InvalidSession: If session ID is invalid + requests.RequestException: If API request fails + """ + _ = license_type, privacy_mode + + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + session = self._sessions[session_id] + + session["pssh"] = pssh_or_wrm + init_data = self._get_init_data_from_pssh(pssh_or_wrm) + + if self.has_cached_keys(session_id): + self._load_cached_keys(session_id) + return b"" + + request_data = {"scheme": self.device_name, "init_data": init_data} + + if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name: + request_data["service"] = self.service_name + + if session["service_certificate"]: + request_data["service_certificate"] = base64.b64encode(session["service_certificate"]).decode("utf-8") + + response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30) + + if response.status_code != 200: + raise requests.RequestException(f"API request failed: {response.status_code} {response.text}") + + data = response.json() + + if data.get("message") != "success": + error_msg = data.get("message", "Unknown error") + if "details" in data: + error_msg += f" - Details: {data['details']}" + if "error" in data: + error_msg += f" - Error: {data['error']}" + raise requests.RequestException(f"API error: {error_msg}") + + if data.get("message_type") == "cached-keys" or "cached_keys" in data: + cached_keys = data.get("cached_keys", []) + session["keys"] = self._parse_cached_keys(cached_keys) + return b"" + + challenge = base64.b64decode(data["challenge"]) + session["challenge"] = challenge + session["decrypt_labs_session_id"] = data["session_id"] + + return challenge + + def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None: + """ + Parse license response using Decrypt Labs API. + + Args: + session_id: Session identifier + license_message: License response from license server + + Raises: + ValueError: If session ID is invalid or no challenge available + requests.RequestException: If API request fails + """ + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + session = self._sessions[session_id] + + if session["keys"]: + return + + if not session.get("challenge") or not session.get("decrypt_labs_session_id"): + raise ValueError("No challenge available - call get_license_challenge first") + + if isinstance(license_message, str): + if self.is_playready and license_message.strip().startswith(" None: - """Parse license response and extract decryption keys. - - Args: - session_id: Session identifier - license_message: License response from DRM server (XML string) - """ - session_id_api = self.api_session_ids[session_id] - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - if session_id_api is None and session_keys: - return - - try: - license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8") - except Exception: - return - - if not (hasattr(self, "pssh_b64") and self.pssh_b64): - raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") - init_data = self.pssh_b64 - - res = self.session( - self.host + "/decrypt-response", - { - "session_id": session_id_api, - "init_data": init_data, - "license_request": self.license_request, - "license_response": license_response_b64, - "scheme": self.scheme, - }, - ) - - if "keys" in res and res["keys"]: - keys_data = res["keys"] - if isinstance(keys_data, str): - original_keys = keys_data.replace("\n", " ") - keys_separated = original_keys.split("--key ") - for k in keys_separated: - if ":" in k: - key_parts = k.strip().split(":") - if len(key_parts) == 2: - try: - kid_hex, key_hex = key_parts - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(kid_hex)), - type_="CONTENT", - key=bytes.fromhex(key_hex), - ) - ) - except (ValueError, TypeError): - continue - elif isinstance(keys_data, list): - for key_info in keys_data: - if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: - session_keys.append( - Key( - kid=Key.kid_to_uuid(key_info["kid"]), - type_=key_info.get("type", "CONTENT"), - key=bytes.fromhex(key_info["key"]), - ) - ) - - def get_keys(self, session_id: bytes) -> list: - """Get decryption keys for a session. + session["keys"] = self._parse_keys_response(data) + + if self.vaults and session["keys"]: + key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"} + self.vaults.add_keys(key_dict) + + def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]: + """ + Get keys from the session. Args: session_id: Session identifier + type_: Optional key type filter (CONTENT, SIGNING, etc.) Returns: - List of decryption keys for the session - """ - return self.keys.get(session_id, []) - - def has_cached_keys(self, session_id: bytes) -> bool: - """Check if this session has cached keys and doesn't need license request. - - Args: - session_id: Session identifier to check - - Returns: - True if session has cached keys, False otherwise - """ - return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 - - def session(self, url, data, retries=3): - """Make authenticated request to DecryptLabs API. - - Args: - url: API endpoint URL - data: Request payload data - retries: Number of retry attempts for failed requests - - Returns: - API response JSON data + List of Key objects Raises: - ValueError: If API returns an error after retries + InvalidSession: If session ID is invalid """ - res = self.req_session.post(url, json=data).json() + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - if res.get("message") != "success": - if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""): - if retries > 0: - return self.session(url, data, retries=retries - 1) - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") + key_dicts = self._sessions[session_id]["keys"] + keys = [Key(kid=k["kid"], key=k["key"], type_=k["type"]) for k in key_dicts] - return res + if type_: + keys = [key for key in keys if key.type == type_] - def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: - """Use cached keys from DecryptLabs as a fallback when license server fails. + return keys - Args: - session_id: Session identifier + def _load_cached_keys(self, session_id: bytes) -> None: + """Load cached keys from vaults and Decrypt Labs API.""" + session = self._sessions[session_id] + pssh = session["pssh"] + keys = [] - Returns: - True if cached keys were successfully applied, False otherwise - """ - if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: - return False + if self.vaults: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = pssh.key_ids + elif hasattr(pssh, "kids"): + key_ids = pssh.kids - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] + for kid in key_ids: + key, _ = self.vaults.get_key(kid) + if key and key.count("0") != len(key): + keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"}) - cached_keys_for_vault = {} - - for cached_key in self._cached_keys_available: - kid_str = cached_key["kid"] + if not keys and self.service_name: try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = [kid.hex for kid in pssh.key_ids] + elif hasattr(pssh, "kids"): + key_ids = [kid.hex for kid in pssh.kids] - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] + if key_ids: + response = self._http_session.post( + f"{self.host}/get-cached-keys", + json={"service": self.service_name, "kid": key_ids}, + timeout=30, + ) + + if response.status_code == 200: + data = response.json() + if data.get("message") == "success" and "cached_keys" in data: + keys = self._parse_cached_keys(data["cached_keys"]) - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) except Exception: pass - self._has_cached_keys = True - return True + session["keys"] = keys + + def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Parse cached keys from API response. + + Args: + cached_keys_data: List of cached key objects from API + + Returns: + List of key dictionaries + """ + keys = [] + + try: + if cached_keys_data and isinstance(cached_keys_data, list): + for key_data in cached_keys_data: + if "kid" in key_data and "key" in key_data: + keys.append({"kid": key_data["kid"], "key": key_data["key"], "type": "CONTENT"}) + except Exception: + pass + return keys + + def _parse_keys_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: + """Parse keys from decrypt response.""" + keys = [] + + if "keys" in data and isinstance(data["keys"], str): + keys_string = data["keys"] + + for line in keys_string.split("\n"): + line = line.strip() + if line.startswith("--key "): + key_part = line[6:] + if ":" in key_part: + kid, key = key_part.split(":", 1) + keys.append({"kid": kid.strip(), "key": key.strip(), "type": "CONTENT"}) + elif "keys" in data and isinstance(data["keys"], list): + for key_data in data["keys"]: + keys.append( + {"kid": key_data.get("kid"), "key": key_data.get("key"), "type": key_data.get("type", "CONTENT")} + ) + + return keys + + +__all__ = ["DecryptLabsRemoteCDM"] diff --git a/unshackle/core/tracks/track.py b/unshackle/core/tracks/track.py index d77db62..d0c6f5a 100644 --- a/unshackle/core/tracks/track.py +++ b/unshackle/core/tracks/track.py @@ -420,6 +420,15 @@ class Track: for drm in self.drm: if isinstance(drm, PlayReady): return drm + elif hasattr(cdm, 'is_playready'): + if cdm.is_playready: + for drm in self.drm: + if isinstance(drm, PlayReady): + return drm + else: + for drm in self.drm: + if isinstance(drm, Widevine): + return drm return self.drm[0] diff --git a/uv.lock b/uv.lock index e8ff5a1..537fc74 100644 --- a/uv.lock +++ b/uv.lock @@ -1499,7 +1499,7 @@ wheels = [ [[package]] name = "unshackle" -version = "1.4.3" +version = "1.4.4" source = { editable = "." } dependencies = [ { name = "appdirs" },