feat: implement intelligent caching system for CDM license requests

This commit is contained in:
Andy
2025-09-02 18:48:34 +00:00
parent ea5ec40bcd
commit 76fb2eea95
3 changed files with 197 additions and 81 deletions

View File

@@ -70,10 +70,24 @@ class DecryptLabsRemoteCDMExceptions:
class DecryptLabsRemoteCDM: class DecryptLabsRemoteCDM:
""" """
Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface. Decrypt Labs Remote CDM implementation with intelligent caching system.
This class provides a drop-in replacement for pywidevine's local CDM using This class provides a drop-in replacement for pywidevine's local CDM using
Decrypt Labs' KeyXtractor API service. Decrypt Labs' KeyXtractor API service, enhanced with smart caching logic
that minimizes unnecessary license requests.
Key Features:
- Compatible with both Widevine and PlayReady DRM schemes
- Intelligent caching that compares required vs. available keys
- Automatic key combination for mixed cache/license scenarios
- Seamless fallback to license requests when keys are missing
Intelligent Caching System:
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
2. get_license_challenge() first checks for cached keys
3. If cached keys satisfy requirements, returns empty challenge (no license needed)
4. If keys are missing, makes targeted license request for remaining keys
5. parse_license() combines cached and license keys intelligently
""" """
service_certificate_challenge = b"\x08\x04" service_certificate_challenge = b"\x08\x04"
@@ -127,6 +141,7 @@ class DecryptLabsRemoteCDM:
self._sessions: Dict[bytes, Dict[str, Any]] = {} self._sessions: Dict[bytes, Dict[str, Any]] = {}
self._pssh_b64 = None self._pssh_b64 = None
self._required_kids: Optional[List[str]] = None
self._http_session = Session() self._http_session = Session()
self._http_session.headers.update( self._http_session.headers.update(
{ {
@@ -160,6 +175,29 @@ class DecryptLabsRemoteCDM:
"""Store base64-encoded PSSH data for PlayReady compatibility.""" """Store base64-encoded PSSH data for PlayReady compatibility."""
self._pssh_b64 = pssh_b64 self._pssh_b64 = pssh_b64
def set_required_kids(self, kids: List[Union[str, UUID]]) -> None:
"""
Set the required Key IDs for intelligent caching decisions.
This method enables the CDM to make smart decisions about when to request
additional keys via license challenges. When cached keys are available,
the CDM will compare them against the required KIDs to determine if a
license request is still needed for missing keys.
Args:
kids: List of required Key IDs as UUIDs or hex strings
Note:
Should be called by DRM classes (PlayReady/Widevine) before making
license challenge requests to enable optimal caching behavior.
"""
self._required_kids = []
for kid in kids:
if isinstance(kid, UUID):
self._required_kids.append(str(kid).replace("-", "").lower())
else:
self._required_kids.append(str(kid).replace("-", "").lower())
def _generate_session_id(self) -> bytes: def _generate_session_id(self) -> bytes:
"""Generate a unique session ID.""" """Generate a unique session ID."""
return secrets.token_bytes(16) return secrets.token_bytes(16)
@@ -292,34 +330,25 @@ class DecryptLabsRemoteCDM:
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
session = self._sessions[session_id] session = self._sessions[session_id]
pssh = session.get("pssh")
if not pssh:
return False
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
return True
session_keys = session.get("keys", []) session_keys = session.get("keys", [])
if session_keys and len(session_keys) > 0: return len(session_keys) > 0
return True
return False
def get_license_challenge( def get_license_challenge(
self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True
) -> bytes: ) -> bytes:
""" """
Generate a license challenge using Decrypt Labs API. Generate a license challenge using Decrypt Labs API with intelligent caching.
This method implements smart caching logic that:
1. First attempts to retrieve cached keys from the API
2. If required KIDs are set, compares cached keys against requirements
3. Only makes a license request if keys are missing
4. Returns empty challenge if all required keys are cached
The intelligent caching works as follows:
- With required KIDs set: Only requests license for missing keys
- Without required KIDs: Returns any available cached keys
- For PlayReady: Combines cached keys with license keys seamlessly
Args: Args:
session_id: Session identifier session_id: Session identifier
@@ -328,11 +357,14 @@ class DecryptLabsRemoteCDM:
privacy_mode: Whether to use privacy mode - for compatibility only privacy_mode: Whether to use privacy mode - for compatibility only
Returns: Returns:
License challenge as bytes License challenge as bytes, or empty bytes if cached keys satisfy requirements
Raises: Raises:
InvalidSession: If session ID is invalid InvalidSession: If session ID is invalid
requests.RequestException: If API request fails requests.RequestException: If API request fails
Note:
Call set_required_kids() before this method for optimal caching behavior.
""" """
_ = license_type, privacy_mode _ = license_type, privacy_mode
@@ -343,8 +375,13 @@ class DecryptLabsRemoteCDM:
session["pssh"] = pssh_or_wrm session["pssh"] = pssh_or_wrm
init_data = self._get_init_data_from_pssh(pssh_or_wrm) init_data = self._get_init_data_from_pssh(pssh_or_wrm)
already_tried_cache = session.get("tried_cache", False)
request_data = {"scheme": self.device_name, "init_data": init_data, "get_cached_keys_if_exists": True} request_data = {
"scheme": self.device_name,
"init_data": init_data,
"get_cached_keys_if_exists": not already_tried_cache,
}
if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name: if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
request_data["service"] = self.service_name request_data["service"] = self.service_name
@@ -367,20 +404,80 @@ class DecryptLabsRemoteCDM:
error_msg += f" - Error: {data['error']}" error_msg += f" - Error: {data['error']}"
raise requests.RequestException(f"API error: {error_msg}") raise requests.RequestException(f"API error: {error_msg}")
if data.get("message_type") == "cached-keys" or "cached_keys" in data: message_type = data.get("message_type")
if message_type == "cached-keys" or "cached_keys" in data:
"""
Handle cached keys response from API.
When the API returns cached keys, we need to determine if they satisfy
our requirements or if we need to make an additional license request
for missing keys.
"""
cached_keys = data.get("cached_keys", []) cached_keys = data.get("cached_keys", [])
session["keys"] = self._parse_cached_keys(cached_keys) parsed_keys = self._parse_cached_keys(cached_keys)
session["keys"] = parsed_keys
session["tried_cache"] = True
if self._required_kids:
cached_kids = set()
for key in parsed_keys:
if isinstance(key, dict) and "kid" in key:
cached_kids.add(key["kid"].replace("-", "").lower())
required_kids = set(self._required_kids)
missing_kids = required_kids - cached_kids
if missing_kids:
session["cached_keys"] = parsed_keys
request_data["get_cached_keys_if_exists"] = False
response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
if response.status_code == 200:
data = response.json()
if data.get("message") == "success" and "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
return b""
else:
return b""
else:
return b""
if message_type == "license-request" or "challenge" in data:
challenge = base64.b64decode(data["challenge"])
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
error_msg = f"Unexpected API response format. message_type={message_type}, available_fields={list(data.keys())}"
if data.get("message"):
error_msg = f"API response: {data['message']} - {error_msg}"
if "details" in data:
error_msg += f" - Details: {data['details']}"
if "error" in data:
error_msg += f" - Error: {data['error']}"
if already_tried_cache and data.get("message") == "success":
return b"" return b""
challenge = base64.b64decode(data["challenge"]) raise requests.RequestException(error_msg)
session["challenge"] = challenge
session["decrypt_labs_session_id"] = data["session_id"]
return challenge
def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None: def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None:
""" """
Parse license response using Decrypt Labs API. Parse license response using Decrypt Labs API with intelligent key combination.
For PlayReady content with partial cached keys, this method intelligently
combines the cached keys with newly obtained license keys, avoiding
duplicates while ensuring all required keys are available.
The key combination process:
1. Extracts keys from the license response
2. If cached keys exist (PlayReady), combines them with license keys
3. Removes duplicate keys by comparing normalized KIDs
4. Updates the session with the complete key set
Args: Args:
session_id: Session identifier session_id: Session identifier
@@ -395,7 +492,7 @@ class DecryptLabsRemoteCDM:
session = self._sessions[session_id] session = self._sessions[session_id]
if session["keys"]: if session["keys"] and not (self.is_playready and "cached_keys" in session):
return return
if not session.get("challenge") or not session.get("decrypt_labs_session_id"): if not session.get("challenge") or not session.get("decrypt_labs_session_id"):
@@ -439,7 +536,48 @@ class DecryptLabsRemoteCDM:
error_msg += f" - Details: {data['details']}" error_msg += f" - Details: {data['details']}"
raise requests.RequestException(f"License decrypt error: {error_msg}") raise requests.RequestException(f"License decrypt error: {error_msg}")
session["keys"] = self._parse_keys_response(data) license_keys = self._parse_keys_response(data)
if self.is_playready and "cached_keys" in session:
"""
Combine cached keys with license keys for PlayReady content.
This ensures we have both the cached keys (obtained earlier) and
any additional keys from the license response, without duplicates.
"""
cached_keys = session.get("cached_keys", [])
all_keys = list(cached_keys)
for license_key in license_keys:
already_exists = False
license_kid = None
if isinstance(license_key, dict) and "kid" in license_key:
license_kid = license_key["kid"].replace("-", "").lower()
elif hasattr(license_key, "kid"):
license_kid = str(license_key.kid).replace("-", "").lower()
elif hasattr(license_key, "key_id"):
license_kid = str(license_key.key_id).replace("-", "").lower()
if license_kid:
for cached_key in cached_keys:
cached_kid = None
if isinstance(cached_key, dict) and "kid" in cached_key:
cached_kid = cached_key["kid"].replace("-", "").lower()
elif hasattr(cached_key, "kid"):
cached_kid = str(cached_key.kid).replace("-", "").lower()
elif hasattr(cached_key, "key_id"):
cached_kid = str(cached_key.key_id).replace("-", "").lower()
if cached_kid == license_kid:
already_exists = True
break
if not already_exists:
all_keys.append(license_key)
session["keys"] = all_keys
else:
session["keys"] = license_keys
if self.vaults and session["keys"]: if self.vaults and session["keys"]:
key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"} key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
@@ -470,26 +608,6 @@ class DecryptLabsRemoteCDM:
return keys return keys
def _load_cached_keys(self, session_id: bytes) -> None:
"""Load cached keys from vaults and Decrypt Labs API."""
session = self._sessions[session_id]
pssh = session["pssh"]
keys = []
if self.vaults:
key_ids = []
if hasattr(pssh, "key_ids"):
key_ids = pssh.key_ids
elif hasattr(pssh, "kids"):
key_ids = pssh.kids
for kid in key_ids:
key, _ = self.vaults.get_key(kid)
if key and key.count("0") != len(key):
keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"})
session["keys"] = keys
def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Parse cached keys from API response. """Parse cached keys from API response.

View File

@@ -256,30 +256,19 @@ class PlayReady:
return keys return keys
def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None: def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None:
for kid in self.kids: session_id = cdm.open()
if kid in self.content_keys: try:
continue if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
session_id = cdm.open() if hasattr(cdm, "set_required_kids"):
try: cdm.set_required_kids(self.kids)
if hasattr(cdm, "set_pssh_b64") and self.pssh_b64:
cdm.set_pssh_b64(self.pssh_b64)
challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0]) challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0])
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
pass
else:
try:
license_res = licence(challenge=challenge)
except Exception:
if hasattr(cdm, "use_cached_keys_as_fallback"):
if cdm.use_cached_keys_as_fallback(session_id):
keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
continue
raise
if challenge:
try:
license_res = licence(challenge=challenge)
if isinstance(license_res, bytes): if isinstance(license_res, bytes):
license_str = license_res.decode(errors="ignore") license_str = license_res.decode(errors="ignore")
else: else:
@@ -292,10 +281,13 @@ class PlayReady:
pass pass
cdm.parse_license(session_id, license_str) cdm.parse_license(session_id, license_str)
keys = self._extract_keys_from_cdm(cdm, session_id) except Exception:
self.content_keys.update(keys) raise
finally:
cdm.close(session_id) keys = self._extract_keys_from_cdm(cdm, session_id)
self.content_keys.update(keys)
finally:
cdm.close(session_id)
if not self.content_keys: if not self.content_keys:
raise PlayReady.Exceptions.EmptyLicense("No Content Keys were within the License") raise PlayReady.Exceptions.EmptyLicense("No Content Keys were within the License")

View File

@@ -185,6 +185,9 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh) challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id): if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):
@@ -218,6 +221,9 @@ class Widevine:
if cert and hasattr(cdm, "set_service_certificate"): if cert and hasattr(cdm, "set_service_certificate"):
cdm.set_service_certificate(session_id, cert) cdm.set_service_certificate(session_id, cert)
if hasattr(cdm, "set_required_kids"):
cdm.set_required_kids(self.kids)
challenge = cdm.get_license_challenge(session_id, self.pssh) challenge = cdm.get_license_challenge(session_id, self.pssh)
if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id): if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id):