2 Commits

5 changed files with 683 additions and 62 deletions

View File

@@ -299,21 +299,6 @@ class dl:
if getattr(config, "decryption_map", None):
config.decryption = config.decryption_map.get(self.service, config.decryption)
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
with console.status("Loading Key Vaults...", spinner="dots"):
self.vaults = Vaults(self.service)
total_vaults = len(config.key_vaults)
@@ -352,6 +337,21 @@ class dl:
else:
self.log.debug("No vaults are currently active")
with console.status("Loading DRM CDM...", spinner="dots"):
try:
self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e:
self.log.error(f"Failed to load CDM, {e}")
sys.exit(1)
if self.cdm:
if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
else:
self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
)
self.proxy_providers = []
if no_proxy:
ctx.params["proxy"] = None
@@ -1442,8 +1442,8 @@ class dl:
return Credential(*credentials)
return Credential.loads(credentials) # type: ignore
@staticmethod
def get_cdm(
self,
service: str,
profile: Optional[str] = None,
drm: Optional[str] = None,
@@ -1478,9 +1478,26 @@ class dl:
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api:
is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False
del cdm_api["name"]
del cdm_api["type"]
return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api)
if is_decrypt_lab:
device_type = cdm_api.get("device_type")
del cdm_api["name"]
del cdm_api["type"]
# Use the appropriate DecryptLabs CDM class based on device type
if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]:
from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM
# Remove unused parameters for PlayReady CDM
cdm_params = cdm_api.copy()
cdm_params.pop("device_type", None)
cdm_params.pop("system_id", None)
return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params)
else:
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
else:
del cdm_api["name"]
del cdm_api["type"]
return RemoteCdm(**cdm_api)
prd_path = config.directories.prds / f"{cdm_name}.prd"
if not prd_path.is_file():

View File

@@ -4,13 +4,21 @@ from typing import Optional, Type, Union
from uuid import UUID
import requests
from pyplayready.cdm import Cdm as PlayReadyCdm
from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm
from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage
# Copyright 2024 by DevYukine.
# Copyright 2025 by sp4rk.y.
class DecryptLabsRemoteCDM(RemoteCdm):
"""Remote CDM implementation for DecryptLabs KeyXtractor API.
Provides CDM functionality through DecryptLabs' remote API service,
supporting multiple DRM schemes including Widevine and PlayReady.
"""
def __init__(
self,
device_type: Union[DeviceTypes, str],
@@ -20,7 +28,20 @@ class DecryptLabsRemoteCDM(RemoteCdm):
secret: str,
device_name: str,
service_name: str,
vaults=None,
):
"""Initialize DecryptLabs Remote CDM.
Args:
device_type: Type of device to emulate
system_id: System identifier
security_level: DRM security level
host: DecryptLabs API host URL
secret: DecryptLabs API key for authentication
device_name: Device/scheme name (used as scheme identifier)
service_name: Service/platform name
vaults: Optional vaults reference for caching keys
"""
self.response_counter = 0
self.pssh = None
self.api_session_ids = {}
@@ -28,7 +49,28 @@ class DecryptLabsRemoteCDM(RemoteCdm):
self.service_name = service_name
self.device_name = device_name
self.keys = {}
self.scheme = "L1" if device_name == "L1" else "widevine"
self.scheme = device_name
self._has_cached_keys = False
self.vaults = vaults
self.security_level = security_level
self.host = host
class MockCertificateChain:
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
def __init__(self, scheme: str, security_level: int):
self.scheme = scheme
self.security_level = security_level
def get_name(self) -> str:
"""Return the certificate chain name for logging."""
return f"DecryptLabs-{self.scheme}"
def get_security_level(self) -> int:
"""Return the security level."""
return self.security_level
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
try:
super().__init__(device_type, system_id, security_level, host, secret, device_name)
except Exception:
@@ -41,22 +83,36 @@ class DecryptLabsRemoteCDM(RemoteCdm):
raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.")
def open(self) -> bytes:
# We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one.
"""Open a new CDM session.
Returns:
Random session ID bytes for internal tracking
"""
return bytes.fromhex(secrets.token_hex(16))
def close(self, session_id: bytes) -> None:
# We stub this method to do nothing.
"""Close a CDM session.
Args:
session_id: Session identifier to close
"""
pass
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""Set service certificate for L1/L2 schemes.
Args:
session_id: Session identifier
certificate: Service certificate (bytes or base64 string)
Returns:
Success status string
"""
if isinstance(certificate, bytes):
certificate = base64.b64encode(certificate).decode()
# certificate needs to be base64 to be sent off to the API.
# it needs to intentionally be kept as base64 encoded SignedMessage.
self.req_session.signed_device_certificate = certificate
self.req_session.privacy_mode = True
self.service_certificate = certificate
self.privacy_mode = True
return "success"
@@ -66,63 +122,114 @@ class DecryptLabsRemoteCDM(RemoteCdm):
def get_license_challenge(
self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes:
"""Generate license challenge using DecryptLabs API.
Args:
session_id: Session identifier
pssh: PSSH initialization data
license_type: Type of license (default: "STREAMING")
privacy_mode: Enable privacy mode
Returns:
License challenge bytes or empty bytes if using cached keys
"""
self.pssh = pssh
scheme_to_use = self.scheme
try:
pssh_data = pssh.dumps()
if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data:
if self.scheme in ["SL2", "SL3"]:
scheme_to_use = "L1" if self.scheme == "SL2" else "L1"
else:
scheme_to_use = self.scheme
except Exception:
scheme_to_use = self.scheme
request_data = {
"init_data": self.pssh.dumps(),
"service_certificate": self.req_session.signed_device_certificate,
"scheme": self.scheme,
"scheme": scheme_to_use,
"service": self.service_name,
}
# Add required parameter for L1 scheme
if self.scheme == "L1":
request_data["get_cached_keys_if_exists"] = True
if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"):
request_data["service_certificate"] = self.service_certificate
elif scheme_to_use in ["L1", "L2"]:
pass
request_data["get_cached_keys_if_exists"] = True
if not hasattr(self, "session_schemes"):
self.session_schemes = {}
self.session_schemes[session_id] = scheme_to_use
res = self.session(
self.host + "/get-request",
request_data,
)
# Check if we got cached keys instead of a challenge
if res.get("message_type") == "cached-keys":
# Store cached keys directly
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in res.get("cached_keys", []):
# Handle KID format - could be hex string or UUID string
kid_str = cached_key["kid"]
try:
# Try as UUID string first
kid_uuid = UUID(kid_str)
except ValueError:
try:
# Try as hex string (like the existing code)
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError:
# Fallback: use Key.kid_to_uuid
kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
if self.service_name == "NF" or "netflix" in self.service_name.lower():
request_data_no_cache = request_data.copy()
request_data_no_cache["get_cached_keys_if_exists"] = False
res_challenge = self.session(
self.host + "/get-request",
request_data_no_cache,
)
if res_challenge.get("challenge"):
self.license_request = res_challenge["challenge"]
self.api_session_ids[session_id] = res_challenge.get("session_id")
return base64.b64decode(self.license_request)
# Return empty challenge since we already have the keys
self.license_request = ""
self.api_session_ids[session_id] = None
self._has_cached_keys = True
return b""
# Normal challenge response
self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"]
self._has_cached_keys = False
return base64.b64decode(self.license_request)
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
"""Parse license response and extract decryption keys.
Args:
session_id: Session identifier
license_message: License response from DRM server
"""
session_id_api = self.api_session_ids[session_id]
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
# If we already have cached keys and no session_id_api, skip processing
if session_id_api is None and session_keys:
return
@@ -135,13 +242,14 @@ class DecryptLabsRemoteCDM(RemoteCdm):
)
else:
# Ensure license_message is base64 encoded
if isinstance(license_message, bytes):
license_response_b64 = base64.b64encode(license_message).decode()
elif isinstance(license_message, str):
license_response_b64 = license_message
else:
license_response_b64 = str(license_message)
scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme)
res = self.session(
self.host + "/decrypt-response",
{
@@ -149,32 +257,95 @@ class DecryptLabsRemoteCDM(RemoteCdm):
"init_data": self.pssh.dumps(),
"license_request": self.license_request,
"license_response": license_response_b64,
"scheme": self.scheme,
"scheme": scheme_for_session,
},
)
original_keys = res["keys"].replace("\n", " ")
keys_separated = original_keys.split("--key ")
formatted_keys = []
for k in keys_separated:
if ":" in k:
key = k.strip()
formatted_keys.append(key)
for keys in formatted_keys:
session_keys.append(
(
if scheme_for_session in ["SL2", "SL3"]:
if "keys" in res and res["keys"]:
keys_data = res["keys"]
if isinstance(keys_data, str):
original_keys = keys_data.replace("\n", " ")
keys_separated = original_keys.split("--key ")
for k in keys_separated:
if ":" in k:
key_parts = k.strip().split(":")
if len(key_parts) == 2:
try:
kid_hex, key_hex = key_parts
session_keys.append(
Key(
kid=UUID(bytes=bytes.fromhex(kid_hex)),
type_="CONTENT",
key=bytes.fromhex(key_hex),
)
)
except (ValueError, TypeError):
continue
elif isinstance(keys_data, list):
for key_info in keys_data:
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
session_keys.append(
Key(
kid=Key.kid_to_uuid(key_info["kid"]),
type_=key_info.get("type", "CONTENT"),
key=bytes.fromhex(key_info["key"]),
)
)
else:
original_keys = res["keys"].replace("\n", " ")
keys_separated = original_keys.split("--key ")
formatted_keys = []
for k in keys_separated:
if ":" in k:
key = k.strip()
formatted_keys.append(key)
for keys in formatted_keys:
session_keys.append(
Key(
kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])),
type_="CONTENT",
key=bytes.fromhex(keys.split(":")[1]),
)
)
)
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
"""Get decryption keys for a session.
Args:
session_id: Session identifier
type_: Key type filter (optional)
Returns:
List of decryption keys for the session
"""
return self.keys[session_id]
def has_cached_keys(self, session_id: bytes) -> bool:
"""Check if this session has cached keys and doesn't need license request.
Args:
session_id: Session identifier to check
Returns:
True if session has cached keys, False otherwise
"""
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
def session(self, url, data, retries=3):
"""Make authenticated request to DecryptLabs API.
Args:
url: API endpoint URL
data: Request payload data
retries: Number of retry attempts for failed requests
Returns:
API response JSON data
Raises:
ValueError: If API returns an error after retries
"""
res = self.req_session.post(url, json=data).json()
if res.get("message") != "success":
@@ -187,3 +358,330 @@ class DecryptLabsRemoteCDM(RemoteCdm):
raise ValueError(f"CDM API returned an error: {res['Error']}")
return res
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
"""Use cached keys from DecryptLabs as a fallback when license server fails.
Args:
session_id: Session identifier
Returns:
True if cached keys were successfully applied, False otherwise
"""
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
return False
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in self._cached_keys_available:
kid_str = cached_key["kid"]
try:
kid_uuid = UUID(kid_str)
except ValueError:
try:
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError:
kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
self._has_cached_keys = True
return True
class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm):
"""PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API.
Provides PlayReady CDM functionality through DecryptLabs' remote API service,
supporting PlayReady DRM schemes like SL2 and SL3.
"""
def __init__(
self,
security_level: int,
host: str,
secret: str,
device_name: str,
service_name: str,
vaults=None,
client_version: str = "10.0.16384.10011",
):
"""Initialize DecryptLabs Remote PlayReady CDM.
Args:
security_level: DRM security level
host: DecryptLabs API host URL
secret: DecryptLabs API key for authentication
device_name: Device/scheme name (used as scheme identifier)
service_name: Service/platform name
vaults: Optional vaults reference for caching keys
client_version: PlayReady client version
"""
super().__init__(
security_level=security_level,
certificate_chain=None,
encryption_key=None,
signing_key=None,
client_version=client_version,
)
self.host = host
self.service_name = service_name
self.device_name = device_name
self.scheme = device_name
self.vaults = vaults
self.keys = {}
self.api_session_ids = {}
self.pssh_b64 = None
self.license_request = None
self._has_cached_keys = False
self.req_session = requests.Session()
self.req_session.headers.update({"decrypt-labs-api-key": secret})
class MockCertificateChain:
"""Mock certificate chain for DecryptLabs remote CDM compatibility."""
def __init__(self, scheme: str, security_level: int):
self.scheme = scheme
self.security_level = security_level
def get_name(self) -> str:
"""Return the certificate chain name for logging."""
return f"DecryptLabs-{self.scheme}"
def get_security_level(self) -> int:
"""Return the security level."""
return self.security_level
self.certificate_chain = MockCertificateChain(self.scheme, security_level)
def set_pssh_b64(self, pssh_b64: str):
"""Set the original base64-encoded PSSH box for DecryptLabs API.
Args:
pssh_b64: Base64-encoded PSSH box from the manifest
"""
self.pssh_b64 = pssh_b64
def open(self) -> bytes:
"""Open a new CDM session.
Returns:
Random session ID bytes for internal tracking
"""
return bytes.fromhex(secrets.token_hex(16))
def close(self, session_id: bytes) -> None:
"""Close a CDM session.
Args:
session_id: Session identifier to close
"""
pass
def get_license_challenge(self, session_id: bytes, _) -> str:
"""Generate license challenge using DecryptLabs API for PlayReady.
Args:
session_id: Session identifier
Returns:
License challenge as XML string
"""
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
init_data = self.pssh_b64
request_data = {
"init_data": init_data,
"scheme": self.scheme,
"service": self.service_name,
"get_cached_keys_if_exists": False,
}
res = self.session(
self.host + "/get-request",
request_data,
)
if res.get("message_type") == "cached-keys":
self._cached_keys_available = res.get("cached_keys", [])
else:
self._cached_keys_available = None
self.license_request = res["challenge"]
self.api_session_ids[session_id] = res["session_id"]
self._has_cached_keys = False
try:
return base64.b64decode(self.license_request).decode()
except Exception:
return self.license_request
def parse_license(self, session_id: bytes, license_message: str) -> None:
"""Parse license response and extract decryption keys.
Args:
session_id: Session identifier
license_message: License response from DRM server (XML string)
"""
session_id_api = self.api_session_ids[session_id]
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
if session_id_api is None and session_keys:
return
try:
license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8")
except Exception:
return
if not (hasattr(self, "pssh_b64") and self.pssh_b64):
raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.")
init_data = self.pssh_b64
res = self.session(
self.host + "/decrypt-response",
{
"session_id": session_id_api,
"init_data": init_data,
"license_request": self.license_request,
"license_response": license_response_b64,
"scheme": self.scheme,
},
)
if "keys" in res and res["keys"]:
keys_data = res["keys"]
if isinstance(keys_data, str):
original_keys = keys_data.replace("\n", " ")
keys_separated = original_keys.split("--key ")
for k in keys_separated:
if ":" in k:
key_parts = k.strip().split(":")
if len(key_parts) == 2:
try:
kid_hex, key_hex = key_parts
session_keys.append(
Key(
kid=UUID(bytes=bytes.fromhex(kid_hex)),
type_="CONTENT",
key=bytes.fromhex(key_hex),
)
)
except (ValueError, TypeError):
continue
elif isinstance(keys_data, list):
for key_info in keys_data:
if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info:
session_keys.append(
Key(
kid=Key.kid_to_uuid(key_info["kid"]),
type_=key_info.get("type", "CONTENT"),
key=bytes.fromhex(key_info["key"]),
)
)
def get_keys(self, session_id: bytes) -> list:
"""Get decryption keys for a session.
Args:
session_id: Session identifier
Returns:
List of decryption keys for the session
"""
return self.keys.get(session_id, [])
def has_cached_keys(self, session_id: bytes) -> bool:
"""Check if this session has cached keys and doesn't need license request.
Args:
session_id: Session identifier to check
Returns:
True if session has cached keys, False otherwise
"""
return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0
def session(self, url, data, retries=3):
"""Make authenticated request to DecryptLabs API.
Args:
url: API endpoint URL
data: Request payload data
retries: Number of retry attempts for failed requests
Returns:
API response JSON data
Raises:
ValueError: If API returns an error after retries
"""
res = self.req_session.post(url, json=data).json()
if res.get("message") != "success":
if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""):
if retries > 0:
return self.session(url, data, retries=retries - 1)
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
else:
raise ValueError(f"CDM API returned an error: {res['Error']}")
return res
def use_cached_keys_as_fallback(self, session_id: bytes) -> bool:
"""Use cached keys from DecryptLabs as a fallback when license server fails.
Args:
session_id: Session identifier
Returns:
True if cached keys were successfully applied, False otherwise
"""
if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available:
return False
if session_id not in self.keys:
self.keys[session_id] = []
session_keys = self.keys[session_id]
cached_keys_for_vault = {}
for cached_key in self._cached_keys_available:
kid_str = cached_key["kid"]
try:
kid_uuid = UUID(kid_str)
except ValueError:
try:
kid_uuid = UUID(bytes=bytes.fromhex(kid_str))
except ValueError:
kid_uuid = Key.kid_to_uuid(kid_str)
session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"])))
cached_keys_for_vault[kid_uuid] = cached_key["key"]
if self.vaults and cached_keys_for_vault:
try:
self.vaults.add_keys(cached_keys_for_vault)
except Exception:
pass
self._has_cached_keys = True
return True

View File

@@ -224,14 +224,59 @@ class PlayReady:
def kids(self) -> list[UUID]:
return self._kids
def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict:
"""Extract keys from CDM session with cross-library compatibility.
Args:
cdm: CDM instance
session_id: Session identifier
Returns:
Dictionary mapping KID UUIDs to hex keys
"""
keys = {}
for key in cdm.get_keys(session_id):
if hasattr(key, "key_id"):
kid = key.key_id
elif hasattr(key, "kid"):
kid = key.kid
else:
continue
if hasattr(key, "key") and hasattr(key.key, "hex"):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, bytes):
key_hex = key.key.hex()
elif hasattr(key, "key") and isinstance(key.key, str):
key_hex = key.key
else:
continue
keys[kid] = key_hex
return keys
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids:
if kid in self.content_keys:
continue
session_id = cdm.open()
try:
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
license_res = licence(challenge=challenge)
try:
license_res = licence(challenge=challenge)
except Exception:
if hasattr(cdm, "use_cached_keys_as_fallback"):
if cdm.use_cached_keys_as_fallback(session_id):
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
continue
raise
if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore")
@@ -245,7 +290,7 @@ class PlayReady:
pass
cdm.parse_license(session_id, license_str)
keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)}
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
finally:
cdm.close(session_id)

View File

@@ -185,7 +185,12 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh)))
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(session_id, licence(challenge=challenge))
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys:
@@ -213,10 +218,15 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert)
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)),
)
challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
cdm.parse_license(
session_id,
licence(session_id=session_id, challenge=challenge),
)
self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")}
if not self.content_keys:

View File

@@ -105,6 +105,50 @@ remote_cdm:
host: https://domain-2.com/api
secret: secret_key
- name: "decrypt_labs_chrome"
type: "decrypt_labs" # Required to identify as DecryptLabs CDM
device_name: "ChromeCDM" # Scheme identifier - must match exactly
device_type: CHROME
system_id: 4464 # Doesn't matter
security_level: 3
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here" # Replace with your API key
- name: "decrypt_labs_l1"
type: "decrypt_labs"
device_name: "L1" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 1
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_l2"
type: "decrypt_labs"
device_name: "L2" # Scheme identifier - must match exactly
device_type: ANDROID
system_id: 4464
security_level: 2
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl2"
type: "decrypt_labs"
device_name: "SL2" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 2000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
- name: "decrypt_labs_playready_sl3"
type: "decrypt_labs"
device_name: "SL3" # Scheme identifier - must match exactly
device_type: PLAYREADY
system_id: 0
security_level: 3000
host: "https://keyxtractor.decryptlabs.com"
secret: "your_decrypt_labs_api_key_here"
# Key Vaults store your obtained Content Encryption Keys (CEKs)
# Use 'no_push: true' to prevent a vault from receiving pushed keys
# while still allowing it to provide keys when requested
@@ -171,7 +215,7 @@ chapter_fallback_name: "Chapter {j:02}"
# Case-Insensitive dictionary of headers for all Services
headers:
Accept-Language: "en-US,en;q=0.8"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
# Override default filenames used across unshackle
filenames:
@@ -213,6 +257,13 @@ services:
# Global service config
api_key: "service_api_key"
# Service certificate for Widevine L1/L2 (base64 encoded)
# This certificate is automatically used when L1/L2 schemes are selected
# Services obtain this from their DRM provider or license server
certificate: |
CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo
# ... (full base64 certificate here)
# Profile-specific device configurations
profiles:
john_sd: