mirror of
https://github.com/unshackle-dl/unshackle.git
synced 2025-10-23 15:11:08 +00:00
Compare commits
2 Commits
b4efdf3f2c
...
3ef43afeed
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ef43afeed | ||
|
|
26851cbe7c |
@@ -299,21 +299,6 @@ class dl:
|
||||
if getattr(config, "decryption_map", None):
|
||||
config.decryption = config.decryption_map.get(self.service, config.decryption)
|
||||
|
||||
with console.status("Loading DRM CDM...", spinner="dots"):
|
||||
try:
|
||||
self.cdm = self.get_cdm(self.service, self.profile)
|
||||
except ValueError as e:
|
||||
self.log.error(f"Failed to load CDM, {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if self.cdm:
|
||||
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
|
||||
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
|
||||
else:
|
||||
self.log.info(
|
||||
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
|
||||
)
|
||||
|
||||
with console.status("Loading Key Vaults...", spinner="dots"):
|
||||
self.vaults = Vaults(self.service)
|
||||
total_vaults = len(config.key_vaults)
|
||||
@@ -352,6 +337,21 @@ class dl:
|
||||
else:
|
||||
self.log.debug("No vaults are currently active")
|
||||
|
||||
with console.status("Loading DRM CDM...", spinner="dots"):
|
||||
try:
|
||||
self.cdm = self.get_cdm(self.service, self.profile)
|
||||
except ValueError as e:
|
||||
self.log.error(f"Failed to load CDM, {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if self.cdm:
|
||||
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
|
||||
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
|
||||
else:
|
||||
self.log.info(
|
||||
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
|
||||
)
|
||||
|
||||
self.proxy_providers = []
|
||||
if no_proxy:
|
||||
ctx.params["proxy"] = None
|
||||
@@ -1442,8 +1442,8 @@ class dl:
|
||||
return Credential(*credentials)
|
||||
return Credential.loads(credentials) # type: ignore
|
||||
|
||||
@staticmethod
|
||||
def get_cdm(
|
||||
self,
|
||||
service: str,
|
||||
profile: Optional[str] = None,
|
||||
drm: Optional[str] = None,
|
||||
@@ -1478,9 +1478,26 @@ class dl:
|
||||
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
|
||||
if cdm_api:
|
||||
is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False
|
||||
del cdm_api["name"]
|
||||
del cdm_api["type"]
|
||||
return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api)
|
||||
if is_decrypt_lab:
|
||||
device_type = cdm_api.get("device_type")
|
||||
del cdm_api["name"]
|
||||
del cdm_api["type"]
|
||||
|
||||
# Use the appropriate DecryptLabs CDM class based on device type
|
||||
if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]:
|
||||
from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM
|
||||
|
||||
# Remove unused parameters for PlayReady CDM
|
||||
cdm_params = cdm_api.copy()
|
||||
cdm_params.pop("device_type", None)
|
||||
cdm_params.pop("system_id", None)
|
||||
return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params)
|
||||
else:
|
||||
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
|
||||
else:
|
||||
del cdm_api["name"]
|
||||
del cdm_api["type"]
|
||||
return RemoteCdm(**cdm_api)
|
||||
|
||||
prd_path = config.directories.prds / f"{cdm_name}.prd"
|
||||
if not prd_path.is_file():
|
||||
|
||||
@@ -4,13 +4,21 @@ from typing import Optional, Type, Union
|
||||
from uuid import UUID
|
||||
|
||||
import requests
|
||||
from pyplayready.cdm import Cdm as PlayReadyCdm
|
||||
from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm
|
||||
from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage
|
||||
|
||||
# Copyright 2024 by DevYukine.
|
||||
# Copyright 2025 by sp4rk.y.
|
||||
|
||||
|
||||
class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
"""Remote CDM implementation for DecryptLabs KeyXtractor API.
|
||||
|
||||
Provides CDM functionality through DecryptLabs' remote API service,
|
||||
supporting multiple DRM schemes including Widevine and PlayReady.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device_type: Union[DeviceTypes, str],
|
||||
@@ -20,7 +28,20 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
secret: str,
|
||||
device_name: str,
|
||||
service_name: str,
|
||||
vaults=None,
|
||||
):
|
||||
"""Initialize DecryptLabs Remote CDM.
|
||||
|
||||
Args:
|
||||
device_type: Type of device to emulate
|
||||
system_id: System identifier
|
||||
security_level: DRM security level
|
||||
host: DecryptLabs API host URL
|
||||
secret: DecryptLabs API key for authentication
|
||||
device_name: Device/scheme name (used as scheme identifier)
|
||||
service_name: Service/platform name
|
||||
vaults: Optional vaults reference for caching keys
|
||||
"""
|
||||
self.response_counter = 0
|
||||
self.pssh = None
|
||||
self.api_session_ids = {}
|
||||
@@ -28,7 +49,28 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
self.service_name = service_name
|
||||
self.device_name = device_name
|
||||
self.keys = {}
|
||||
self.scheme = "L1" if device_name == "L1" else "widevine"
|
||||
self.scheme = device_name
|
||||
self._has_cached_keys = False
|
||||
self.vaults = vaults
|
||||
self.security_level = security_level
|
||||
self.host = host
|
||||
|
||||
class MockCertificateChain:
|
||||
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
|
||||
|
||||
def __init__(self, scheme: str, security_level: int):
|
||||
self.scheme = scheme
|
||||
self.security_level = security_level
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""Return the certificate chain name for logging."""
|
||||
return f"DecryptLabs-{self.scheme}"
|
||||
|
||||
def get_security_level(self) -> int:
|
||||
"""Return the security level."""
|
||||
return self.security_level
|
||||
|
||||
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
|
||||
try:
|
||||
super().__init__(device_type, system_id, security_level, host, secret, device_name)
|
||||
except Exception:
|
||||
@@ -41,22 +83,36 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.")
|
||||
|
||||
def open(self) -> bytes:
|
||||
# We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one.
|
||||
"""Open a new CDM session.
|
||||
|
||||
Returns:
|
||||
Random session ID bytes for internal tracking
|
||||
"""
|
||||
return bytes.fromhex(secrets.token_hex(16))
|
||||
|
||||
def close(self, session_id: bytes) -> None:
|
||||
# We stub this method to do nothing.
|
||||
"""Close a CDM session.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier to close
|
||||
"""
|
||||
pass
|
||||
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
||||
"""Set service certificate for L1/L2 schemes.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
certificate: Service certificate (bytes or base64 string)
|
||||
|
||||
Returns:
|
||||
Success status string
|
||||
"""
|
||||
if isinstance(certificate, bytes):
|
||||
certificate = base64.b64encode(certificate).decode()
|
||||
|
||||
# certificate needs to be base64 to be sent off to the API.
|
||||
# it needs to intentionally be kept as base64 encoded SignedMessage.
|
||||
|
||||
self.req_session.signed_device_certificate = certificate
|
||||
self.req_session.privacy_mode = True
|
||||
self.service_certificate = certificate
|
||||
self.privacy_mode = True
|
||||
|
||||
return "success"
|
||||
|
||||
@@ -66,63 +122,114 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
def get_license_challenge(
|
||||
self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True
|
||||
) -> bytes:
|
||||
"""Generate license challenge using DecryptLabs API.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
pssh: PSSH initialization data
|
||||
license_type: Type of license (default: "STREAMING")
|
||||
privacy_mode: Enable privacy mode
|
||||
|
||||
Returns:
|
||||
License challenge bytes or empty bytes if using cached keys
|
||||
"""
|
||||
self.pssh = pssh
|
||||
|
||||
scheme_to_use = self.scheme
|
||||
try:
|
||||
pssh_data = pssh.dumps()
|
||||
if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data:
|
||||
if self.scheme in ["SL2", "SL3"]:
|
||||
scheme_to_use = "L1" if self.scheme == "SL2" else "L1"
|
||||
else:
|
||||
scheme_to_use = self.scheme
|
||||
except Exception:
|
||||
scheme_to_use = self.scheme
|
||||
|
||||
request_data = {
|
||||
"init_data": self.pssh.dumps(),
|
||||
"service_certificate": self.req_session.signed_device_certificate,
|
||||
"scheme": self.scheme,
|
||||
"scheme": scheme_to_use,
|
||||
"service": self.service_name,
|
||||
}
|
||||
# Add required parameter for L1 scheme
|
||||
if self.scheme == "L1":
|
||||
request_data["get_cached_keys_if_exists"] = True
|
||||
|
||||
if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"):
|
||||
request_data["service_certificate"] = self.service_certificate
|
||||
elif scheme_to_use in ["L1", "L2"]:
|
||||
pass
|
||||
|
||||
request_data["get_cached_keys_if_exists"] = True
|
||||
|
||||
if not hasattr(self, "session_schemes"):
|
||||
self.session_schemes = {}
|
||||
self.session_schemes[session_id] = scheme_to_use
|
||||
res = self.session(
|
||||
self.host + "/get-request",
|
||||
request_data,
|
||||
)
|
||||
|
||||
# Check if we got cached keys instead of a challenge
|
||||
if res.get("message_type") == "cached-keys":
|
||||
# Store cached keys directly
|
||||
if session_id not in self.keys:
|
||||
self.keys[session_id] = []
|
||||
session_keys = self.keys[session_id]
|
||||
|
||||
cached_keys_for_vault = {}
|
||||
|
||||
for cached_key in res.get("cached_keys", []):
|
||||
# Handle KID format - could be hex string or UUID string
|
||||
kid_str = cached_key["kid"]
|
||||
try:
|
||||
# Try as UUID string first
|
||||
kid_uuid = UUID(kid_str)
|
||||
except ValueError:
|
||||
try:
|
||||
# Try as hex string (like the existing code)
|
||||
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
|
||||
except ValueError:
|
||||
# Fallback: use Key.kid_to_uuid
|
||||
kid_uuid = Key.kid_to_uuid(kid_str)
|
||||
|
||||
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
|
||||
cached_keys_for_vault[kid_uuid] = cached_key["key"]
|
||||
|
||||
if self.vaults and cached_keys_for_vault:
|
||||
try:
|
||||
self.vaults.add_keys(cached_keys_for_vault)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if self.service_name == "NF" or "netflix" in self.service_name.lower():
|
||||
request_data_no_cache = request_data.copy()
|
||||
request_data_no_cache["get_cached_keys_if_exists"] = False
|
||||
|
||||
res_challenge = self.session(
|
||||
self.host + "/get-request",
|
||||
request_data_no_cache,
|
||||
)
|
||||
|
||||
if res_challenge.get("challenge"):
|
||||
self.license_request = res_challenge["challenge"]
|
||||
self.api_session_ids[session_id] = res_challenge.get("session_id")
|
||||
return base64.b64decode(self.license_request)
|
||||
|
||||
# Return empty challenge since we already have the keys
|
||||
self.license_request = ""
|
||||
self.api_session_ids[session_id] = None
|
||||
self._has_cached_keys = True
|
||||
return b""
|
||||
|
||||
# Normal challenge response
|
||||
self.license_request = res["challenge"]
|
||||
self.api_session_ids[session_id] = res["session_id"]
|
||||
self._has_cached_keys = False
|
||||
|
||||
return base64.b64decode(self.license_request)
|
||||
|
||||
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
|
||||
"""Parse license response and extract decryption keys.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
license_message: License response from DRM server
|
||||
"""
|
||||
session_id_api = self.api_session_ids[session_id]
|
||||
if session_id not in self.keys:
|
||||
self.keys[session_id] = []
|
||||
session_keys = self.keys[session_id]
|
||||
|
||||
# If we already have cached keys and no session_id_api, skip processing
|
||||
if session_id_api is None and session_keys:
|
||||
return
|
||||
|
||||
@@ -135,13 +242,14 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
)
|
||||
|
||||
else:
|
||||
# Ensure license_message is base64 encoded
|
||||
if isinstance(license_message, bytes):
|
||||
license_response_b64 = base64.b64encode(license_message).decode()
|
||||
elif isinstance(license_message, str):
|
||||
license_response_b64 = license_message
|
||||
else:
|
||||
license_response_b64 = str(license_message)
|
||||
scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme)
|
||||
|
||||
res = self.session(
|
||||
self.host + "/decrypt-response",
|
||||
{
|
||||
@@ -149,32 +257,95 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
"init_data": self.pssh.dumps(),
|
||||
"license_request": self.license_request,
|
||||
"license_response": license_response_b64,
|
||||
"scheme": self.scheme,
|
||||
"scheme": scheme_for_session,
|
||||
},
|
||||
)
|
||||
|
||||
original_keys = res["keys"].replace("\n", " ")
|
||||
keys_separated = original_keys.split("--key ")
|
||||
formatted_keys = []
|
||||
for k in keys_separated:
|
||||
if ":" in k:
|
||||
key = k.strip()
|
||||
formatted_keys.append(key)
|
||||
for keys in formatted_keys:
|
||||
session_keys.append(
|
||||
(
|
||||
if scheme_for_session in ["SL2", "SL3"]:
|
||||
if "keys" in res and res["keys"]:
|
||||
keys_data = res["keys"]
|
||||
if isinstance(keys_data, str):
|
||||
original_keys = keys_data.replace("\n", " ")
|
||||
keys_separated = original_keys.split("--key ")
|
||||
for k in keys_separated:
|
||||
if ":" in k:
|
||||
key_parts = k.strip().split(":")
|
||||
if len(key_parts) == 2:
|
||||
try:
|
||||
kid_hex, key_hex = key_parts
|
||||
session_keys.append(
|
||||
Key(
|
||||
kid=UUID(bytes=bytes.fromhex(kid_hex)),
|
||||
type_="CONTENT",
|
||||
key=bytes.fromhex(key_hex),
|
||||
)
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
elif isinstance(keys_data, list):
|
||||
for key_info in keys_data:
|
||||
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
|
||||
session_keys.append(
|
||||
Key(
|
||||
kid=Key.kid_to_uuid(key_info["kid"]),
|
||||
type_=key_info.get("type", "CONTENT"),
|
||||
key=bytes.fromhex(key_info["key"]),
|
||||
)
|
||||
)
|
||||
else:
|
||||
original_keys = res["keys"].replace("\n", " ")
|
||||
keys_separated = original_keys.split("--key ")
|
||||
formatted_keys = []
|
||||
for k in keys_separated:
|
||||
if ":" in k:
|
||||
key = k.strip()
|
||||
formatted_keys.append(key)
|
||||
for keys in formatted_keys:
|
||||
session_keys.append(
|
||||
Key(
|
||||
kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])),
|
||||
type_="CONTENT",
|
||||
key=bytes.fromhex(keys.split(":")[1]),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
|
||||
"""Get decryption keys for a session.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
type_: Key type filter (optional)
|
||||
|
||||
Returns:
|
||||
List of decryption keys for the session
|
||||
"""
|
||||
return self.keys[session_id]
|
||||
|
||||
def has_cached_keys(self, session_id: bytes) -> bool:
|
||||
"""Check if this session has cached keys and doesn't need license request.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier to check
|
||||
|
||||
Returns:
|
||||
True if session has cached keys, False otherwise
|
||||
"""
|
||||
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
|
||||
|
||||
def session(self, url, data, retries=3):
|
||||
"""Make authenticated request to DecryptLabs API.
|
||||
|
||||
Args:
|
||||
url: API endpoint URL
|
||||
data: Request payload data
|
||||
retries: Number of retry attempts for failed requests
|
||||
|
||||
Returns:
|
||||
API response JSON data
|
||||
|
||||
Raises:
|
||||
ValueError: If API returns an error after retries
|
||||
"""
|
||||
res = self.req_session.post(url, json=data).json()
|
||||
|
||||
if res.get("message") != "success":
|
||||
@@ -187,3 +358,330 @@ class DecryptLabsRemoteCDM(RemoteCdm):
|
||||
raise ValueError(f"CDM API returned an error: {res['Error']}")
|
||||
|
||||
return res
|
||||
|
||||
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
|
||||
"""Use cached keys from DecryptLabs as a fallback when license server fails.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
|
||||
Returns:
|
||||
True if cached keys were successfully applied, False otherwise
|
||||
"""
|
||||
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
|
||||
return False
|
||||
|
||||
if session_id not in self.keys:
|
||||
self.keys[session_id] = []
|
||||
session_keys = self.keys[session_id]
|
||||
|
||||
cached_keys_for_vault = {}
|
||||
|
||||
for cached_key in self._cached_keys_available:
|
||||
kid_str = cached_key["kid"]
|
||||
try:
|
||||
kid_uuid = UUID(kid_str)
|
||||
except ValueError:
|
||||
try:
|
||||
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
|
||||
except ValueError:
|
||||
kid_uuid = Key.kid_to_uuid(kid_str)
|
||||
|
||||
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
|
||||
cached_keys_for_vault[kid_uuid] = cached_key["key"]
|
||||
|
||||
if self.vaults and cached_keys_for_vault:
|
||||
try:
|
||||
self.vaults.add_keys(cached_keys_for_vault)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._has_cached_keys = True
|
||||
return True
|
||||
|
||||
|
||||
class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm):
|
||||
"""PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API.
|
||||
|
||||
Provides PlayReady CDM functionality through DecryptLabs' remote API service,
|
||||
supporting PlayReady DRM schemes like SL2 and SL3.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
security_level: int,
|
||||
host: str,
|
||||
secret: str,
|
||||
device_name: str,
|
||||
service_name: str,
|
||||
vaults=None,
|
||||
client_version: str = "10.0.16384.10011",
|
||||
):
|
||||
"""Initialize DecryptLabs Remote PlayReady CDM.
|
||||
|
||||
Args:
|
||||
security_level: DRM security level
|
||||
host: DecryptLabs API host URL
|
||||
secret: DecryptLabs API key for authentication
|
||||
device_name: Device/scheme name (used as scheme identifier)
|
||||
service_name: Service/platform name
|
||||
vaults: Optional vaults reference for caching keys
|
||||
client_version: PlayReady client version
|
||||
"""
|
||||
super().__init__(
|
||||
security_level=security_level,
|
||||
certificate_chain=None,
|
||||
encryption_key=None,
|
||||
signing_key=None,
|
||||
client_version=client_version,
|
||||
)
|
||||
|
||||
self.host = host
|
||||
self.service_name = service_name
|
||||
self.device_name = device_name
|
||||
self.scheme = device_name
|
||||
self.vaults = vaults
|
||||
self.keys = {}
|
||||
self.api_session_ids = {}
|
||||
self.pssh_b64 = None
|
||||
self.license_request = None
|
||||
self._has_cached_keys = False
|
||||
|
||||
self.req_session = requests.Session()
|
||||
self.req_session.headers.update({"decrypt-labs-api-key": secret})
|
||||
|
||||
class MockCertificateChain:
|
||||
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
|
||||
|
||||
def __init__(self, scheme: str, security_level: int):
|
||||
self.scheme = scheme
|
||||
self.security_level = security_level
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""Return the certificate chain name for logging."""
|
||||
return f"DecryptLabs-{self.scheme}"
|
||||
|
||||
def get_security_level(self) -> int:
|
||||
"""Return the security level."""
|
||||
return self.security_level
|
||||
|
||||
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
|
||||
|
||||
def set_pssh_b64(self, pssh_b64: str):
|
||||
"""Set the original base64-encoded PSSH box for DecryptLabs API.
|
||||
|
||||
Args:
|
||||
pssh_b64: Base64-encoded PSSH box from the manifest
|
||||
"""
|
||||
self.pssh_b64 = pssh_b64
|
||||
|
||||
def open(self) -> bytes:
|
||||
"""Open a new CDM session.
|
||||
|
||||
Returns:
|
||||
Random session ID bytes for internal tracking
|
||||
"""
|
||||
return bytes.fromhex(secrets.token_hex(16))
|
||||
|
||||
def close(self, session_id: bytes) -> None:
|
||||
"""Close a CDM session.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier to close
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_license_challenge(self, session_id: bytes, _) -> str:
|
||||
"""Generate license challenge using DecryptLabs API for PlayReady.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
|
||||
Returns:
|
||||
License challenge as XML string
|
||||
"""
|
||||
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
|
||||
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
|
||||
|
||||
init_data = self.pssh_b64
|
||||
|
||||
request_data = {
|
||||
"init_data": init_data,
|
||||
"scheme": self.scheme,
|
||||
"service": self.service_name,
|
||||
"get_cached_keys_if_exists": False,
|
||||
}
|
||||
|
||||
res = self.session(
|
||||
self.host + "/get-request",
|
||||
request_data,
|
||||
)
|
||||
|
||||
if res.get("message_type") == "cached-keys":
|
||||
self._cached_keys_available = res.get("cached_keys", [])
|
||||
else:
|
||||
self._cached_keys_available = None
|
||||
|
||||
self.license_request = res["challenge"]
|
||||
self.api_session_ids[session_id] = res["session_id"]
|
||||
self._has_cached_keys = False
|
||||
|
||||
try:
|
||||
return base64.b64decode(self.license_request).decode()
|
||||
except Exception:
|
||||
return self.license_request
|
||||
|
||||
def parse_license(self, session_id: bytes, license_message: str) -> None:
|
||||
"""Parse license response and extract decryption keys.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
license_message: License response from DRM server (XML string)
|
||||
"""
|
||||
session_id_api = self.api_session_ids[session_id]
|
||||
if session_id not in self.keys:
|
||||
self.keys[session_id] = []
|
||||
session_keys = self.keys[session_id]
|
||||
|
||||
if session_id_api is None and session_keys:
|
||||
return
|
||||
|
||||
try:
|
||||
license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8")
|
||||
except Exception:
|
||||
return
|
||||
|
||||
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
|
||||
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
|
||||
init_data = self.pssh_b64
|
||||
|
||||
res = self.session(
|
||||
self.host + "/decrypt-response",
|
||||
{
|
||||
"session_id": session_id_api,
|
||||
"init_data": init_data,
|
||||
"license_request": self.license_request,
|
||||
"license_response": license_response_b64,
|
||||
"scheme": self.scheme,
|
||||
},
|
||||
)
|
||||
|
||||
if "keys" in res and res["keys"]:
|
||||
keys_data = res["keys"]
|
||||
if isinstance(keys_data, str):
|
||||
original_keys = keys_data.replace("\n", " ")
|
||||
keys_separated = original_keys.split("--key ")
|
||||
for k in keys_separated:
|
||||
if ":" in k:
|
||||
key_parts = k.strip().split(":")
|
||||
if len(key_parts) == 2:
|
||||
try:
|
||||
kid_hex, key_hex = key_parts
|
||||
session_keys.append(
|
||||
Key(
|
||||
kid=UUID(bytes=bytes.fromhex(kid_hex)),
|
||||
type_="CONTENT",
|
||||
key=bytes.fromhex(key_hex),
|
||||
)
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
elif isinstance(keys_data, list):
|
||||
for key_info in keys_data:
|
||||
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
|
||||
session_keys.append(
|
||||
Key(
|
||||
kid=Key.kid_to_uuid(key_info["kid"]),
|
||||
type_=key_info.get("type", "CONTENT"),
|
||||
key=bytes.fromhex(key_info["key"]),
|
||||
)
|
||||
)
|
||||
|
||||
def get_keys(self, session_id: bytes) -> list:
|
||||
"""Get decryption keys for a session.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
|
||||
Returns:
|
||||
List of decryption keys for the session
|
||||
"""
|
||||
return self.keys.get(session_id, [])
|
||||
|
||||
def has_cached_keys(self, session_id: bytes) -> bool:
|
||||
"""Check if this session has cached keys and doesn't need license request.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier to check
|
||||
|
||||
Returns:
|
||||
True if session has cached keys, False otherwise
|
||||
"""
|
||||
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
|
||||
|
||||
def session(self, url, data, retries=3):
|
||||
"""Make authenticated request to DecryptLabs API.
|
||||
|
||||
Args:
|
||||
url: API endpoint URL
|
||||
data: Request payload data
|
||||
retries: Number of retry attempts for failed requests
|
||||
|
||||
Returns:
|
||||
API response JSON data
|
||||
|
||||
Raises:
|
||||
ValueError: If API returns an error after retries
|
||||
"""
|
||||
res = self.req_session.post(url, json=data).json()
|
||||
|
||||
if res.get("message") != "success":
|
||||
if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""):
|
||||
if retries > 0:
|
||||
return self.session(url, data, retries=retries - 1)
|
||||
else:
|
||||
raise ValueError(f"CDM API returned an error: {res['Error']}")
|
||||
else:
|
||||
raise ValueError(f"CDM API returned an error: {res['Error']}")
|
||||
|
||||
return res
|
||||
|
||||
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
|
||||
"""Use cached keys from DecryptLabs as a fallback when license server fails.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
|
||||
Returns:
|
||||
True if cached keys were successfully applied, False otherwise
|
||||
"""
|
||||
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
|
||||
return False
|
||||
|
||||
if session_id not in self.keys:
|
||||
self.keys[session_id] = []
|
||||
session_keys = self.keys[session_id]
|
||||
|
||||
cached_keys_for_vault = {}
|
||||
|
||||
for cached_key in self._cached_keys_available:
|
||||
kid_str = cached_key["kid"]
|
||||
try:
|
||||
kid_uuid = UUID(kid_str)
|
||||
except ValueError:
|
||||
try:
|
||||
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
|
||||
except ValueError:
|
||||
kid_uuid = Key.kid_to_uuid(kid_str)
|
||||
|
||||
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
|
||||
cached_keys_for_vault[kid_uuid] = cached_key["key"]
|
||||
|
||||
if self.vaults and cached_keys_for_vault:
|
||||
try:
|
||||
self.vaults.add_keys(cached_keys_for_vault)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._has_cached_keys = True
|
||||
return True
|
||||
|
||||
@@ -224,14 +224,59 @@ class PlayReady:
|
||||
def kids(self) -> list[UUID]:
|
||||
return self._kids
|
||||
|
||||
def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict:
|
||||
"""Extract keys from CDM session with cross-library compatibility.
|
||||
|
||||
Args:
|
||||
cdm: CDM instance
|
||||
session_id: Session identifier
|
||||
|
||||
Returns:
|
||||
Dictionary mapping KID UUIDs to hex keys
|
||||
"""
|
||||
keys = {}
|
||||
for key in cdm.get_keys(session_id):
|
||||
if hasattr(key, "key_id"):
|
||||
kid = key.key_id
|
||||
elif hasattr(key, "kid"):
|
||||
kid = key.kid
|
||||
else:
|
||||
continue
|
||||
|
||||
if hasattr(key, "key") and hasattr(key.key, "hex"):
|
||||
key_hex = key.key.hex()
|
||||
elif hasattr(key, "key") and isinstance(key.key, bytes):
|
||||
key_hex = key.key.hex()
|
||||
elif hasattr(key, "key") and isinstance(key.key, str):
|
||||
key_hex = key.key
|
||||
else:
|
||||
continue
|
||||
|
||||
keys[kid] = key_hex
|
||||
return keys
|
||||
|
||||
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
|
||||
for kid in self.kids:
|
||||
if kid in self.content_keys:
|
||||
continue
|
||||
|
||||
session_id = cdm.open()
|
||||
try:
|
||||
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
|
||||
cdm.set_pssh_b64(self.pssh_b64)
|
||||
|
||||
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
|
||||
license_res = licence(challenge=challenge)
|
||||
|
||||
try:
|
||||
license_res = licence(challenge=challenge)
|
||||
except Exception:
|
||||
if hasattr(cdm, "use_cached_keys_as_fallback"):
|
||||
if cdm.use_cached_keys_as_fallback(session_id):
|
||||
keys = self._extract_keys_from_cdm(cdm, session_id)
|
||||
self.content_keys.update(keys)
|
||||
continue
|
||||
|
||||
raise
|
||||
|
||||
if isinstance(license_res, bytes):
|
||||
license_str = license_res.decode(errors="ignore")
|
||||
@@ -245,7 +290,7 @@ class PlayReady:
|
||||
pass
|
||||
|
||||
cdm.parse_license(session_id, license_str)
|
||||
keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)}
|
||||
keys = self._extract_keys_from_cdm(cdm, session_id)
|
||||
self.content_keys.update(keys)
|
||||
finally:
|
||||
cdm.close(session_id)
|
||||
|
||||
@@ -185,7 +185,12 @@ class Widevine:
|
||||
if cert and hasattr(cdm, "set_service_certificate"):
|
||||
cdm.set_service_certificate(session_id, cert)
|
||||
|
||||
cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh)))
|
||||
challenge = cdm.get_license_challenge(session_id, self.pssh)
|
||||
|
||||
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
|
||||
pass
|
||||
else:
|
||||
cdm.parse_license(session_id, licence(challenge=challenge))
|
||||
|
||||
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
|
||||
if not self.content_keys:
|
||||
@@ -213,10 +218,15 @@ class Widevine:
|
||||
if cert and hasattr(cdm, "set_service_certificate"):
|
||||
cdm.set_service_certificate(session_id, cert)
|
||||
|
||||
cdm.parse_license(
|
||||
session_id,
|
||||
licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)),
|
||||
)
|
||||
challenge = cdm.get_license_challenge(session_id, self.pssh)
|
||||
|
||||
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
|
||||
pass
|
||||
else:
|
||||
cdm.parse_license(
|
||||
session_id,
|
||||
licence(session_id=session_id, challenge=challenge),
|
||||
)
|
||||
|
||||
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
|
||||
if not self.content_keys:
|
||||
|
||||
@@ -105,6 +105,50 @@ remote_cdm:
|
||||
host: https://domain-2.com/api
|
||||
secret: secret_key
|
||||
|
||||
- name: "decrypt_labs_chrome"
|
||||
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
|
||||
device_name: "ChromeCDM" # Scheme identifier - must match exactly
|
||||
device_type: CHROME
|
||||
system_id: 4464 # Doesn't matter
|
||||
security_level: 3
|
||||
host: "https://keyxtractor.decryptlabs.com"
|
||||
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
|
||||
- name: "decrypt_labs_l1"
|
||||
type: "decrypt_labs"
|
||||
device_name: "L1" # Scheme identifier - must match exactly
|
||||
device_type: ANDROID
|
||||
system_id: 4464
|
||||
security_level: 1
|
||||
host: "https://keyxtractor.decryptlabs.com"
|
||||
secret: "your_decrypt_labs_api_key_here"
|
||||
|
||||
- name: "decrypt_labs_l2"
|
||||
type: "decrypt_labs"
|
||||
device_name: "L2" # Scheme identifier - must match exactly
|
||||
device_type: ANDROID
|
||||
system_id: 4464
|
||||
security_level: 2
|
||||
host: "https://keyxtractor.decryptlabs.com"
|
||||
secret: "your_decrypt_labs_api_key_here"
|
||||
|
||||
- name: "decrypt_labs_playready_sl2"
|
||||
type: "decrypt_labs"
|
||||
device_name: "SL2" # Scheme identifier - must match exactly
|
||||
device_type: PLAYREADY
|
||||
system_id: 0
|
||||
security_level: 2000
|
||||
host: "https://keyxtractor.decryptlabs.com"
|
||||
secret: "your_decrypt_labs_api_key_here"
|
||||
|
||||
- name: "decrypt_labs_playready_sl3"
|
||||
type: "decrypt_labs"
|
||||
device_name: "SL3" # Scheme identifier - must match exactly
|
||||
device_type: PLAYREADY
|
||||
system_id: 0
|
||||
security_level: 3000
|
||||
host: "https://keyxtractor.decryptlabs.com"
|
||||
secret: "your_decrypt_labs_api_key_here"
|
||||
|
||||
# Key Vaults store your obtained Content Encryption Keys (CEKs)
|
||||
# Use 'no_push: true' to prevent a vault from receiving pushed keys
|
||||
# while still allowing it to provide keys when requested
|
||||
@@ -171,7 +215,7 @@ chapter_fallback_name: "Chapter {j:02}"
|
||||
# Case-Insensitive dictionary of headers for all Services
|
||||
headers:
|
||||
Accept-Language: "en-US,en;q=0.8"
|
||||
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"
|
||||
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
|
||||
|
||||
# Override default filenames used across unshackle
|
||||
filenames:
|
||||
@@ -213,6 +257,13 @@ services:
|
||||
# Global service config
|
||||
api_key: "service_api_key"
|
||||
|
||||
# Service certificate for Widevine L1/L2 (base64 encoded)
|
||||
# This certificate is automatically used when L1/L2 schemes are selected
|
||||
# Services obtain this from their DRM provider or license server
|
||||
certificate: |
|
||||
CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo
|
||||
# ... (full base64 certificate here)
|
||||
|
||||
# Profile-specific device configurations
|
||||
profiles:
|
||||
john_sd:
|
||||
|
||||
Reference in New Issue
Block a user