Files
unshackle/unshackle/commands/prd.py
Andy 283736c57b revert: remove tinycss SyntaxWarning suppression and fix isort
Revert the warnings filter added in 2d5e807 as it didn't work as expected to suppress the tinycss SyntaxWarning. Also fix isort order in prd.py for pyplayready imports.
2025-10-08 21:26:26 +00:00

272 lines
9.9 KiB
Python

import logging
from pathlib import Path
from typing import Optional
import click
import requests
from Crypto.Random import get_random_bytes
from pyplayready import InvalidCertificateChain, OutdatedDevice
from pyplayready.cdm import Cdm
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.device import Device
from pyplayready.system.bcert import Certificate, CertificateChain
from pyplayready.system.pssh import PSSH
from unshackle.core.config import config
from unshackle.core.constants import context_settings
@click.group(
short_help="Manage creation of PRD (Playready Device) files.",
context_settings=context_settings,
)
def prd() -> None:
"""Manage creation of PRD (Playready Device) files."""
@prd.command()
@click.argument("paths", type=Path, nargs=-1)
@click.option(
"-e",
"--encryption_key",
type=Path,
required=False,
help="Optional Device ECC private encryption key",
)
@click.option(
"-s",
"--signing_key",
type=Path,
required=False,
help="Optional Device ECC private signing key",
)
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
@click.pass_context
def new(
ctx: click.Context,
paths: tuple[Path, ...],
encryption_key: Optional[Path],
signing_key: Optional[Path],
output: Optional[Path],
) -> None:
"""Create a new .PRD PlayReady Device file.
Accepts either paths to a group key and certificate or a single directory
containing ``zgpriv.dat`` and ``bgroupcert.dat``.
"""
if len(paths) == 1 and paths[0].is_dir():
device_dir = paths[0]
group_key = device_dir / "zgpriv.dat"
group_certificate = device_dir / "bgroupcert.dat"
if not group_key.is_file() or not group_certificate.is_file():
raise click.UsageError("Folder must contain zgpriv.dat and bgroupcert.dat", ctx)
elif len(paths) == 2:
group_key, group_certificate = paths
if not group_key.is_file():
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
if not group_certificate.is_file():
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
device_dir = None
else:
raise click.UsageError(
"Provide either a folder path or paths to group_key and group_certificate",
ctx,
)
if encryption_key and not encryption_key.is_file():
raise click.UsageError("encryption_key: Not a path to a file, or it doesn't exist.", ctx)
if signing_key and not signing_key.is_file():
raise click.UsageError("signing_key: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("prd")
encryption_key_obj = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate()
signing_key_obj = ECCKey.load(signing_key) if signing_key else ECCKey.generate()
group_key_obj = ECCKey.load(group_key)
certificate_chain = CertificateChain.load(group_certificate)
if certificate_chain.get(0).get_issuer_key() != group_key_obj.public_bytes():
raise InvalidCertificateChain("Group key does not match this certificate")
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key_obj,
encryption_key=encryption_key_obj,
group_key=group_key_obj,
parent=certificate_chain,
)
certificate_chain.prepend(new_certificate)
certificate_chain.verify()
device = Device(
group_key=group_key_obj.dumps(),
encryption_key=encryption_key_obj.dumps(),
signing_key=signing_key_obj.dumps(),
group_certificate=certificate_chain.dumps(),
)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(
"Saving PRD with the file extension '%s' but '.prd' is recommended.",
output.suffix,
)
out_path = output
else:
out_dir = output or (device_dir or config.directories.prds)
out_path = out_dir / f"{device.get_name()}.prd"
if out_path.exists():
log.error("A file already exists at the path '%s', cannot overwrite.", out_path)
return
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps())
log.info("Created Playready Device (.prd) file, %s", out_path.name)
log.info(" + Security Level: %s", device.security_level)
log.info(" + Group Key: %s bytes", len(device.group_key.dumps()))
log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
log.info(" + Saved to: %s", out_path.absolute())
@prd.command(name="reprovision")
@click.argument("prd_path", type=Path)
@click.option(
"-e",
"--encryption_key",
type=Path,
required=False,
help="Optional Device ECC private encryption key",
)
@click.option(
"-s",
"--signing_key",
type=Path,
required=False,
help="Optional Device ECC private signing key",
)
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def reprovision_device(
ctx: click.Context,
prd_path: Path,
encryption_key: Optional[Path],
signing_key: Optional[Path],
output: Optional[Path] = None,
) -> None:
"""Reprovision a Playready Device (.prd) file."""
if not prd_path.is_file():
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("prd")
log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name)
device = Device.load(prd_path)
if device.group_key is None:
raise OutdatedDevice(
"Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher"
)
device.group_certificate.remove(0)
encryption_key_obj = ECCKey.load(encryption_key) if encryption_key else ECCKey.generate()
signing_key_obj = ECCKey.load(signing_key) if signing_key else ECCKey.generate()
device.encryption_key = encryption_key_obj
device.signing_key = signing_key_obj
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=device.group_certificate.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key_obj,
encryption_key=encryption_key_obj,
group_key=device.group_key,
parent=device.group_certificate,
)
device.group_certificate.prepend(new_certificate)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(
"Saving PRD with the file extension '%s' but '.prd' is recommended.",
output.suffix,
)
out_path = output
else:
out_path = prd_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps())
log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
@prd.command()
@click.argument("device", type=Path)
@click.option(
"-c",
"--ckt",
type=click.Choice(["aesctr", "aescbc"], case_sensitive=False),
default="aesctr",
help="Content Key Encryption Type",
)
@click.option(
"-sl",
"--security-level",
type=click.Choice(["150", "2000", "3000"], case_sensitive=False),
default="2000",
help="Minimum Security Level",
)
@click.pass_context
def test(
ctx: click.Context,
device: Path,
ckt: str,
security_level: str,
) -> None:
"""Test a Playready Device on the Microsoft demo server."""
if not device.is_file():
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("prd")
prd_device = Device.load(device)
log.info("Loaded Device: %s", prd_device.get_name())
cdm = Cdm.from_device(prd_device)
log.info("Loaded CDM")
session_id = cdm.open()
log.info("Opened Session")
pssh_b64 = "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AHQAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABhAHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQAUgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AEcAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBDAEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQB5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIAZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAFYARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBTAFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
pssh = PSSH(pssh_b64)
challenge = cdm.get_license_challenge(session_id, pssh.wrm_headers[0])
log.info("Created License Request")
license_server = f"https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:{security_level},ckt:{ckt})"
response = requests.post(
url=license_server,
headers={"Content-Type": "text/xml; charset=UTF-8"},
data=challenge,
)
cdm.parse_license(session_id, response.text)
log.info("License Parsed Successfully")
for key in cdm.get_keys(session_id):
log.info(f"{key.key_id.hex}:{key.key.hex()}")
cdm.close(session_id)
log.info("Closed Session")