feat(playready): Enhance KID extraction from PSSH with base64 support and XML parsing

This commit is contained in:
Andy
2025-08-05 23:28:30 +00:00
parent be0f7299f8
commit 3277ab0d77

View File

@@ -39,17 +39,23 @@ class PlayReady:
if not isinstance(pssh, PSSH):
raise TypeError(f"Expected pssh to be a {PSSH}, not {pssh!r}")
kids: list[UUID] = []
for header in pssh.wrm_headers:
try:
signed_ids, _, _, _ = header.read_attributes()
except Exception:
continue
for signed_id in signed_ids:
if pssh_b64:
kids = self._extract_kids_from_pssh_b64(pssh_b64)
else:
kids = []
# Extract KIDs using pyplayready's method (may miss some KIDs)
if not kids:
for header in pssh.wrm_headers:
try:
kids.append(UUID(bytes_le=base64.b64decode(signed_id.value)))
signed_ids, _, _, _ = header.read_attributes()
except Exception:
continue
for signed_id in signed_ids:
try:
kids.append(UUID(bytes_le=base64.b64decode(signed_id.value)))
except Exception:
continue
if kid:
if isinstance(kid, str):
@@ -72,6 +78,66 @@ class PlayReady:
if pssh_b64:
self.data.setdefault("pssh_b64", pssh_b64)
def _extract_kids_from_pssh_b64(self, pssh_b64: str) -> list[UUID]:
"""Extract all KIDs from base64-encoded PSSH data."""
try:
import xml.etree.ElementTree as ET
# Decode the PSSH
pssh_bytes = base64.b64decode(pssh_b64)
# Try to find XML in the PSSH data
# PlayReady PSSH usually has XML embedded in it
pssh_str = pssh_bytes.decode("utf-16le", errors="ignore")
# Find WRMHEADER
xml_start = pssh_str.find("<WRMHEADER")
if xml_start == -1:
# Try UTF-8
pssh_str = pssh_bytes.decode("utf-8", errors="ignore")
xml_start = pssh_str.find("<WRMHEADER")
if xml_start != -1:
clean_xml = pssh_str[xml_start:]
xml_end = clean_xml.find("</WRMHEADER>") + len("</WRMHEADER>")
clean_xml = clean_xml[:xml_end]
root = ET.fromstring(clean_xml)
ns = {"pr": "http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader"}
kids = []
# Extract from CUSTOMATTRIBUTES/KIDS
kid_elements = root.findall(".//pr:CUSTOMATTRIBUTES/pr:KIDS/pr:KID", ns)
for kid_elem in kid_elements:
value = kid_elem.get("VALUE")
if value:
try:
kid_bytes = base64.b64decode(value + "==")
kid_uuid = UUID(bytes_le=kid_bytes)
kids.append(kid_uuid)
except Exception:
pass
# Also get individual KID
individual_kids = root.findall(".//pr:DATA/pr:KID", ns)
for kid_elem in individual_kids:
if kid_elem.text:
try:
kid_bytes = base64.b64decode(kid_elem.text.strip() + "==")
kid_uuid = UUID(bytes_le=kid_bytes)
if kid_uuid not in kids:
kids.append(kid_uuid)
except Exception:
pass
return kids
except Exception:
pass
return []
@classmethod
def from_track(cls, track: AnyTrack, session: Optional[Session] = None) -> PlayReady:
if not session: