2 Commits

5 changed files with 683 additions and 62 deletions

View File

@@ -299,21 +299,6 @@ class dl:
if getattr(config, "decryption_map", None): if getattr(config, "decryption_map", None):
config.decryption = config.decryption_map.get(self.service, config.decryption) config.decryption = config.decryption_map.get(self.service, config.decryption)
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
with console.status("Loading Key Vaults...", spinner="dots"): with console.status("Loading Key Vaults...", spinner="dots"):
self.vaults = Vaults(self.service) self.vaults = Vaults(self.service)
total_vaults = len(config.key_vaults) total_vaults = len(config.key_vaults)
@@ -352,6 +337,21 @@ class dl:
else: else:
self.log.debug("No vaults are currently active") self.log.debug("No vaults are currently active")
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
self.proxy_providers = [] self.proxy_providers = []
if no_proxy: if no_proxy:
ctx.params["proxy"] = None ctx.params["proxy"] = None
@@ -1442,8 +1442,8 @@ class dl:
return Credential(*credentials) return Credential(*credentials)
return Credential.loads(credentials) # type: ignore return Credential.loads(credentials) # type: ignore
@staticmethod
def get_cdm( def get_cdm(
self,
service: str, service: str,
profile: Optional[str] = None, profile: Optional[str] = None,
drm: Optional[str] = None, drm: Optional[str] = None,
@@ -1478,9 +1478,26 @@ class dl:
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api: if cdm_api:
is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False
del cdm_api["name"] if is_decrypt_lab:
del cdm_api["type"] device_type = cdm_api.get("device_type")
return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api) del cdm_api["name"]
del cdm_api["type"]
# Use the appropriate DecryptLabs CDM class based on device type
if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]:
from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM
# Remove unused parameters for PlayReady CDM
cdm_params = cdm_api.copy()
cdm_params.pop("device_type", None)
cdm_params.pop("system_id", None)
return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params)
else:
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
else:
del cdm_api["name"]
del cdm_api["type"]
return RemoteCdm(**cdm_api)
prd_path = config.directories.prds / f"{cdm_name}.prd" prd_path = config.directories.prds / f"{cdm_name}.prd"
if not prd_path.is_file(): if not prd_path.is_file():

View File

