From 3277ab0d77eb577fc93e2a7e3f7d9e85849f6fdc Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 5 Aug 2025 23:28:30 +0000 Subject: [PATCH] feat(playready): Enhance KID extraction from PSSH with base64 support and XML parsing --- unshackle/core/drm/playready.py | 82 +++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 8 deletions(-) diff --git a/unshackle/core/drm/playready.py b/unshackle/core/drm/playready.py index 37590c7..38d77c4 100644 --- a/unshackle/core/drm/playready.py +++ b/unshackle/core/drm/playready.py @@ -39,17 +39,23 @@ class PlayReady: if not isinstance(pssh, PSSH): raise TypeError(f"Expected pssh to be a {PSSH}, not {pssh!r}") - kids: list[UUID] = [] - for header in pssh.wrm_headers: - try: - signed_ids, _, _, _ = header.read_attributes() - except Exception: - continue - for signed_id in signed_ids: + if pssh_b64: + kids = self._extract_kids_from_pssh_b64(pssh_b64) + else: + kids = [] + + # Extract KIDs using pyplayready's method (may miss some KIDs) + if not kids: + for header in pssh.wrm_headers: try: - kids.append(UUID(bytes_le=base64.b64decode(signed_id.value))) + signed_ids, _, _, _ = header.read_attributes() except Exception: continue + for signed_id in signed_ids: + try: + kids.append(UUID(bytes_le=base64.b64decode(signed_id.value))) + except Exception: + continue if kid: if isinstance(kid, str): @@ -72,6 +78,66 @@ class PlayReady: if pssh_b64: self.data.setdefault("pssh_b64", pssh_b64) + def _extract_kids_from_pssh_b64(self, pssh_b64: str) -> list[UUID]: + """Extract all KIDs from base64-encoded PSSH data.""" + try: + import xml.etree.ElementTree as ET + + # Decode the PSSH + pssh_bytes = base64.b64decode(pssh_b64) + + # Try to find XML in the PSSH data + # PlayReady PSSH usually has XML embedded in it + pssh_str = pssh_bytes.decode("utf-16le", errors="ignore") + + # Find WRMHEADER + xml_start = pssh_str.find("") + len("") + clean_xml = clean_xml[:xml_end] + + root = ET.fromstring(clean_xml) + ns = {"pr": "http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader"} + + kids = [] + + # Extract from CUSTOMATTRIBUTES/KIDS + kid_elements = root.findall(".//pr:CUSTOMATTRIBUTES/pr:KIDS/pr:KID", ns) + for kid_elem in kid_elements: + value = kid_elem.get("VALUE") + if value: + try: + kid_bytes = base64.b64decode(value + "==") + kid_uuid = UUID(bytes_le=kid_bytes) + kids.append(kid_uuid) + except Exception: + pass + + # Also get individual KID + individual_kids = root.findall(".//pr:DATA/pr:KID", ns) + for kid_elem in individual_kids: + if kid_elem.text: + try: + kid_bytes = base64.b64decode(kid_elem.text.strip() + "==") + kid_uuid = UUID(bytes_le=kid_bytes) + if kid_uuid not in kids: + kids.append(kid_uuid) + except Exception: + pass + + return kids + + except Exception: + pass + + return [] + @classmethod def from_track(cls, track: AnyTrack, session: Optional[Session] = None) -> PlayReady: if not session: