feat: Add comprehensive JSON debug logging system

Implements a complete structured logging system for troubleshooting and service development.

Features:
- Binary toggle via --debug flag or debug: true in config
- JSON Lines (.jsonl) format for easy parsing and analysis
- Comprehensive logging of all operations:
  * Session info (version, platform, Python version)
  * CLI parameters and service configuration
  * CDM details (Widevine/PlayReady, security levels)
  * Authentication status
  * Title and track metadata
  * DRM operations (PSSH, KIDs, license requests)
  * Vault queries with key retrieval
  * Full error traces with context

- Configurable key logging via debug_keys option
- Smart redaction (passwords, tokens, cookies always redacted)
- Error logging for all critical operations:
  * Authentication failures
  * Title fetching errors
  * Track retrieval errors
  * License request failures (Widevine & PlayReady)
  * Vault operation errors

- Removed old text logging system
This commit is contained in:
Andy
2025-10-13 23:49:01 +00:00
parent 45902bba13
commit 8437ba24d5
5 changed files with 687 additions and 38 deletions

View File

@@ -56,7 +56,8 @@ from unshackle.core.titles.episode import Episode
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.hybrid import Hybrid from unshackle.core.tracks.hybrid import Hybrid
from unshackle.core.utilities import get_system_fonts, is_close_match, time_elapsed_since from unshackle.core.utilities import (get_debug_logger, get_system_fonts, init_debug_logger, is_close_match,
time_elapsed_since)
from unshackle.core.utils import tags from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE, ContextData, MultipleChoice, from unshackle.core.utils.click_types import (LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE, ContextData, MultipleChoice,
SubtitleCodecChoice, VideoCodecChoice) SubtitleCodecChoice, VideoCodecChoice)
@@ -313,6 +314,40 @@ class dl:
self.tmdb_name = tmdb_name self.tmdb_name = tmdb_name
self.tmdb_year = tmdb_year self.tmdb_year = tmdb_year
# Initialize debug logger with service name if debug logging is enabled
if config.debug or logging.root.level == logging.DEBUG:
from collections import defaultdict
from datetime import datetime
debug_log_path = config.directories.logs / config.filenames.debug_log.format_map(
defaultdict(str, service=self.service, time=datetime.now().strftime("%Y%m%d-%H%M%S"))
)
init_debug_logger(
log_path=debug_log_path,
enabled=True,
log_keys=config.debug_keys
)
self.debug_logger = get_debug_logger()
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="download_init",
message=f"Download command initialized for service {self.service}",
service=self.service,
context={
"profile": profile,
"proxy": proxy,
"tag": tag,
"tmdb_id": tmdb_id,
"tmdb_name": tmdb_name,
"tmdb_year": tmdb_year,
"cli_params": {k: v for k, v in ctx.params.items() if k not in ['profile', 'proxy', 'tag', 'tmdb_id', 'tmdb_name', 'tmdb_year']}
}
)
else:
self.debug_logger = None
if self.profile: if self.profile:
self.log.info(f"Using profile: '{self.profile}'") self.log.info(f"Using profile: '{self.profile}'")
@@ -321,6 +356,13 @@ class dl:
if service_config_path.exists(): if service_config_path.exists():
self.service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) self.service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
self.log.info("Service Config loaded") self.log.info("Service Config loaded")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="load_service_config",
service=self.service,
context={"config_path": str(service_config_path), "config": self.service_config}
)
else: else:
self.service_config = {} self.service_config = {}
merge_dict(config.services.get(self.service), self.service_config) merge_dict(config.services.get(self.service), self.service_config)
@@ -384,18 +426,32 @@ class dl:
self.cdm = self.get_cdm(self.service, self.profile) self.cdm = self.get_cdm(self.service, self.profile)
except ValueError as e: except ValueError as e:
self.log.error(f"Failed to load CDM, {e}") self.log.error(f"Failed to load CDM, {e}")
if self.debug_logger:
self.debug_logger.log_error("load_cdm", e, service=self.service)
sys.exit(1) sys.exit(1)
if self.cdm: if self.cdm:
cdm_info = {}
if isinstance(self.cdm, DecryptLabsRemoteCDM): if isinstance(self.cdm, DecryptLabsRemoteCDM):
drm_type = "PlayReady" if self.cdm.is_playready else "Widevine" drm_type = "PlayReady" if self.cdm.is_playready else "Widevine"
self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})") self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})")
cdm_info = {"type": "DecryptLabs", "drm_type": drm_type, "security_level": self.cdm.security_level}
elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]:
self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})") self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})")
cdm_info = {"type": "Widevine", "system_id": self.cdm.system_id, "security_level": self.cdm.security_level, "device_type": self.cdm.device_type.name}
else: else:
self.log.info( self.log.info(
f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})" f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})"
) )
cdm_info = {"type": "PlayReady", "certificate": self.cdm.certificate_chain.get_name(), "security_level": self.cdm.security_level}
if self.debug_logger and cdm_info:
self.debug_logger.log(
level="INFO",
operation="load_cdm",
service=self.service,
context={"cdm": cdm_info}
)
self.proxy_providers = [] self.proxy_providers = []
if no_proxy: if no_proxy:
@@ -521,18 +577,83 @@ class dl:
else: else:
vaults_only = not cdm_only vaults_only = not cdm_only
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="drm_mode_config",
service=self.service,
context={
"cdm_only": cdm_only,
"vaults_only": vaults_only,
"mode": "CDM only" if cdm_only else ("Vaults only" if vaults_only else "Both CDM and Vaults")
}
)
with console.status("Authenticating with Service...", spinner="dots"): with console.status("Authenticating with Service...", spinner="dots"):
cookies = self.get_cookie_jar(self.service, self.profile) try:
credential = self.get_credentials(self.service, self.profile) cookies = self.get_cookie_jar(self.service, self.profile)
service.authenticate(cookies, credential) credential = self.get_credentials(self.service, self.profile)
if cookies or credential: service.authenticate(cookies, credential)
self.log.info("Authenticated with Service") if cookies or credential:
self.log.info("Authenticated with Service")
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="authenticate",
service=self.service,
context={
"has_cookies": bool(cookies),
"has_credentials": bool(credential),
"profile": self.profile
}
)
except Exception as e:
if self.debug_logger:
self.debug_logger.log_error(
"authenticate",
e,
service=self.service,
context={"profile": self.profile}
)
raise
with console.status("Fetching Title Metadata...", spinner="dots"): with console.status("Fetching Title Metadata...", spinner="dots"):
titles = service.get_titles_cached() try:
if not titles: titles = service.get_titles_cached()
self.log.error("No titles returned, nothing to download...") if not titles:
sys.exit(1) self.log.error("No titles returned, nothing to download...")
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="get_titles",
service=self.service,
message="No titles returned from service",
success=False
)
sys.exit(1)
except Exception as e:
if self.debug_logger:
self.debug_logger.log_error(
"get_titles",
e,
service=self.service
)
raise
if self.debug_logger:
titles_info = {
"type": titles.__class__.__name__,
"count": len(titles) if hasattr(titles, "__len__") else 1,
"title": str(titles)
}
if hasattr(titles, "seasons"):
titles_info["seasons"] = len(titles.seasons) if hasattr(titles, "seasons") else 0
self.debug_logger.log(
level="INFO",
operation="get_titles",
service=self.service,
context={"titles": titles_info}
)
if self.tmdb_year and self.tmdb_id: if self.tmdb_year and self.tmdb_id:
sample_title = titles[0] if hasattr(titles, "__getitem__") else titles sample_title = titles[0] if hasattr(titles, "__getitem__") else titles
@@ -621,8 +742,55 @@ class dl:
title.tracks.subtitles = [] title.tracks.subtitles = []
with console.status("Getting tracks...", spinner="dots"): with console.status("Getting tracks...", spinner="dots"):
title.tracks.add(service.get_tracks(title), warn_only=True) try:
title.tracks.chapters = service.get_chapters(title) title.tracks.add(service.get_tracks(title), warn_only=True)
title.tracks.chapters = service.get_chapters(title)
except Exception as e:
if self.debug_logger:
self.debug_logger.log_error(
"get_tracks",
e,
service=self.service,
context={"title": str(title)}
)
raise
if self.debug_logger:
tracks_info = {
"title": str(title),
"video_tracks": len(title.tracks.videos),
"audio_tracks": len(title.tracks.audio),
"subtitle_tracks": len(title.tracks.subtitles),
"has_chapters": bool(title.tracks.chapters),
"videos": [{
"codec": str(v.codec),
"resolution": f"{v.width}x{v.height}" if v.width and v.height else "unknown",
"bitrate": v.bitrate,
"range": str(v.range),
"language": str(v.language) if v.language else None,
"drm": [str(type(d).__name__) for d in v.drm] if v.drm else []
} for v in title.tracks.videos],
"audio": [{
"codec": str(a.codec),
"bitrate": a.bitrate,
"channels": a.channels,
"language": str(a.language) if a.language else None,
"descriptive": a.descriptive,
"drm": [str(type(d).__name__) for d in a.drm] if a.drm else []
} for a in title.tracks.audio],
"subtitles": [{
"codec": str(s.codec),
"language": str(s.language) if s.language else None,
"forced": s.forced,
"sdh": s.sdh
} for s in title.tracks.subtitles]
}
self.debug_logger.log(
level="INFO",
operation="get_tracks",
service=self.service,
context=tracks_info
)
# strip SDH subs to non-SDH if no equivalent same-lang non-SDH is available # strip SDH subs to non-SDH if no equivalent same-lang non-SDH is available
# uses a loose check, e.g, wont strip en-US SDH sub if a non-SDH en-GB is available # uses a loose check, e.g, wont strip en-US SDH sub if a non-SDH en-GB is available
@@ -1009,6 +1177,14 @@ class dl:
download.result() download.result()
except KeyboardInterrupt: except KeyboardInterrupt:
console.print(Padding(":x: Download Cancelled...", (0, 5, 1, 5))) console.print(Padding(":x: Download Cancelled...", (0, 5, 1, 5)))
if self.debug_logger:
self.debug_logger.log(
level="WARNING",
operation="download_tracks",
service=self.service,
message="Download cancelled by user",
context={"title": str(title)}
)
return return
except Exception as e: # noqa except Exception as e: # noqa
error_messages = [ error_messages = [
@@ -1031,6 +1207,19 @@ class dl:
# CalledProcessError already lists the exception trace # CalledProcessError already lists the exception trace
console.print_exception() console.print_exception()
console.print(Padding(Group(*error_messages), (1, 5))) console.print(Padding(Group(*error_messages), (1, 5)))
if self.debug_logger:
self.debug_logger.log_error(
"download_tracks",
e,
service=self.service,
context={
"title": str(title),
"error_type": type(e).__name__,
"tracks_count": len(title.tracks),
"returncode": getattr(e, "returncode", None)
}
)
return return
if skip_dl: if skip_dl:
@@ -1394,6 +1583,20 @@ class dl:
self.cdm = playready_cdm self.cdm = playready_cdm
if isinstance(drm, Widevine): if isinstance(drm, Widevine):
if self.debug_logger:
self.debug_logger.log_drm_operation(
drm_type="Widevine",
operation="prepare_drm",
service=self.service,
context={
"track": str(track),
"title": str(title),
"pssh": drm.pssh.dumps() if drm.pssh else None,
"kids": [k.hex for k in drm.kids],
"track_kid": track_kid.hex if track_kid else None
}
)
with self.DRM_TABLE_LOCK: with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh.dumps(), "Widevine") pssh_display = self._truncate_pssh_for_display(drm.pssh.dumps(), "Widevine")
cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({pssh_display})", "text"), overflow="fold")) cek_tree = Tree(Text.assemble(("Widevine", "cyan"), (f"({pssh_display})", "text"), overflow="fold"))
@@ -1422,11 +1625,32 @@ class dl:
if not any(f"{kid.hex}:{content_key}" in x.label for x in cek_tree.children): if not any(f"{kid.hex}:{content_key}" in x.label for x in cek_tree.children):
cek_tree.add(label) cek_tree.add(label)
self.vaults.add_key(kid, content_key, excluding=vault_used) self.vaults.add_key(kid, content_key, excluding=vault_used)
if self.debug_logger:
self.debug_logger.log_vault_query(
vault_name=vault_used,
operation="get_key_success",
service=self.service,
context={
"kid": kid.hex,
"content_key": content_key,
"track": str(track),
"from_cache": True
}
)
elif vaults_only: elif vaults_only:
msg = f"No Vault has a Key for {kid.hex} and --vaults-only was used" msg = f"No Vault has a Key for {kid.hex} and --vaults-only was used"
cek_tree.add(f"[logging.level.error]{msg}") cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="vault_key_not_found",
service=self.service,
message=msg,
context={"kid": kid.hex, "track": str(track)}
)
raise Widevine.Exceptions.CEKNotFound(msg) raise Widevine.Exceptions.CEKNotFound(msg)
else: else:
need_license = True need_license = True
@@ -1437,6 +1661,18 @@ class dl:
if need_license and not vaults_only: if need_license and not vaults_only:
from_vaults = drm.content_keys.copy() from_vaults = drm.content_keys.copy()
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="get_license",
service=self.service,
message="Requesting Widevine license from service",
context={
"track": str(track),
"kids_needed": [k.hex for k in all_kids if k not in drm.content_keys]
}
)
try: try:
if self.service == "NF": if self.service == "NF":
drm.get_NF_content_keys(cdm=self.cdm, licence=licence, certificate=certificate) drm.get_NF_content_keys(cdm=self.cdm, licence=licence, certificate=certificate)
@@ -1450,8 +1686,30 @@ class dl:
cek_tree.add(f"[logging.level.error]{msg}") cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
if self.debug_logger:
self.debug_logger.log_error(
"get_license",
e,
service=self.service,
context={
"track": str(track),
"exception_type": type(e).__name__
}
)
raise e raise e
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="license_keys_retrieved",
service=self.service,
context={
"track": str(track),
"keys_count": len(drm.content_keys),
"kids": [k.hex for k in drm.content_keys.keys()]
}
)
for kid_, key in drm.content_keys.items(): for kid_, key in drm.content_keys.items():
if key == "0" * 32: if key == "0" * 32:
key = f"[red]{key}[/]" key = f"[red]{key}[/]"
@@ -1497,6 +1755,20 @@ class dl:
export.write_text(jsonpickle.dumps(keys, indent=4), encoding="utf8") export.write_text(jsonpickle.dumps(keys, indent=4), encoding="utf8")
elif isinstance(drm, PlayReady): elif isinstance(drm, PlayReady):
if self.debug_logger:
self.debug_logger.log_drm_operation(
drm_type="PlayReady",
operation="prepare_drm",
service=self.service,
context={
"track": str(track),
"title": str(title),
"pssh": drm.pssh_b64 or "",
"kids": [k.hex for k in drm.kids],
"track_kid": track_kid.hex if track_kid else None
}
)
with self.DRM_TABLE_LOCK: with self.DRM_TABLE_LOCK:
pssh_display = self._truncate_pssh_for_display(drm.pssh_b64 or "", "PlayReady") pssh_display = self._truncate_pssh_for_display(drm.pssh_b64 or "", "PlayReady")
cek_tree = Tree( cek_tree = Tree(
@@ -1531,11 +1803,33 @@ class dl:
if not any(f"{kid.hex}:{content_key}" in x.label for x in cek_tree.children): if not any(f"{kid.hex}:{content_key}" in x.label for x in cek_tree.children):
cek_tree.add(label) cek_tree.add(label)
self.vaults.add_key(kid, content_key, excluding=vault_used) self.vaults.add_key(kid, content_key, excluding=vault_used)
if self.debug_logger:
self.debug_logger.log_vault_query(
vault_name=vault_used,
operation="get_key_success",
service=self.service,
context={
"kid": kid.hex,
"content_key": content_key,
"track": str(track),
"from_cache": True,
"drm_type": "PlayReady"
}
)
elif vaults_only: elif vaults_only:
msg = f"No Vault has a Key for {kid.hex} and --vaults-only was used" msg = f"No Vault has a Key for {kid.hex} and --vaults-only was used"
cek_tree.add(f"[logging.level.error]{msg}") cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="vault_key_not_found",
service=self.service,
message=msg,
context={"kid": kid.hex, "track": str(track), "drm_type": "PlayReady"}
)
raise PlayReady.Exceptions.CEKNotFound(msg) raise PlayReady.Exceptions.CEKNotFound(msg)
else: else:
need_license = True need_license = True
@@ -1556,6 +1850,17 @@ class dl:
cek_tree.add(f"[logging.level.error]{msg}") cek_tree.add(f"[logging.level.error]{msg}")
if not pre_existing_tree: if not pre_existing_tree:
table.add_row(cek_tree) table.add_row(cek_tree)
if self.debug_logger:
self.debug_logger.log_error(
"get_license_playready",
e,
service=self.service,
context={
"track": str(track),
"exception_type": type(e).__name__,
"drm_type": "PlayReady"
}
)
raise e raise e
for kid_, key in drm.content_keys.items(): for kid_, key in drm.content_keys.items():

View File

@@ -1,6 +1,5 @@
import atexit import atexit
import logging import logging
from pathlib import Path
import click import click
import urllib3 import urllib3
@@ -16,23 +15,16 @@ from unshackle.core.config import config
from unshackle.core.console import ComfyRichHandler, console from unshackle.core.console import ComfyRichHandler, console
from unshackle.core.constants import context_settings from unshackle.core.constants import context_settings
from unshackle.core.update_checker import UpdateChecker from unshackle.core.update_checker import UpdateChecker
from unshackle.core.utilities import rotate_log_file from unshackle.core.utilities import close_debug_logger, init_debug_logger
LOGGING_PATH = None
@click.command(cls=Commands, invoke_without_command=True, context_settings=context_settings) @click.command(cls=Commands, invoke_without_command=True, context_settings=context_settings)
@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.") @click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.") @click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs and JSON debug logging.")
@click.option( def main(version: bool, debug: bool) -> None:
"--log",
"log_path",
type=Path,
default=config.directories.logs / config.filenames.log,
help="Log path (or filename). Path can contain the following f-string args: {name} {time}.",
)
def main(version: bool, debug: bool, log_path: Path) -> None:
"""unshackle—Modular Movie, TV, and Music Archival Software.""" """unshackle—Modular Movie, TV, and Music Archival Software."""
debug_logging_enabled = debug or config.debug
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO, level=logging.DEBUG if debug else logging.INFO,
format="%(message)s", format="%(message)s",
@@ -48,11 +40,8 @@ def main(version: bool, debug: bool, log_path: Path) -> None:
], ],
) )
if log_path: if debug_logging_enabled:
global LOGGING_PATH init_debug_logger(enabled=True)
console.record = True
new_log_path = rotate_log_file(log_path)
LOGGING_PATH = new_log_path
urllib3.disable_warnings(InsecureRequestWarning) urllib3.disable_warnings(InsecureRequestWarning)
@@ -98,10 +87,9 @@ def main(version: bool, debug: bool, log_path: Path) -> None:
@atexit.register @atexit.register
def save_log(): def cleanup():
if console.record and LOGGING_PATH: """Clean up resources on exit."""
# TODO: Currently semi-bust. Everything that refreshes gets duplicated. close_debug_logger()
console.save_text(LOGGING_PATH)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -31,6 +31,7 @@ class Config:
class _Filenames: class _Filenames:
# default filenames, do not modify here, set via config # default filenames, do not modify here, set via config
log = "unshackle_{name}_{time}.log" # Directories.logs log = "unshackle_{name}_{time}.log" # Directories.logs
debug_log = "unshackle_debug_{service}_{time}.jsonl" # Directories.logs
config = "config.yaml" # Directories.services / tag config = "config.yaml" # Directories.services / tag
root_config = "unshackle.yaml" # Directories.user_configs root_config = "unshackle.yaml" # Directories.user_configs
chapters = "Chapters_{title}_{random}.txt" # Directories.temp chapters = "Chapters_{title}_{random}.txt" # Directories.temp
@@ -98,6 +99,9 @@ class Config:
self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default self.title_cache_max_retention: int = kwargs.get("title_cache_max_retention", 86400) # 24 hours default
self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True) self.title_cache_enabled: bool = kwargs.get("title_cache_enabled", True)
self.debug: bool = kwargs.get("debug", False)
self.debug_keys: bool = kwargs.get("debug_keys", False)
@classmethod @classmethod
def from_yaml(cls, path: Path) -> Config: def from_yaml(cls, path: Path) -> Config:
if not path.exists(): if not path.exists():

View File

@@ -1,19 +1,22 @@
import ast import ast
import contextlib import contextlib
import importlib.util import importlib.util
import json
import logging import logging
import os import os
import re import re
import socket import socket
import sys import sys
import time import time
import traceback
import unicodedata import unicodedata
from collections import defaultdict from collections import defaultdict
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from types import ModuleType from types import ModuleType
from typing import Optional, Sequence, Union from typing import Any, Optional, Sequence, Union
from urllib.parse import ParseResult, urlparse from urllib.parse import ParseResult, urlparse
from uuid import uuid4
import chardet import chardet
import requests import requests
@@ -122,7 +125,7 @@ def is_exact_match(language: Union[str, Language], languages: Sequence[Union[str
return closest_match(language, list(map(str, languages)))[1] <= LANGUAGE_EXACT_DISTANCE return closest_match(language, list(map(str, languages)))[1] <= LANGUAGE_EXACT_DISTANCE
def get_boxes(data: bytes, box_type: bytes, as_bytes: bool = False) -> Box: def get_boxes(data: bytes, box_type: bytes, as_bytes: bool = False) -> Box: # type: ignore
""" """
Scan a byte array for a wanted MP4/ISOBMFF box, then parse and yield each find. Scan a byte array for a wanted MP4/ISOBMFF box, then parse and yield each find.
@@ -457,3 +460,334 @@ class FPS(ast.NodeVisitor):
@classmethod @classmethod
def parse(cls, expr: str) -> float: def parse(cls, expr: str) -> float:
return cls().visit(ast.parse(expr).body[0]) return cls().visit(ast.parse(expr).body[0])
"""
Structured JSON debug logging for unshackle.
Provides comprehensive debugging information for service developers and troubleshooting.
When enabled, logs all operations, requests, responses, DRM operations, and errors in JSON format.
"""
class DebugLogger:
    """
    Structured JSON debug logger for unshackle.

    Outputs JSON Lines format where each line is a complete JSON object.
    This makes it easy to parse, filter, and analyze logs programmatically.
    """

    def __init__(self, log_path: Optional[Path] = None, enabled: bool = False, log_keys: bool = False):
        """
        Initialize the debug logger.

        Args:
            log_path: Path to the log file. If None, logging is disabled.
            enabled: Whether debug logging is enabled.
            log_keys: Whether to log decryption keys (for debugging key issues).
        """
        # Logging is only active when explicitly enabled AND a destination exists.
        self.enabled = enabled and log_path is not None
        self.log_path = log_path
        # Short random ID used to correlate every entry from this process run.
        self.session_id = str(uuid4())[:8]
        self.file_handle = None
        self.log_keys = log_keys

        if self.enabled:
            self.log_path.parent.mkdir(parents=True, exist_ok=True)
            # Append mode so re-runs pointed at the same path don't clobber history.
            self.file_handle = open(self.log_path, "a", encoding="utf-8")
            self._log_session_start()

    def _log_session_start(self):
        """Log the start of a new session with environment information."""
        # Imported lazily so constructing a disabled logger never pulls in
        # platform or the package module.
        import platform

        from unshackle.core import __version__

        self.log(
            level="INFO",
            operation="session_start",
            message="Debug logging session started",
            context={
                "unshackle_version": __version__,
                "python_version": sys.version,
                "platform": platform.platform(),
                "platform_system": platform.system(),
                "platform_release": platform.release(),
            },
        )

    def log(
        self,
        level: str = "DEBUG",
        operation: str = "",
        message: str = "",
        context: Optional[dict[str, Any]] = None,
        service: Optional[str] = None,
        error: Optional[Exception] = None,
        request: Optional[dict[str, Any]] = None,
        response: Optional[dict[str, Any]] = None,
        duration_ms: Optional[float] = None,
        success: Optional[bool] = None,
        **kwargs,
    ):
        """
        Log a structured JSON entry.

        Args:
            level: Log level (DEBUG, INFO, WARNING, ERROR)
            operation: Name of the operation being performed
            message: Human-readable message
            context: Additional context information
            service: Service name (e.g., DSNP, NF)
            error: Exception object if an error occurred
            request: Request details (URL, method, headers, body)
            response: Response details (status, headers, body)
            duration_ms: Operation duration in milliseconds
            success: Whether the operation succeeded
            **kwargs: Additional fields to include in the log entry
        """
        # No-op when disabled or after close() has released the file handle.
        if not self.enabled or not self.file_handle:
            return

        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.session_id,
            "level": level,
        }
        if operation:
            entry["operation"] = operation
        if message:
            entry["message"] = message
        if service:
            entry["service"] = service
        if context:
            entry["context"] = self._sanitize_data(context)
        if request:
            entry["request"] = self._sanitize_data(request)
        if response:
            entry["response"] = self._sanitize_data(response)
        if duration_ms is not None:
            entry["duration_ms"] = duration_ms
        if success is not None:
            entry["success"] = success
        if error:
            entry["error"] = {
                "type": type(error).__name__,
                "message": str(error),
                "traceback": traceback.format_exception(type(error), error, error.__traceback__),
            }
        # Extra fields may not overwrite the core entry keys above.
        for key, value in kwargs.items():
            if key not in entry:
                entry[key] = self._sanitize_data(value)

        try:
            # default=str keeps the write from ever raising on odd value types;
            # flush per entry so logs survive a crash.
            self.file_handle.write(json.dumps(entry, default=str) + "\n")
            self.file_handle.flush()
        except Exception as e:
            # Debug logging must never take the download down with it.
            print(f"Failed to write debug log: {e}", file=sys.stderr)

    def _sanitize_data(self, data: Any) -> Any:
        """
        Sanitize data for JSON serialization.

        Handles complex objects and removes sensitive information.
        Sets/frozensets are serialized as lists and bytes-like objects
        (bytes, bytearray, memoryview) as hex strings.
        """
        if data is None:
            return None
        if isinstance(data, (str, int, float, bool)):
            return data
        if isinstance(data, (list, tuple, set, frozenset)):
            return [self._sanitize_data(item) for item in data]
        if isinstance(data, dict):
            sanitized = {}
            for key, value in data.items():
                key_lower = str(key).lower()
                # "has_*" flags are presence booleans, never secrets themselves.
                has_prefix = key_lower.startswith("has_")
                is_always_sensitive = not has_prefix and any(
                    sensitive in key_lower for sensitive in ["password", "token", "secret", "auth", "cookie"]
                )
                # Content-key fields are redacted unless log_keys opted in;
                # key *metadata* (kid, keys_count, key_found, ...) always passes.
                is_key_field = (
                    "key" in key_lower
                    and not has_prefix
                    and not any(safe in key_lower for safe in ["_count", "_id", "_type", "kid", "keys_", "key_found"])
                )
                should_redact = is_always_sensitive or (is_key_field and not self.log_keys)
                if should_redact:
                    sanitized[key] = "[REDACTED]"
                else:
                    sanitized[key] = self._sanitize_data(value)
            return sanitized
        if isinstance(data, (bytes, bytearray, memoryview)):
            try:
                return bytes(data).hex()
            except Exception:
                return "[BINARY_DATA]"
        if isinstance(data, Path):
            return str(data)
        try:
            return str(data)
        except Exception:
            return f"[{type(data).__name__}]"

    def log_operation_start(self, operation: str, **kwargs) -> str:
        """
        Log the start of an operation and return an operation ID.

        Args:
            operation: Name of the operation
            **kwargs: Additional context

        Returns:
            Operation ID that can be used to log the end of the operation
        """
        op_id = str(uuid4())[:8]
        self.log(
            level="DEBUG",
            operation=f"{operation}_start",
            message=f"Starting operation: {operation}",
            operation_id=op_id,
            **kwargs,
        )
        return op_id

    def log_operation_end(
        self, operation: str, operation_id: str, success: bool = True, duration_ms: Optional[float] = None, **kwargs
    ):
        """
        Log the end of an operation.

        Args:
            operation: Name of the operation
            operation_id: Operation ID from log_operation_start
            success: Whether the operation succeeded
            duration_ms: Operation duration in milliseconds
            **kwargs: Additional context
        """
        self.log(
            level="INFO" if success else "ERROR",
            operation=f"{operation}_end",
            message=f"Finished operation: {operation}",
            operation_id=operation_id,
            success=success,
            duration_ms=duration_ms,
            **kwargs,
        )

    def log_service_call(self, method: str, url: str, **kwargs):
        """
        Log a service API call.

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Request URL
            **kwargs: Additional request details (headers, body, etc.)
        """
        self.log(level="DEBUG", operation="service_call", request={"method": method, "url": url, **kwargs})

    def log_drm_operation(self, drm_type: str, operation: str, **kwargs):
        """
        Log a DRM operation (PSSH extraction, license request, key retrieval).

        Args:
            drm_type: DRM type (Widevine, PlayReady, etc.)
            operation: DRM operation name
            **kwargs: Additional context (PSSH, KIDs, keys, etc.)
        """
        self.log(
            level="DEBUG", operation=f"drm_{operation}", message=f"{drm_type} {operation}", drm_type=drm_type, **kwargs
        )

    def log_vault_query(self, vault_name: str, operation: str, **kwargs):
        """
        Log a vault query operation.

        Args:
            vault_name: Name of the vault
            operation: Vault operation (get_key, add_key, etc.)
            **kwargs: Additional context (KID, key, success, etc.)
        """
        self.log(
            level="DEBUG",
            operation=f"vault_{operation}",
            message=f"Vault {vault_name}: {operation}",
            vault=vault_name,
            **kwargs,
        )

    def log_error(self, operation: str, error: Exception, **kwargs):
        """
        Log an error with full context.

        Args:
            operation: Operation that failed
            error: Exception that occurred
            **kwargs: Additional context
        """
        self.log(
            level="ERROR",
            operation=operation,
            message=f"Error in {operation}: {str(error)}",
            error=error,
            success=False,
            **kwargs,
        )

    def close(self):
        """Close the log file and clean up resources. Safe to call twice."""
        if self.file_handle:
            self.log(level="INFO", operation="session_end", message="Debug logging session ended")
            self.file_handle.close()
            self.file_handle = None
# Global debug logger instance
_debug_logger: Optional[DebugLogger] = None
def get_debug_logger() -> Optional[DebugLogger]:
"""Get the global debug logger instance."""
return _debug_logger
def init_debug_logger(log_path: Optional[Path] = None, enabled: bool = False, log_keys: bool = False):
"""
Initialize the global debug logger.
Args:
log_path: Path to the log file
enabled: Whether debug logging is enabled
log_keys: Whether to log decryption keys (for debugging key issues)
"""
global _debug_logger
if _debug_logger:
_debug_logger.close()
_debug_logger = DebugLogger(log_path=log_path, enabled=enabled, log_keys=log_keys)
def close_debug_logger():
"""Close the global debug logger."""
global _debug_logger
if _debug_logger:
_debug_logger.close()
_debug_logger = None
__all__ = (
"DebugLogger",
"get_debug_logger",
"init_debug_logger",
"close_debug_logger",
)

View File

@@ -32,6 +32,24 @@ title_cache_enabled: true # Enable/disable title caching globally (default: true
title_cache_time: 1800 # Cache duration in seconds (default: 1800 = 30 minutes) title_cache_time: 1800 # Cache duration in seconds (default: 1800 = 30 minutes)
title_cache_max_retention: 86400 # Maximum cache retention for fallback when API fails (default: 86400 = 24 hours) title_cache_max_retention: 86400 # Maximum cache retention for fallback when API fails (default: 86400 = 24 hours)
# Debug logging configuration
# Comprehensive JSON-based debug logging for troubleshooting and service development
debug: false # Enable structured JSON debug logging (default: false)
# When enabled with --debug flag or set to true:
# - Creates JSON Lines (.jsonl) log files with complete debugging context
# - Logs: session info, CLI params, service config, CDM details, authentication,
# titles, tracks metadata, DRM operations, vault queries, errors with stack traces
# - File location: logs/unshackle_debug_{service}_{time}.jsonl
#   (matches filenames.debug_log; the legacy plain-text log is no longer written)
debug_keys: false # Log decryption keys in debug logs (default: false)
# Set to true to include actual decryption keys in logs
# Useful for debugging key retrieval and decryption issues
# SECURITY NOTE: Passwords, tokens, cookies, and session tokens
# are ALWAYS redacted regardless of this setting
# Only affects: content_key, key fields (the actual CEKs)
# Never affects: kid, keys_count, key_id (metadata is always logged)
# Muxing configuration # Muxing configuration
muxing: muxing:
set_title: false set_title: false
@@ -239,7 +257,7 @@ headers:
# Override default filenames used across unshackle # Override default filenames used across unshackle
filenames: filenames:
log: "unshackle_{name}_{time}.log" debug_log: "unshackle_debug_{service}_{time}.jsonl" # JSON Lines debug log file
config: "config.yaml" config: "config.yaml"
root_config: "unshackle.yaml" root_config: "unshackle.yaml"
chapters: "Chapters_{title}_{random}.txt" chapters: "Chapters_{title}_{random}.txt"