@@ -4,13 +4,21 @@ from typing import Optional, Type, Union
from uuid import UUID from uuid import UUID
import requests import requests
from pyplayready.cdm import Cdm as PlayReadyCdm
from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm
from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage
# Copyright 2024 by DevYukine. # Copyright 2024 by DevYukine.
# Copyright 2025 by sp4rk.y.
class DecryptLabsRemoteCDM(RemoteCdm): class DecryptLabsRemoteCDM(RemoteCdm):
"""Remote CDM implementation for DecryptLabs KeyXtractor API.
Provides CDM functionality through DecryptLabs' remote API service,
supporting multiple DRM schemes including Widevine and PlayReady.
"""
def __init__( def __init__(
self, self,
device_type: Union[DeviceTypes, str], device_type: Union[DeviceTypes, str],
@@ -20,7 +28,20 @@ class DecryptLabsRemoteCDM(RemoteCdm):
secret: str, secret: str,
device_name: str, device_name: str,
service_name: str, service_name: str,
vaults=None,
): ):
"""Initialize DecryptLabs Remote CDM.
Args:
device_type: Type of device to emulate
system_id: System identifier
security_level: DRM security level
host: DecryptLabs API host URL
secret: DecryptLabs API key for authentication
device_name: Device/scheme name (used as scheme identifier)
service_name: Service/platform name
vaults: Optional vaults reference for caching keys
"""
self.response_counter = 0 self.response_counter = 0
self.pssh = None self.pssh = None
self.api_session_ids = {} self.api_session_ids = {}
@@ -28,7 +49,28 @@ class DecryptLabsRemoteCDM(RemoteCdm):
self.service_name = service_name self.service_name = service_name
self.device_name = device_name self.device_name = device_name
self.keys = {} self.keys = {}
self.scheme = "L1" if device_name == "L1" else "widevine" self.scheme = device_name
self._has_cached_keys = False
self.vaults = vaults
self.security_level = security_level
self.host = host
class MockCertificateChain:
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
def __init__(self, scheme: str, security_level: int):
self.scheme = scheme
self.security_level = security_level
def get_name(self) -> str:
"""Return the certificate chain name for logging."""
return f"DecryptLabs-{self.scheme}"
def get_security_level(self) -> int:
"""Return the security level."""
return self.security_level
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
try: try:
super().__init__(device_type, system_id, security_level, host, secret, device_name) super().__init__(device_type, system_id, security_level, host, secret, device_name)
except Exception: except Exception:
@@ -41,22 +83,36 @@ class DecryptLabsRemoteCDM(RemoteCdm):
raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.") raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.")
def open(self) -> bytes: def open(self) -> bytes:
# We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one. """Open a new CDM session.
Returns:
Random session ID bytes for internal tracking
"""
return bytes.fromhex(secrets.token_hex(16)) return bytes.fromhex(secrets.token_hex(16))
def close(self, session_id: bytes) -> None: def close(self, session_id: bytes) -> None:
# We stub this method to do nothing. """Close a CDM session.
Args:
session_id: Session identifier to close
"""
pass pass
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str: def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""Set service certificate for L1/L2 schemes.
Args:
session_id: Session identifier
certificate: Service certificate (bytes or base64 string)
Returns:
Success status string
"""
if isinstance(certificate, bytes): if isinstance(certificate, bytes):
certificate = base64.b64encode(certificate).decode() certificate = base64.b64encode(certificate).decode()
# certificate needs to be base64 to be sent off to the API. self.service_certificate = certificate
# it needs to intentionally be kept as base64 encoded SignedMessage. self.privacy_mode = True
self.req_session.signed_device_certificate = certificate
self.req_session.privacy_mode = True
return "success" return "success"
@@ -66,63 +122,114 @@ class DecryptLabsRemoteCDM(RemoteCdm):
def get_license_challenge( def get_license_challenge(
self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes: ) -> bytes:
"""Generate license challenge using DecryptLabs API.
Args:
session_id: Session identifier
pssh: PSSH initialization data
license_type: Type of license (default: "STREAMING")
privacy_mode: Enable privacy mode
Returns:
License challenge bytes or empty bytes if using cached keys
"""
self.pssh = pssh self.pssh = pssh
scheme_to_use = self.scheme
try:
pssh_data = pssh.dumps()
if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data:
if self.scheme in ["SL2", "SL3"]:
scheme_to_use = "L1" if self.scheme == "SL2" else "L1"
else:
scheme_to_use = self.scheme
except Exception:
scheme_to_use = self.scheme
request_data = { request_data = {
"init_data": self.pssh.dumps(), "init_data": self.pssh.dumps(),
"service_certificate": self.req_session.signed_device_certificate, "scheme": scheme_to_use,
"scheme": self.scheme,
"service": self.service_name, "service": self.service_name,
} }
# Add required parameter for L1 scheme
if self.scheme == "L1": if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"):
request_data["get_cached_keys_if_exists"] = True request_data["service_certificate"] = self.service_certificate
elif scheme_to_use in ["L1", "L2"]:
pass
request_data["get_cached_keys_if_exists"] = True
if not hasattr(self, "session_schemes"):
self.session_schemes = {}
self.session_schemes[session_id] = scheme_to_use
res = self.session( res = self.session(
self.host + "/get-request", self.host + "/get-request",
request_data, request_data,
) )
# Check if we got cached keys instead of a challenge
if res.get("message_type") == "cached-keys": if res.get("message_type") == "cached-keys":
# Store cached keys directly
if session_id not in self.keys: if session_id not in self.keys:
self.keys[session_id] = [] self.keys[session_id] = []
session_keys = self.keys[session_id] session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in res.get("cached_keys", []): for cached_key in res.get("cached_keys", []):
# Handle KID format - could be hex string or UUID string
kid_str = cached_key["kid"] kid_str = cached_key["kid"]
try: try:
# Try as UUID string first
kid_uuid = UUID(kid_str) kid_uuid = UUID(kid_str)
except ValueError: except ValueError:
try: try:
# Try as hex string (like the existing code)
kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError: except ValueError:
# Fallback: use Key.kid_to_uuid
kid_uuid = Key.kid_to_uuid(kid_str) kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
if self.service_name == "NF" or "netflix" in self.service_name.lower():
request_data_no_cache = request_data.copy()
request_data_no_cache["get_cached_keys_if_exists"] = False
res_challenge = self.session(
self.host + "/get-request",
request_data_no_cache,
)
if res_challenge.get("challenge"):
self.license_request = res_challenge["challenge"]
self.api_session_ids[session_id] = res_challenge.get("session_id")
return base64.b64decode(self.license_request)
# Return empty challenge since we already have the keys
self.license_request = "" self.license_request = ""
self.api_session_ids[session_id] = None self.api_session_ids[session_id] = None
self._has_cached_keys = True
return b"" return b""
# Normal challenge response
self.license_request = res["challenge"] self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"] self.api_session_ids[session_id] = res["session_id"]
self._has_cached_keys = False
return base64.b64decode(self.license_request) return base64.b64decode(self.license_request)
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
"""Parse license response and extract decryption keys.
Args:
session_id: Session identifier
license_message: License response from DRM server
"""
session_id_api = self.api_session_ids[session_id] session_id_api = self.api_session_ids[session_id]
if session_id not in self.keys: if session_id not in self.keys:
self.keys[session_id] = [] self.keys[session_id] = []
session_keys = self.keys[session_id] session_keys = self.keys[session_id]
# If we already have cached keys and no session_id_api, skip processing
if session_id_api is None and session_keys: if session_id_api is None and session_keys:
return return
@@ -135,13 +242,14 @@ class DecryptLabsRemoteCDM(RemoteCdm):
) )
else: else:
# Ensure license_message is base64 encoded
if isinstance(license_message, bytes): if isinstance(license_message, bytes):
license_response_b64 = base64.b64encode(license_message).decode() license_response_b64 = base64.b64encode(license_message).decode()
elif isinstance(license_message, str): elif isinstance(license_message, str):
license_response_b64 = license_message license_response_b64 = license_message
else: else:
license_response_b64 = str(license_message) license_response_b64 = str(license_message)
scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme)
res = self.session( res = self.session(
self.host + "/decrypt-response", self.host + "/decrypt-response",
{ {
@@ -149,32 +257,95 @@ class DecryptLabsRemoteCDM(RemoteCdm):
"init_data": self.pssh.dumps(), "init_data": self.pssh.dumps(),
"license_request": self.license_request, "license_request": self.license_request,
"license_response": license_response_b64, "license_response": license_response_b64,
"scheme": self.scheme, "scheme": scheme_for_session,
}, },
) )
original_keys = res["keys"].replace("\n", " ") if scheme_for_session in ["SL2", "SL3"]:
keys_separated = original_keys.split("--key ") if "keys" in res and res["keys"]:
formatted_keys = [] keys_data = res["keys"]
for k in keys_separated: if isinstance(keys_data, str):
if ":" in k: original_keys = keys_data.replace("\n", " ")
key = k.strip() keys_separated = original_keys.split("--key ")
formatted_keys.append(key) for k in keys_separated:
for keys in formatted_keys: if ":" in k:
session_keys.append( key_parts = k.strip().split(":")
( if len(key_parts) == 2:
try:
kid_hex, key_hex = key_parts
session_keys.append(
Key(
kid=UUID(bytes=bytes.fromhex(kid_hex)),
type_="CONTENT",
key=bytes.fromhex(key_hex),
)
)
except (ValueError, TypeError):
continue
elif isinstance(keys_data, list):
for key_info in keys_data:
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
session_keys.append(
Key(
kid=Key.kid_to_uuid(key_info["kid"]),
type_=key_info.get("type", "CONTENT"),
key=bytes.fromhex(key_info["key"]),
)
)
else:
original_keys = res["keys"].replace("\n", " ")
keys_separated = original_keys.split("--key ")
formatted_keys = []
for k in keys_separated:
if ":" in k:
key = k.strip()
formatted_keys.append(key)
for keys in formatted_keys:
session_keys.append(
Key( Key(
kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])), kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])),
type_="CONTENT", type_="CONTENT",
key=bytes.fromhex(keys.split(":")[1]), key=bytes.fromhex(keys.split(":")[1]),
) )
) )
)
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]: def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
"""Get decryption keys for a session.
Args:
session_id: Session identifier
type_: Key type filter (optional)
Returns:
List of decryption keys for the session
"""
return self.keys[session_id] return self.keys[session_id]
def has_cached_keys(self, session_id: bytes) -> bool:
"""Check if this session has cached keys and doesn't need license request.
Args:
session_id: Session identifier to check
Returns:
True if session has cached keys, False otherwise
"""
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
def session(self, url, data, retries=3): def session(self, url, data, retries=3):
"""Make authenticated request to DecryptLabs API.
Args:
url: API endpoint URL
data: Request payload data
retries: Number of retry attempts for failed requests
Returns:
API response JSON data
Raises:
ValueError: If API returns an error after retries
"""
res = self.req_session.post(url, json=data).json() res = self.req_session.post(url, json=data).json()
if res.get("message") != "success": if res.get("message") != "success":
@@ -187,3 +358,330 @@ class DecryptLabsRemoteCDM(RemoteCdm):
raise ValueError(f"CDM API returned an error: {res['Error']}") raise ValueError(f"CDM API returned an error: {res['Error']}")
return res return res
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
"""Use cached keys from DecryptLabs as a fallback when license server fails.
Args:
session_id: Session identifier
Returns:
True if cached keys were successfully applied, False otherwise
"""
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
return False
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in self._cached_keys_available:
kid_str = cached_key["kid"]
try:
kid_uuid = UUID(kid_str)
except ValueError:
try:
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError:
kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
self._has_cached_keys = True
return True
class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm):
"""PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API.
Provides PlayReady CDM functionality through DecryptLabs' remote API service,
supporting PlayReady DRM schemes like SL2 and SL3.
"""
def __init__(
self,
security_level: int,
host: str,
secret: str,
device_name: str,
service_name: str,
vaults=None,
client_version: str = "10.0.16384.10011",
):
"""Initialize DecryptLabs Remote PlayReady CDM.
Args:
security_level: DRM security level
host: DecryptLabs API host URL
secret: DecryptLabs API key for authentication
device_name: Device/scheme name (used as scheme identifier)
service_name: Service/platform name
vaults: Optional vaults reference for caching keys
client_version: PlayReady client version
"""
super().__init__(
security_level=security_level,
certificate_chain=None,
encryption_key=None,
signing_key=None,
client_version=client_version,
)
self.host = host
self.service_name = service_name
self.device_name = device_name
self.scheme = device_name
self.vaults = vaults
self.keys = {}
self.api_session_ids = {}
self.pssh_b64 = None
self.license_request = None
self._has_cached_keys = False
self.req_session = requests.Session()
self.req_session.headers.update({"decrypt-labs-api-key": secret})
class MockCertificateChain:
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
def __init__(self, scheme: str, security_level: int):
self.scheme = scheme
self.security_level = security_level
def get_name(self) -> str:
"""Return the certificate chain name for logging."""
return f"DecryptLabs-{self.scheme}"
def get_security_level(self) -> int:
"""Return the security level."""
return self.security_level
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
def set_pssh_b64(self, pssh_b64: str):
"""Set the original base64-encoded PSSH box for DecryptLabs API.
Args:
pssh_b64: Base64-encoded PSSH box from the manifest
"""
self.pssh_b64 = pssh_b64
def open(self) -> bytes:
"""Open a new CDM session.
Returns:
Random session ID bytes for internal tracking
"""
return bytes.fromhex(secrets.token_hex(16))
def close(self, session_id: bytes) -> None:
"""Close a CDM session.
Args:
session_id: Session identifier to close
"""
pass
def get_license_challenge(self, session_id: bytes, _) -> str:
"""Generate license challenge using DecryptLabs API for PlayReady.
Args:
session_id: Session identifier
Returns:
License challenge as XML string
"""
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
init_data = self.pssh_b64
request_data = {
"init_data": init_data,
"scheme": self.scheme,
"service": self.service_name,
"get_cached_keys_if_exists": False,
}
res = self.session(
self.host + "/get-request",
request_data,
)
if res.get("message_type") == "cached-keys":
self._cached_keys_available = res.get("cached_keys", [])
else:
self._cached_keys_available = None
self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"]
self._has_cached_keys = False
try:
return base64.b64decode(self.license_request).decode()
except Exception:
return self.license_request
def parse_license(self, session_id: bytes, license_message: str) -> None:
"""Parse license response and extract decryption keys.
Args:
session_id: Session identifier
license_message: License response from DRM server (XML string)
"""
session_id_api = self.api_session_ids[session_id]
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
if session_id_api is None and session_keys:
return
try:
license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8")
except Exception:
return
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
init_data = self.pssh_b64
res = self.session(
self.host + "/decrypt-response",
{
"session_id": session_id_api,
"init_data": init_data,
"license_request": self.license_request,
"license_response": license_response_b64,
"scheme": self.scheme,
},
)
if "keys" in res and res["keys"]:
keys_data = res["keys"]
if isinstance(keys_data, str):
original_keys = keys_data.replace("\n", " ")
keys_separated = original_keys.split("--key ")
for k in keys_separated:
if ":" in k:
key_parts = k.strip().split(":")
if len(key_parts) == 2:
try:
kid_hex, key_hex = key_parts
session_keys.append(
Key(
kid=UUID(bytes=bytes.fromhex(kid_hex)),
type_="CONTENT",
key=bytes.fromhex(key_hex),
)
)
except (ValueError, TypeError):
continue
elif isinstance(keys_data, list):
for key_info in keys_data:
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
session_keys.append(
Key(
kid=Key.kid_to_uuid(key_info["kid"]),
type_=key_info.get("type", "CONTENT"),
key=bytes.fromhex(key_info["key"]),
)
)
def get_keys(self, session_id: bytes) -> list:
"""Get decryption keys for a session.
Args:
session_id: Session identifier
Returns:
List of decryption keys for the session
"""
return self.keys.get(session_id, [])
def has_cached_keys(self, session_id: bytes) -> bool:
"""Check if this session has cached keys and doesn't need license request.
Args:
session_id: Session identifier to check
Returns:
True if session has cached keys, False otherwise
"""
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
def session(self, url, data, retries=3):
"""Make authenticated request to DecryptLabs API.
Args:
url: API endpoint URL
data: Request payload data
retries: Number of retry attempts for failed requests
Returns:
API response JSON data
Raises:
ValueError: If API returns an error after retries
"""
res = self.req_session.post(url, json=data).json()
if res.get("message") != "success":
if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""):
if retries > 0:
return self.session(url, data, retries=retries - 1)
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
return res
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
"""Use cached keys from DecryptLabs as a fallback when license server fails.
Args:
session_id: Session identifier
Returns:
True if cached keys were successfully applied, False otherwise
"""
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
return False
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in self._cached_keys_available:
kid_str = cached_key["kid"]
try:
kid_uuid = UUID(kid_str)
except ValueError:
try:
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError:
kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
self._has_cached_keys = True
return True

View File

@@ -224,14 +224,59 @@ class PlayReady:
def kids(self) -> list[UUID]: def kids(self) -> list[UUID]:
return self._kids return self._kids
def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict:
"""Extract keys from CDM session with cross-library compatibility.
Args:
cdm: CDM instance
session_id: Session identifier
Returns:
Dictionary mapping KID UUIDs to hex keys
"""
keys = {}
for key in cdm.get_keys(session_id):
if hasattr(key, "key_id"):
kid = key.key_id
elif hasattr(key, "kid"):
kid = key.kid
else:
continue
if hasattr(key, "key") and hasattr(key.key, "hex"):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, bytes):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, str):
key_hex = key.key
else:
continue
keys[kid] = key_hex
return keys
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None: def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids: for kid in self.kids:
if kid in self.content_keys: if kid in self.content_keys:
continue continue
session_id = cdm.open() session_id = cdm.open()
try: try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0]) challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
license_res = licence(challenge=challenge)
try:
license_res = licence(challenge=challenge)
except Exception:
if hasattr(cdm, "use_cached_keys_as_fallback"):
if cdm.use_cached_keys_as_fallback(session_id):
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
continue
raise
if isinstance(license_res, bytes): if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore") license_str = license_res.decode(errors="ignore")
@@ -245,7 +290,7 @@ class PlayReady:
pass pass
cdm.parse_license(session_id, license_str) cdm.parse_license(session_id, license_str)
keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)} keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys) self.content_keys.update(keys)
finally: finally:
cdm.close(session_id) cdm.close(session_id)

View File

@@ -185,7 +185,12 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh))) challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(session_id, licence(challenge=challenge))
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys: if not self.content_keys:
@@ -213,10 +218,15 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
cdm.parse_license( challenge = cdm.get_license_challenge(session_id, self.pssh)
session_id,
licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)), if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
) pass
else:
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=challenge),
)
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys: if not self.content_keys:

View File

@@ -105,6 +105,50 @@ remote_cdm:
host: https://domain-2.com/api host: https://domain-2.com/api
secret: secret_key secret: secret_key
- name: "decrypt_labs_chrome"
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
device_type: CHROME
system_id: 4464 # Doesn't matter
security_level: 3
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
- name: "decrypt_labs_l1"
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 1
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_l2"
type: "decrypt_labs"
device_name: "L2" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 2
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl2"
type: "decrypt_labs"
device_name: "SL2" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 2000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl3"
type: "decrypt_labs"
device_name: "SL3" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 3000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
# Key Vaults store your obtained Content Encryption Keys (CEKs) # Key Vaults store your obtained Content Encryption Keys (CEKs)
# Use 'no_push: true' to prevent a vault from receiving pushed keys # Use 'no_push: true' to prevent a vault from receiving pushed keys
# while still allowing it to provide keys when requested # while still allowing it to provide keys when requested
@@ -171,7 +215,7 @@ chapter_fallback_name: "Chapter {j:02}"
# Case-Insensitive dictionary of headers for all Services # Case-Insensitive dictionary of headers for all Services
headers: headers:
Accept-Language: "en-US,en;q=0.8" Accept-Language: "en-US,en;q=0.8"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36" User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
# Override default filenames used across unshackle # Override default filenames used across unshackle
filenames: filenames:
@@ -213,6 +257,13 @@ services:
# Global service config # Global service config
api_key: "service_api_key" api_key: "service_api_key"
# Service certificate for Widevine L1/L2 (base64 encoded)
# This certificate is automatically used when L1/L2 schemes are selected
# Services obtain this from their DRM provider or license server
certificate: |
CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo
# ... (full base64 certificate here)
# Profile-specific device configurations # Profile-specific device configurations
profiles: profiles:
john_sd: john_sd: