From 2afc59624d2159081af8126f1c7933f27f3b694a Mon Sep 17 00:00:00 2001 From: Sp5rky Date: Sun, 28 Sep 2025 21:49:00 -0600 Subject: [PATCH] feat: add REST API server with download management Very early dev work, more changes will be active in this branch. - Implement download queue management and worker system - Add OpenAPI/Swagger documentation - Include download progress tracking and status endpoints - Add API authentication and error handling - Update core components to support API integration --- CONFIG.md | 3 + pyproject.toml | 1 + unshackle/commands/serve.py | 78 ++- unshackle/core/__init__.py | 2 +- unshackle/core/api/__init__.py | 3 + unshackle/core/api/download_manager.py | 630 ++++++++++++++++++++++++ unshackle/core/api/download_worker.py | 84 ++++ unshackle/core/api/handlers.py | 653 +++++++++++++++++++++++++ unshackle/core/api/routes.py | 375 ++++++++++++++ unshackle/core/titles/episode.py | 12 +- unshackle/core/titles/movie.py | 12 +- unshackle/core/tracks/audio.py | 5 + unshackle/core/tracks/tracks.py | 7 +- unshackle/unshackle-example.yaml | 1 + uv.lock | 48 ++ 15 files changed, 1902 insertions(+), 12 deletions(-) create mode 100644 unshackle/core/api/__init__.py create mode 100644 unshackle/core/api/download_manager.py create mode 100644 unshackle/core/api/download_worker.py create mode 100644 unshackle/core/api/handlers.py create mode 100644 unshackle/core/api/routes.py diff --git a/CONFIG.md b/CONFIG.md index 880370a..5a5aae8 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -547,9 +547,12 @@ Configuration data for pywidevine's serve functionality run through unshackle. This effectively allows you to run `unshackle serve` to start serving pywidevine Serve-compliant CDMs right from your local widevine device files. +- `api_secret` - Secret key for REST API authentication. When set, enables the REST API server alongside the CDM serve functionality. This key is required for authenticating API requests. + For example, ```yaml +api_secret: "your-secret-key-here" users: secret_key_for_jane: # 32bit hex recommended, case-sensitive devices: # list of allowed devices for this user diff --git a/pyproject.toml b/pyproject.toml index 1b872a7..cfa5d0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ dependencies = [ "httpx>=0.28.1,<0.29", "cryptography>=45.0.0", "subby", + "aiohttp-swagger3>=0.9.0,<1", ] [project.urls] diff --git a/unshackle/commands/serve.py b/unshackle/commands/serve.py index 85c9739..eaad5fe 100644 --- a/unshackle/commands/serve.py +++ b/unshackle/commands/serve.py @@ -1,19 +1,26 @@ +import logging import subprocess import click +from aiohttp import web from unshackle.core import binaries +from unshackle.core.api import setup_routes, setup_swagger from unshackle.core.config import config from unshackle.core.constants import context_settings -@click.command(short_help="Serve your Local Widevine Devices for Remote Access.", context_settings=context_settings) +@click.command( + short_help="Serve your Local Widevine Devices and REST API for Remote Access.", context_settings=context_settings +) @click.option("-h", "--host", type=str, default="0.0.0.0", help="Host to serve from.") @click.option("-p", "--port", type=int, default=8786, help="Port to serve from.") @click.option("--caddy", is_flag=True, default=False, help="Also serve with Caddy.") -def serve(host: str, port: int, caddy: bool) -> None: +@click.option("--api-only", is_flag=True, default=False, help="Serve only the REST API, not pywidevine CDM.") +@click.option("--no-key", is_flag=True, default=False, help="Disable API key authentication (allows all requests).") +def serve(host: str, port: int, caddy: bool, api_only: bool, no_key: bool) -> None: """ - Serve your Local Widevine Devices for Remote Access. + Serve your Local Widevine Devices and REST API for Remote Access. \b Host as 127.0.0.1 may block remote access even if port-forwarded. @@ -23,8 +30,25 @@ def serve(host: str, port: int, caddy: bool) -> None: You may serve with Caddy at the same time with --caddy. You can use Caddy as a reverse-proxy to serve with HTTPS. The config used will be the Caddyfile next to the unshackle config. + + \b + The REST API provides programmatic access to unshackle functionality. + Configure authentication in your config under serve.users and serve.api_secret. """ - from pywidevine import serve + from pywidevine import serve as pywidevine_serve + + log = logging.getLogger("serve") + + # Validate API secret for REST API routes (unless --no-key is used) + if not no_key: + api_secret = config.serve.get("api_secret") + if not api_secret: + raise click.ClickException( + "API secret key is not configured. Please add 'api_secret' to the 'serve' section in your config." + ) + else: + api_secret = None + log.warning("Running with --no-key: Authentication is DISABLED for all API endpoints!") if caddy: if not binaries.Caddy: @@ -39,7 +63,51 @@ def serve(host: str, port: int, caddy: bool) -> None: if not config.serve.get("devices"): config.serve["devices"] = [] config.serve["devices"].extend(list(config.directories.wvds.glob("*.wvd"))) - serve.run(config.serve, host, port) + + if api_only: + # API-only mode: serve just the REST API + log.info("Starting REST API server (pywidevine CDM disabled)") + if no_key: + app = web.Application() + app["config"] = {"users": []} + else: + app = web.Application(middlewares=[pywidevine_serve.authentication]) + app["config"] = {"users": [api_secret]} + setup_routes(app) + setup_swagger(app) + log.info(f"REST API endpoints available at http://{host}:{port}/api/") + log.info(f"Swagger UI available at http://{host}:{port}/api/docs/") + log.info("(Press CTRL+C to quit)") + web.run_app(app, host=host, port=port, print=None) + else: + # Integrated mode: serve both pywidevine + REST API + log.info("Starting integrated server (pywidevine CDM + REST API)") + + # Create integrated app with both pywidevine and API routes + if no_key: + app = web.Application() + app["config"] = dict(config.serve) + app["config"]["users"] = [] + else: + app = web.Application(middlewares=[pywidevine_serve.authentication]) + # Setup config - add API secret to users for authentication + serve_config = dict(config.serve) + if not serve_config.get("users"): + serve_config["users"] = [] + if api_secret not in serve_config["users"]: + serve_config["users"].append(api_secret) + app["config"] = serve_config + + app.on_startup.append(pywidevine_serve._startup) + app.on_cleanup.append(pywidevine_serve._cleanup) + app.add_routes(pywidevine_serve.routes) + setup_routes(app) + setup_swagger(app) + + log.info(f"REST API endpoints available at http://{host}:{port}/api/") + log.info(f"Swagger UI available at http://{host}:{port}/api/docs/") + log.info("(Press CTRL+C to quit)") + web.run_app(app, host=host, port=port, print=None) finally: if caddy_p: caddy_p.kill() diff --git a/unshackle/core/__init__.py b/unshackle/core/__init__.py index ac329c9..8c0d5d5 100644 --- a/unshackle/core/__init__.py +++ b/unshackle/core/__init__.py @@ -1 +1 @@ -__version__ = "1.4.7" +__version__ = "2.0.0" diff --git a/unshackle/core/api/__init__.py b/unshackle/core/api/__init__.py new file mode 100644 index 0000000..1fa2c9b --- /dev/null +++ b/unshackle/core/api/__init__.py @@ -0,0 +1,3 @@ +from unshackle.core.api.routes import setup_routes, setup_swagger + +__all__ = ["setup_routes", "setup_swagger"] diff --git a/unshackle/core/api/download_manager.py b/unshackle/core/api/download_manager.py new file mode 100644 index 0000000..7e2f0a4 --- /dev/null +++ b/unshackle/core/api/download_manager.py @@ -0,0 +1,630 @@ +import asyncio +import json +import logging +import os +import sys +import tempfile +import threading +import uuid +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional +from datetime import datetime, timedelta +from contextlib import suppress + +log = logging.getLogger("download_manager") + + +class JobStatus(Enum): + QUEUED = "queued" + DOWNLOADING = "downloading" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class DownloadJob: + """Represents a download job with all its parameters and status.""" + + job_id: str + status: JobStatus + created_time: datetime + service: str + title_id: str + parameters: Dict[str, Any] + + # Progress tracking + started_time: Optional[datetime] = None + completed_time: Optional[datetime] = None + progress: float = 0.0 + + # Results and error info + output_files: List[str] = field(default_factory=list) + error_message: Optional[str] = None + error_details: Optional[str] = None + + # Cancellation support + cancel_event: threading.Event = field(default_factory=threading.Event) + + def to_dict(self, include_full_details: bool = False) -> Dict[str, Any]: + """Convert job to dictionary for JSON response.""" + result = { + "job_id": self.job_id, + "status": self.status.value, + "created_time": self.created_time.isoformat(), + "service": self.service, + "title_id": self.title_id, + "progress": self.progress, + } + + if include_full_details: + result.update( + { + "parameters": self.parameters, + "started_time": self.started_time.isoformat() if self.started_time else None, + "completed_time": self.completed_time.isoformat() if self.completed_time else None, + "output_files": self.output_files, + "error_message": self.error_message, + "error_details": self.error_details, + } + ) + + return result + + +def _perform_download( + job_id: str, + service: str, + title_id: str, + params: Dict[str, Any], + cancel_event: Optional[threading.Event] = None, + progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None, +) -> List[str]: + """Execute the synchronous download logic for a job.""" + + def _check_cancel(stage: str): + if cancel_event and cancel_event.is_set(): + raise Exception(f"Job was cancelled {stage}") + + from io import StringIO + from contextlib import redirect_stdout, redirect_stderr + + _check_cancel("before execution started") + + # Import dl.py components lazily to avoid circular deps during module import + import click + import yaml + from unshackle.commands.dl import dl + from unshackle.core.config import config + from unshackle.core.services import Services + from unshackle.core.utils.click_types import ContextData + from unshackle.core.utils.collections import merge_dict + + log.info(f"Starting sync download for job {job_id}") + + # Load service configuration + service_config_path = Services.get_path(service) / config.filenames.config + if service_config_path.exists(): + service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) + else: + service_config = {} + merge_dict(config.services.get(service), service_config) + + from unshackle.commands.dl import dl as dl_command + + ctx = click.Context(dl_command.cli) + ctx.invoked_subcommand = service + ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=[], profile=params.get("profile")) + ctx.params = { + "proxy": params.get("proxy"), + "no_proxy": params.get("no_proxy", False), + "profile": params.get("profile"), + "tag": params.get("tag"), + "tmdb_id": params.get("tmdb_id"), + "tmdb_name": params.get("tmdb_name", False), + "tmdb_year": params.get("tmdb_year", False), + } + + dl_instance = dl( + ctx=ctx, + no_proxy=params.get("no_proxy", False), + profile=params.get("profile"), + proxy=params.get("proxy"), + tag=params.get("tag"), + tmdb_id=params.get("tmdb_id"), + tmdb_name=params.get("tmdb_name", False), + tmdb_year=params.get("tmdb_year", False), + ) + + service_module = Services.load(service) + + _check_cancel("before service instantiation") + + try: + import inspect + + service_init_params = inspect.signature(service_module.__init__).parameters + + service_ctx = click.Context(click.Command(service)) + service_ctx.parent = ctx + service_ctx.obj = ctx.obj + + service_kwargs = {} + + if "title" in service_init_params: + service_kwargs["title"] = title_id + + for key, value in params.items(): + if key in service_init_params and key not in ["service", "title_id"]: + service_kwargs[key] = value + + for param_name, param_info in service_init_params.items(): + if param_name not in service_kwargs and param_name not in ["self", "ctx"]: + if param_info.default is inspect.Parameter.empty: + if param_name == "movie": + service_kwargs[param_name] = "/movies/" in title_id + elif param_name == "meta_lang": + service_kwargs[param_name] = None + else: + log.warning(f"Unknown required parameter '{param_name}' for service {service}, using None") + service_kwargs[param_name] = None + + service_instance = service_module(service_ctx, **service_kwargs) + + except Exception as exc: # noqa: BLE001 - propagate meaningful failure + log.error(f"Failed to create service instance: {exc}") + raise + + original_download_dir = config.directories.downloads + + _check_cancel("before download execution") + + stdout_capture = StringIO() + stderr_capture = StringIO() + + # Simple progress tracking if callback provided + if progress_callback: + # Report initial progress + progress_callback({"progress": 0.0, "status": "starting"}) + + # Simple approach: report progress at key points + original_result = dl_instance.result + + def result_with_progress(*args, **kwargs): + try: + # Report that download started + progress_callback({"progress": 5.0, "status": "downloading"}) + + # Call original method + result = original_result(*args, **kwargs) + + # Report completion + progress_callback({"progress": 100.0, "status": "completed"}) + return result + except Exception as e: + progress_callback({"progress": 0.0, "status": "failed", "error": str(e)}) + raise + + dl_instance.result = result_with_progress + + try: + with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture): + dl_instance.result( + service=service_instance, + quality=params.get("quality", []), + vcodec=params.get("vcodec"), + acodec=params.get("acodec"), + vbitrate=params.get("vbitrate"), + abitrate=params.get("abitrate"), + range_=params.get("range", []), + channels=params.get("channels"), + no_atmos=params.get("no_atmos", False), + wanted=params.get("wanted", []), + lang=params.get("lang", ["orig"]), + v_lang=params.get("v_lang", []), + a_lang=params.get("a_lang", []), + s_lang=params.get("s_lang", ["all"]), + require_subs=params.get("require_subs", []), + forced_subs=params.get("forced_subs", False), + sub_format=params.get("sub_format"), + video_only=params.get("video_only", False), + audio_only=params.get("audio_only", False), + subs_only=params.get("subs_only", False), + chapters_only=params.get("chapters_only", False), + no_subs=params.get("no_subs", False), + no_audio=params.get("no_audio", False), + no_chapters=params.get("no_chapters", False), + slow=params.get("slow", False), + list_=False, + list_titles=False, + skip_dl=params.get("skip_dl", False), + export=params.get("export"), + cdm_only=params.get("cdm_only"), + no_proxy=params.get("no_proxy", False), + no_folder=params.get("no_folder", False), + no_source=params.get("no_source", False), + workers=params.get("workers"), + downloads=params.get("downloads", 1), + best_available=params.get("best_available", False), + ) + + except SystemExit as exc: + if exc.code != 0: + stdout_str = stdout_capture.getvalue() + stderr_str = stderr_capture.getvalue() + log.error(f"Download exited with code {exc.code}") + log.error(f"Stdout: {stdout_str}") + log.error(f"Stderr: {stderr_str}") + raise Exception(f"Download failed with exit code {exc.code}") + + except Exception as exc: # noqa: BLE001 - propagate to caller + stdout_str = stdout_capture.getvalue() + stderr_str = stderr_capture.getvalue() + log.error(f"Download execution failed: {exc}") + log.error(f"Stdout: {stdout_str}") + log.error(f"Stderr: {stderr_str}") + raise + + log.info(f"Download completed for job {job_id}, files in {original_download_dir}") + + return [] + + +class DownloadQueueManager: + """Manages download job queue with configurable concurrency limits.""" + + def __init__(self, max_concurrent_downloads: int = 2, job_retention_hours: int = 24): + self.max_concurrent_downloads = max_concurrent_downloads + self.job_retention_hours = job_retention_hours + + self._jobs: Dict[str, DownloadJob] = {} + self._job_queue: asyncio.Queue = asyncio.Queue() + self._active_downloads: Dict[str, asyncio.Task] = {} + self._download_processes: Dict[str, asyncio.subprocess.Process] = {} + self._job_temp_files: Dict[str, Dict[str, str]] = {} + self._workers_started = False + self._shutdown_event = asyncio.Event() + + log.info( + f"Initialized download queue manager: max_concurrent={max_concurrent_downloads}, retention_hours={job_retention_hours}" + ) + + def create_job(self, service: str, title_id: str, **parameters) -> DownloadJob: + """Create a new download job and add it to the queue.""" + job_id = str(uuid.uuid4()) + job = DownloadJob( + job_id=job_id, + status=JobStatus.QUEUED, + created_time=datetime.now(), + service=service, + title_id=title_id, + parameters=parameters, + ) + + self._jobs[job_id] = job + self._job_queue.put_nowait(job) + + log.info(f"Created download job {job_id} for {service}:{title_id}") + return job + + def get_job(self, job_id: str) -> Optional[DownloadJob]: + """Get job by ID.""" + return self._jobs.get(job_id) + + def list_jobs(self) -> List[DownloadJob]: + """List all jobs.""" + return list(self._jobs.values()) + + def cancel_job(self, job_id: str) -> bool: + """Cancel a job if it's queued or downloading.""" + job = self._jobs.get(job_id) + if not job: + return False + + if job.status == JobStatus.QUEUED: + job.status = JobStatus.CANCELLED + job.cancel_event.set() # Signal cancellation + log.info(f"Cancelled queued job {job_id}") + return True + elif job.status == JobStatus.DOWNLOADING: + # Set the cancellation event first - this will be checked by the download thread + job.cancel_event.set() + job.status = JobStatus.CANCELLED + log.info(f"Signaled cancellation for downloading job {job_id}") + + # Cancel the active download task + task = self._active_downloads.get(job_id) + if task: + task.cancel() + log.info(f"Cancelled download task for job {job_id}") + + process = self._download_processes.get(job_id) + if process: + try: + process.terminate() + log.info(f"Terminated worker process for job {job_id}") + except ProcessLookupError: + log.debug(f"Worker process for job {job_id} already exited") + + return True + + return False + + def cleanup_old_jobs(self) -> int: + """Remove jobs older than retention period.""" + cutoff_time = datetime.now() - timedelta(hours=self.job_retention_hours) + jobs_to_remove = [] + + for job_id, job in self._jobs.items(): + if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + if job.completed_time and job.completed_time < cutoff_time: + jobs_to_remove.append(job_id) + elif not job.completed_time and job.created_time < cutoff_time: + jobs_to_remove.append(job_id) + + for job_id in jobs_to_remove: + del self._jobs[job_id] + + if jobs_to_remove: + log.info(f"Cleaned up {len(jobs_to_remove)} old jobs") + + return len(jobs_to_remove) + + async def start_workers(self): + """Start worker tasks to process the download queue.""" + if self._workers_started: + return + + self._workers_started = True + + # Start worker tasks + for i in range(self.max_concurrent_downloads): + asyncio.create_task(self._download_worker(f"worker-{i}")) + + # Start cleanup task + asyncio.create_task(self._cleanup_worker()) + + log.info(f"Started {self.max_concurrent_downloads} download workers") + + async def shutdown(self): + """Shutdown the queue manager and cancel all active downloads.""" + log.info("Shutting down download queue manager") + self._shutdown_event.set() + + # Cancel all active downloads + for task in self._active_downloads.values(): + task.cancel() + + # Terminate worker processes + for job_id, process in list(self._download_processes.items()): + try: + process.terminate() + except ProcessLookupError: + log.debug(f"Worker process for job {job_id} already exited during shutdown") + + for job_id, process in list(self._download_processes.items()): + try: + await asyncio.wait_for(process.wait(), timeout=5) + except asyncio.TimeoutError: + log.warning(f"Worker process for job {job_id} did not exit, killing") + process.kill() + await process.wait() + finally: + self._download_processes.pop(job_id, None) + + # Clean up any remaining temp files + for paths in self._job_temp_files.values(): + for path in paths.values(): + try: + os.remove(path) + except OSError: + pass + self._job_temp_files.clear() + + # Wait for workers to finish + if self._active_downloads: + await asyncio.gather(*self._active_downloads.values(), return_exceptions=True) + + async def _download_worker(self, worker_name: str): + """Worker task that processes jobs from the queue.""" + log.debug(f"Download worker {worker_name} started") + + while not self._shutdown_event.is_set(): + try: + # Wait for a job or shutdown signal + job = await asyncio.wait_for(self._job_queue.get(), timeout=1.0) + + if job.status == JobStatus.CANCELLED: + continue + + # Start processing the job + job.status = JobStatus.DOWNLOADING + job.started_time = datetime.now() + + log.info(f"Worker {worker_name} starting job {job.job_id}") + + # Create download task + download_task = asyncio.create_task(self._execute_download(job)) + self._active_downloads[job.job_id] = download_task + + try: + await download_task + except asyncio.CancelledError: + job.status = JobStatus.CANCELLED + log.info(f"Job {job.job_id} was cancelled") + except Exception as e: + job.status = JobStatus.FAILED + job.error_message = str(e) + log.error(f"Job {job.job_id} failed: {e}") + finally: + job.completed_time = datetime.now() + if job.job_id in self._active_downloads: + del self._active_downloads[job.job_id] + + except asyncio.TimeoutError: + continue + except Exception as e: + log.error(f"Worker {worker_name} error: {e}") + + async def _execute_download(self, job: DownloadJob): + """Execute the actual download for a job.""" + log.info(f"Executing download for job {job.job_id}") + + try: + output_files = await self._run_download_async(job) + job.status = JobStatus.COMPLETED + job.output_files = output_files + job.progress = 100.0 + log.info(f"Download completed for job {job.job_id}: {len(output_files)} files") + except Exception as e: + job.status = JobStatus.FAILED + job.error_message = str(e) + job.error_details = str(e) + log.error(f"Download failed for job {job.job_id}: {e}") + raise + + async def _run_download_async(self, job: DownloadJob) -> List[str]: + """Invoke a worker subprocess to execute the download.""" + + payload = { + "job_id": job.job_id, + "service": job.service, + "title_id": job.title_id, + "parameters": job.parameters, + } + + payload_fd, payload_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_payload.json") + os.close(payload_fd) + result_fd, result_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_result.json") + os.close(result_fd) + progress_fd, progress_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_progress.json") + os.close(progress_fd) + + with open(payload_path, "w", encoding="utf-8") as handle: + json.dump(payload, handle) + + process = await asyncio.create_subprocess_exec( + sys.executable, + "-m", + "unshackle.core.api.download_worker", + payload_path, + result_path, + progress_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + self._download_processes[job.job_id] = process + self._job_temp_files[job.job_id] = {"payload": payload_path, "result": result_path, "progress": progress_path} + + communicate_task = asyncio.create_task(process.communicate()) + + stdout_bytes = b"" + stderr_bytes = b"" + + try: + while True: + done, _ = await asyncio.wait({communicate_task}, timeout=0.5) + if communicate_task in done: + stdout_bytes, stderr_bytes = communicate_task.result() + break + + # Check for progress updates + try: + if os.path.exists(progress_path): + with open(progress_path, "r", encoding="utf-8") as handle: + progress_data = json.load(handle) + if "progress" in progress_data: + new_progress = float(progress_data["progress"]) + if new_progress != job.progress: + job.progress = new_progress + log.info(f"Job {job.job_id} progress updated: {job.progress}%") + except (FileNotFoundError, json.JSONDecodeError, ValueError) as e: + log.debug(f"Could not read progress for job {job.job_id}: {e}") + + if job.cancel_event.is_set() or job.status == JobStatus.CANCELLED: + log.info(f"Cancellation detected for job {job.job_id}, terminating worker process") + process.terminate() + try: + await asyncio.wait_for(communicate_task, timeout=5) + except asyncio.TimeoutError: + log.warning(f"Worker process for job {job.job_id} did not terminate, killing") + process.kill() + await asyncio.wait_for(communicate_task, timeout=5) + raise asyncio.CancelledError("Job was cancelled") + + returncode = process.returncode + stdout = stdout_bytes.decode("utf-8", errors="ignore") + stderr = stderr_bytes.decode("utf-8", errors="ignore") + + if stdout.strip(): + log.debug(f"Worker stdout for job {job.job_id}: {stdout.strip()}") + if stderr.strip(): + log.warning(f"Worker stderr for job {job.job_id}: {stderr.strip()}") + + result_data: Optional[Dict[str, Any]] = None + try: + with open(result_path, "r", encoding="utf-8") as handle: + result_data = json.load(handle) + except FileNotFoundError: + log.error(f"Result file missing for job {job.job_id}") + except json.JSONDecodeError as exc: + log.error(f"Failed to parse worker result for job {job.job_id}: {exc}") + + if returncode != 0: + message = result_data.get("message") if result_data else "unknown error" + raise Exception(f"Worker exited with code {returncode}: {message}") + + if not result_data or result_data.get("status") != "success": + message = result_data.get("message") if result_data else "worker did not report success" + raise Exception(f"Worker failure: {message}") + + return result_data.get("output_files", []) + + finally: + if not communicate_task.done(): + communicate_task.cancel() + with suppress(asyncio.CancelledError): + await communicate_task + + self._download_processes.pop(job.job_id, None) + + temp_paths = self._job_temp_files.pop(job.job_id, {}) + for path in temp_paths.values(): + try: + os.remove(path) + except OSError: + pass + + def _execute_download_sync(self, job: DownloadJob) -> List[str]: + """Execute download synchronously using existing dl.py logic.""" + return _perform_download(job.job_id, job.service, job.title_id, job.parameters.copy(), job.cancel_event) + + async def _cleanup_worker(self): + """Worker that periodically cleans up old jobs.""" + while not self._shutdown_event.is_set(): + try: + await asyncio.sleep(3600) # Run every hour + self.cleanup_old_jobs() + except Exception as e: + log.error(f"Cleanup worker error: {e}") + + +# Global instance +download_manager: Optional[DownloadQueueManager] = None + + +def get_download_manager() -> DownloadQueueManager: + """Get the global download manager instance.""" + global download_manager + if download_manager is None: + # Load configuration from unshackle config + from unshackle.core.config import config + + max_concurrent = getattr(config, "max_concurrent_downloads", 2) + retention_hours = getattr(config, "download_job_retention_hours", 24) + + download_manager = DownloadQueueManager(max_concurrent, retention_hours) + + return download_manager diff --git a/unshackle/core/api/download_worker.py b/unshackle/core/api/download_worker.py new file mode 100644 index 0000000..08810d4 --- /dev/null +++ b/unshackle/core/api/download_worker.py @@ -0,0 +1,84 @@ +"""Standalone worker process entry point for executing download jobs.""" + +from __future__ import annotations + +import json +import logging +import sys +import traceback +from pathlib import Path +from typing import Any, Dict + +from .download_manager import _perform_download + +log = logging.getLogger("download_worker") + + +def _read_payload(path: Path) -> Dict[str, Any]: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def _write_result(path: Path, payload: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as handle: + json.dump(payload, handle) + + +def main(argv: list[str]) -> int: + if len(argv) not in [3, 4]: + print( + "Usage: python -m unshackle.core.api.download_worker [progress_path]", + file=sys.stderr, + ) + return 2 + + payload_path = Path(argv[1]) + result_path = Path(argv[2]) + progress_path = Path(argv[3]) if len(argv) > 3 else None + + result: Dict[str, Any] = {} + exit_code = 0 + + try: + payload = _read_payload(payload_path) + job_id = payload["job_id"] + service = payload["service"] + title_id = payload["title_id"] + params = payload.get("parameters", {}) + + log.info(f"Worker starting job {job_id} ({service}:{title_id})") + + def progress_callback(progress_data: Dict[str, Any]) -> None: + """Write progress updates to file for main process to read.""" + if progress_path: + try: + log.info(f"Writing progress update: {progress_data}") + _write_result(progress_path, progress_data) + log.info(f"Progress update written to {progress_path}") + except Exception as e: + log.error(f"Failed to write progress update: {e}") + + output_files = _perform_download( + job_id, service, title_id, params, cancel_event=None, progress_callback=progress_callback + ) + + result = {"status": "success", "output_files": output_files} + + except Exception as exc: # noqa: BLE001 - capture for parent process + exit_code = 1 + tb = traceback.format_exc() + log.error(f"Worker failed with error: {exc}") + result = {"status": "error", "message": str(exc), "traceback": tb} + + finally: + try: + _write_result(result_path, result) + except Exception as exc: # noqa: BLE001 - last resort logging + log.error(f"Failed to write worker result file: {exc}") + + return exit_code + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/unshackle/core/api/handlers.py b/unshackle/core/api/handlers.py new file mode 100644 index 0000000..60261a6 --- /dev/null +++ b/unshackle/core/api/handlers.py @@ -0,0 +1,653 @@ +import logging +from typing import Any, Dict, List, Optional + +from aiohttp import web + +from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP +from unshackle.core.proxies.basic import Basic +from unshackle.core.proxies.hola import Hola +from unshackle.core.proxies.nordvpn import NordVPN +from unshackle.core.proxies.surfsharkvpn import SurfsharkVPN +from unshackle.core.services import Services +from unshackle.core.titles import Episode, Movie, Title_T +from unshackle.core.tracks import Audio, Subtitle, Video + +log = logging.getLogger("api") + + +def initialize_proxy_providers() -> List[Any]: + """Initialize and return available proxy providers.""" + proxy_providers = [] + try: + from unshackle.core import binaries + + # Load the main unshackle config to get proxy provider settings + from unshackle.core.config import config as main_config + + log.debug(f"Main config proxy providers: {getattr(main_config, 'proxy_providers', {})}") + log.debug(f"Available proxy provider configs: {list(getattr(main_config, 'proxy_providers', {}).keys())}") + + # Use main_config instead of the service-specific config for proxy providers + proxy_config = getattr(main_config, "proxy_providers", {}) + + if proxy_config.get("basic"): + log.debug("Loading Basic proxy provider") + proxy_providers.append(Basic(**proxy_config["basic"])) + if proxy_config.get("nordvpn"): + log.debug("Loading NordVPN proxy provider") + proxy_providers.append(NordVPN(**proxy_config["nordvpn"])) + if proxy_config.get("surfsharkvpn"): + log.debug("Loading SurfsharkVPN proxy provider") + proxy_providers.append(SurfsharkVPN(**proxy_config["surfsharkvpn"])) + if hasattr(binaries, "HolaProxy") and binaries.HolaProxy: + log.debug("Loading Hola proxy provider") + proxy_providers.append(Hola()) + + for proxy_provider in proxy_providers: + log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}") + + if not proxy_providers: + log.warning("No proxy providers were loaded. Check your proxy provider configuration in unshackle.yaml") + + except Exception as e: + log.warning(f"Failed to initialize some proxy providers: {e}") + + return proxy_providers + + +def resolve_proxy(proxy: str, proxy_providers: List[Any]) -> str: + """Resolve proxy parameter to actual proxy URI.""" + import re + + if not proxy: + return proxy + + # Check if explicit proxy URI + if re.match(r"^https?://", proxy): + return proxy + + # Handle provider:country format (e.g., "nordvpn:us") + requested_provider = None + if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE): + requested_provider, proxy = proxy.split(":", maxsplit=1) + + # Handle country code format (e.g., "us", "uk") + if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE): + proxy = proxy.lower() + + if requested_provider: + # Find specific provider (case-insensitive matching) + proxy_provider = next( + (x for x in proxy_providers if x.__class__.__name__.lower() == requested_provider.lower()), + None, + ) + if not proxy_provider: + available_providers = [x.__class__.__name__ for x in proxy_providers] + raise ValueError( + f"The proxy provider '{requested_provider}' was not recognized. Available providers: {available_providers}" + ) + + proxy_uri = proxy_provider.get_proxy(proxy) + if not proxy_uri: + raise ValueError(f"The proxy provider {requested_provider} had no proxy for {proxy}") + + log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy_uri}") + return proxy_uri + else: + # Try all providers + for proxy_provider in proxy_providers: + proxy_uri = proxy_provider.get_proxy(proxy) + if proxy_uri: + log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy_uri}") + return proxy_uri + + raise ValueError(f"No proxy provider had a proxy for {proxy}") + + # Return as-is if not recognized format + log.info(f"Using explicit Proxy: {proxy}") + return proxy + + +def validate_service(service_tag: str) -> Optional[str]: + """Validate and normalize service tag.""" + try: + normalized = Services.get_tag(service_tag) + service_path = Services.get_path(normalized) + if not service_path.exists(): + return None + return normalized + except Exception: + return None + + +def serialize_title(title: Title_T) -> Dict[str, Any]: + """Convert a title object to JSON-serializable dict.""" + if isinstance(title, Episode): + episode_name = title.name if title.name else f"Episode {title.number:02d}" + result = { + "type": "episode", + "name": episode_name, + "series_title": str(title.title), + "season": title.season, + "number": title.number, + "year": title.year, + "id": str(title.id) if hasattr(title, "id") else None, + } + elif isinstance(title, Movie): + result = { + "type": "movie", + "name": str(title.name) if hasattr(title, "name") else str(title), + "year": title.year, + "id": str(title.id) if hasattr(title, "id") else None, + } + else: + result = { + "type": "other", + "name": str(title.name) if hasattr(title, "name") else str(title), + "id": str(title.id) if hasattr(title, "id") else None, + } + + return result + + +def serialize_video_track(track: Video) -> Dict[str, Any]: + """Convert video track to JSON-serializable dict.""" + codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec) + range_name = track.range.name if hasattr(track.range, "name") else str(track.range) + + return { + "id": str(track.id), + "codec": codec_name, + "codec_display": VIDEO_CODEC_MAP.get(codec_name, codec_name), + "bitrate": int(track.bitrate / 1000) if track.bitrate else None, + "width": track.width, + "height": track.height, + "resolution": f"{track.width}x{track.height}" if track.width and track.height else None, + "fps": track.fps if track.fps else None, + "range": range_name, + "range_display": DYNAMIC_RANGE_MAP.get(range_name, range_name), + "language": str(track.language) if track.language else None, + "drm": str(track.drm) if hasattr(track, "drm") and track.drm else None, + } + + +def serialize_audio_track(track: Audio) -> Dict[str, Any]: + """Convert audio track to JSON-serializable dict.""" + codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec) + + return { + "id": str(track.id), + "codec": codec_name, + "codec_display": AUDIO_CODEC_MAP.get(codec_name, codec_name), + "bitrate": int(track.bitrate / 1000) if track.bitrate else None, + "channels": track.channels if track.channels else None, + "language": str(track.language) if track.language else None, + "atmos": track.atmos if hasattr(track, "atmos") else False, + "descriptive": track.descriptive if hasattr(track, "descriptive") else False, + "drm": str(track.drm) if hasattr(track, "drm") and track.drm else None, + } + + +def serialize_subtitle_track(track: Subtitle) -> Dict[str, Any]: + """Convert subtitle track to JSON-serializable dict.""" + return { + "id": str(track.id), + "codec": track.codec.name if hasattr(track.codec, "name") else str(track.codec), + "language": str(track.language) if track.language else None, + "forced": track.forced if hasattr(track, "forced") else False, + "sdh": track.sdh if hasattr(track, "sdh") else False, + "cc": track.cc if hasattr(track, "cc") else False, + } + + +async def list_titles_handler(data: Dict[str, Any]) -> web.Response: + """Handle list-titles request.""" + service_tag = data.get("service") + title_id = data.get("title_id") + profile = data.get("profile") + + if not service_tag: + return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400) + + if not title_id: + return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400) + + normalized_service = validate_service(service_tag) + if not normalized_service: + return web.json_response( + {"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400 + ) + + try: + import inspect + + import click + import yaml + + from unshackle.commands.dl import dl + from unshackle.core.config import config + from unshackle.core.utils.click_types import ContextData + from unshackle.core.utils.collections import merge_dict + + service_config_path = Services.get_path(normalized_service) / config.filenames.config + if service_config_path.exists(): + service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) + else: + service_config = {} + merge_dict(config.services.get(normalized_service), service_config) + + @click.command() + @click.pass_context + def dummy_service(ctx: click.Context) -> None: + pass + + # Handle proxy configuration + proxy_param = data.get("proxy") + no_proxy = data.get("no_proxy", False) + proxy_providers = [] + + if not no_proxy: + proxy_providers = initialize_proxy_providers() + + if proxy_param and not no_proxy: + try: + resolved_proxy = resolve_proxy(proxy_param, proxy_providers) + proxy_param = resolved_proxy + except ValueError as e: + return web.json_response({"status": "error", "message": f"Proxy error: {e}"}, status=400) + + ctx = click.Context(dummy_service) + ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=proxy_providers, profile=profile) + ctx.params = {"proxy": proxy_param, "no_proxy": no_proxy} + + service_module = Services.load(normalized_service) + + dummy_service.name = normalized_service + dummy_service.params = [click.Argument([title_id], type=str)] + ctx.invoked_subcommand = normalized_service + + service_ctx = click.Context(dummy_service, parent=ctx) + service_ctx.obj = ctx.obj + + service_kwargs = {"title": title_id} + + # Add additional parameters from request data + for key, value in data.items(): + if key not in ["service", "title_id", "profile", "season", "episode", "wanted", "proxy", "no_proxy"]: + service_kwargs[key] = value + + # Get service parameter info and click command defaults + service_init_params = inspect.signature(service_module.__init__).parameters + + # Extract default values from the click command + if hasattr(service_module, "cli") and hasattr(service_module.cli, "params"): + for param in service_module.cli.params: + if hasattr(param, "name") and param.name not in service_kwargs: + # Add default value if parameter is not already provided + if hasattr(param, "default") and param.default is not None: + service_kwargs[param.name] = param.default + + # Handle required parameters that don't have click defaults + for param_name, param_info in service_init_params.items(): + if param_name not in service_kwargs and param_name not in ["self", "ctx"]: + # Check if parameter is required (no default value in signature) + if param_info.default is inspect.Parameter.empty: + # Provide sensible defaults for common required parameters + if param_name == "meta_lang": + service_kwargs[param_name] = None + elif param_name == "movie": + service_kwargs[param_name] = False + else: + # Log warning for unknown required parameters + log.warning(f"Unknown required parameter '{param_name}' for service {normalized_service}") + + # Filter out any parameters that the service doesn't accept + filtered_kwargs = {} + for key, value in service_kwargs.items(): + if key in service_init_params: + filtered_kwargs[key] = value + + service_instance = service_module(service_ctx, **filtered_kwargs) + + cookies = dl.get_cookie_jar(normalized_service, profile) + credential = dl.get_credentials(normalized_service, profile) + service_instance.authenticate(cookies, credential) + + titles = service_instance.get_titles() + + if hasattr(titles, "__iter__") and not isinstance(titles, str): + title_list = [serialize_title(t) for t in titles] + else: + title_list = [serialize_title(titles)] + + return web.json_response({"titles": title_list}) + + except Exception as e: + log.exception("Error listing titles") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def list_tracks_handler(data: Dict[str, Any]) -> web.Response: + """Handle list-tracks request.""" + service_tag = data.get("service") + title_id = data.get("title_id") + profile = data.get("profile") + + if not service_tag: + return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400) + + if not title_id: + return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400) + + normalized_service = validate_service(service_tag) + if not normalized_service: + return web.json_response( + {"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400 + ) + + try: + import inspect + + import click + import yaml + + from unshackle.commands.dl import dl + from unshackle.core.config import config + from unshackle.core.utils.click_types import ContextData + from unshackle.core.utils.collections import merge_dict + + service_config_path = Services.get_path(normalized_service) / config.filenames.config + if service_config_path.exists(): + service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) + else: + service_config = {} + merge_dict(config.services.get(normalized_service), service_config) + + @click.command() + @click.pass_context + def dummy_service(ctx: click.Context) -> None: + pass + + # Handle proxy configuration + proxy_param = data.get("proxy") + no_proxy = data.get("no_proxy", False) + proxy_providers = [] + + if not no_proxy: + proxy_providers = initialize_proxy_providers() + + if proxy_param and not no_proxy: + try: + resolved_proxy = resolve_proxy(proxy_param, proxy_providers) + proxy_param = resolved_proxy + except ValueError as e: + return web.json_response({"status": "error", "message": f"Proxy error: {e}"}, status=400) + + ctx = click.Context(dummy_service) + ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=proxy_providers, profile=profile) + ctx.params = {"proxy": proxy_param, "no_proxy": no_proxy} + + service_module = Services.load(normalized_service) + + dummy_service.name = normalized_service + dummy_service.params = [click.Argument([title_id], type=str)] + ctx.invoked_subcommand = normalized_service + + service_ctx = click.Context(dummy_service, parent=ctx) + service_ctx.obj = ctx.obj + + service_kwargs = {"title": title_id} + + # Add additional parameters from request data + for key, value in data.items(): + if key not in ["service", "title_id", "profile", "season", "episode", "wanted", "proxy", "no_proxy"]: + service_kwargs[key] = value + + # Get service parameter info and click command defaults + service_init_params = inspect.signature(service_module.__init__).parameters + + # Extract default values from the click command + if hasattr(service_module, "cli") and hasattr(service_module.cli, "params"): + for param in service_module.cli.params: + if hasattr(param, "name") and param.name not in service_kwargs: + # Add default value if parameter is not already provided + if hasattr(param, "default") and param.default is not None: + service_kwargs[param.name] = param.default + + # Handle required parameters that don't have click defaults + for param_name, param_info in service_init_params.items(): + if param_name not in service_kwargs and param_name not in ["self", "ctx"]: + # Check if parameter is required (no default value in signature) + if param_info.default is inspect.Parameter.empty: + # Provide sensible defaults for common required parameters + if param_name == "meta_lang": + service_kwargs[param_name] = None + elif param_name == "movie": + service_kwargs[param_name] = False + else: + # Log warning for unknown required parameters + log.warning(f"Unknown required parameter '{param_name}' for service {normalized_service}") + + # Filter out any parameters that the service doesn't accept + filtered_kwargs = {} + for key, value in service_kwargs.items(): + if key in service_init_params: + filtered_kwargs[key] = value + + service_instance = service_module(service_ctx, **filtered_kwargs) + + cookies = dl.get_cookie_jar(normalized_service, profile) + credential = dl.get_credentials(normalized_service, profile) + service_instance.authenticate(cookies, credential) + + titles = service_instance.get_titles() + + wanted_param = data.get("wanted") + season = data.get("season") + episode = data.get("episode") + + if hasattr(titles, "__iter__") and not isinstance(titles, str): + titles_list = list(titles) + + wanted = None + if wanted_param: + from unshackle.core.utils.click_types import SeasonRange + + try: + season_range = SeasonRange() + wanted = season_range.parse_tokens(wanted_param) + log.debug(f"Parsed wanted '{wanted_param}' into {len(wanted)} episodes: {wanted[:10]}...") + except Exception as e: + return web.json_response( + {"status": "error", "message": f"Invalid wanted parameter: {e}"}, status=400 + ) + elif season is not None and episode is not None: + wanted = [f"{season}x{episode}"] + + if wanted: + # Filter titles based on wanted episodes, similar to how dl.py does it + matching_titles = [] + log.debug(f"Filtering {len(titles_list)} titles with {len(wanted)} wanted episodes") + for title in titles_list: + if isinstance(title, Episode): + episode_key = f"{title.season}x{title.number}" + if episode_key in wanted: + log.debug(f"Episode {episode_key} matches wanted list") + matching_titles.append(title) + else: + log.debug(f"Episode {episode_key} not in wanted list") + else: + matching_titles.append(title) + + log.debug(f"Found {len(matching_titles)} matching titles") + + if not matching_titles: + return web.json_response( + {"status": "error", "message": "No episodes found matching wanted criteria"}, status=404 + ) + + # If multiple episodes match, return tracks for all episodes + if len(matching_titles) > 1 and all(isinstance(t, Episode) for t in matching_titles): + episodes_data = [] + failed_episodes = [] + + # Sort matching titles by season and episode number for consistent ordering + sorted_titles = sorted(matching_titles, key=lambda t: (t.season, t.number)) + + for title in sorted_titles: + try: + tracks = service_instance.get_tracks(title) + video_tracks = sorted(tracks.videos, key=lambda t: t.bitrate or 0, reverse=True) + audio_tracks = sorted(tracks.audio, key=lambda t: t.bitrate or 0, reverse=True) + + episode_data = { + "title": serialize_title(title), + "video": [serialize_video_track(t) for t in video_tracks], + "audio": [serialize_audio_track(t) for t in audio_tracks], + "subtitles": [serialize_subtitle_track(t) for t in tracks.subtitles], + } + episodes_data.append(episode_data) + log.debug(f"Successfully got tracks for {title.season}x{title.number}") + except SystemExit: + # Service calls sys.exit() for unavailable episodes - catch and skip + failed_episodes.append(f"S{title.season}E{title.number:02d}") + log.debug(f"Episode {title.season}x{title.number} not available, skipping") + continue + except Exception as e: + # Handle other errors gracefully + failed_episodes.append(f"S{title.season}E{title.number:02d}") + log.debug(f"Error getting tracks for {title.season}x{title.number}: {e}") + continue + + if episodes_data: + response = {"episodes": episodes_data} + if failed_episodes: + response["unavailable_episodes"] = failed_episodes + return web.json_response(response) + else: + return web.json_response( + { + "status": "error", + "message": f"No available episodes found. Unavailable: {', '.join(failed_episodes)}", + }, + status=404, + ) + else: + # Single episode or movie + first_title = matching_titles[0] + else: + first_title = titles_list[0] + else: + first_title = titles + + tracks = service_instance.get_tracks(first_title) + + video_tracks = sorted(tracks.videos, key=lambda t: t.bitrate or 0, reverse=True) + audio_tracks = sorted(tracks.audio, key=lambda t: t.bitrate or 0, reverse=True) + + response = { + "title": serialize_title(first_title), + "video": [serialize_video_track(t) for t in video_tracks], + "audio": [serialize_audio_track(t) for t in audio_tracks], + "subtitles": [serialize_subtitle_track(t) for t in tracks.subtitles], + } + + return web.json_response(response) + + except Exception as e: + log.exception("Error listing tracks") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def download_handler(data: Dict[str, Any]) -> web.Response: + """Handle download request - create and queue a download job.""" + from unshackle.core.api.download_manager import get_download_manager + + service_tag = data.get("service") + title_id = data.get("title_id") + + if not service_tag: + return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400) + + if not title_id: + return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400) + + normalized_service = validate_service(service_tag) + if not normalized_service: + return web.json_response( + {"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400 + ) + + try: + # Get download manager and start workers if needed + manager = get_download_manager() + await manager.start_workers() + + # Create download job with filtered parameters (exclude service and title_id as they're already passed) + filtered_params = {k: v for k, v in data.items() if k not in ["service", "title_id"]} + job = manager.create_job(normalized_service, title_id, **filtered_params) + + return web.json_response( + {"job_id": job.job_id, "status": job.status.value, "created_time": job.created_time.isoformat()}, status=202 + ) + + except Exception as e: + log.exception("Error creating download job") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def list_download_jobs_handler(data: Dict[str, Any]) -> web.Response: + """Handle list download jobs request.""" + from unshackle.core.api.download_manager import get_download_manager + + try: + manager = get_download_manager() + jobs = manager.list_jobs() + + job_list = [job.to_dict(include_full_details=False) for job in jobs] + + return web.json_response({"jobs": job_list}) + + except Exception as e: + log.exception("Error listing download jobs") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def get_download_job_handler(job_id: str) -> web.Response: + """Handle get specific download job request.""" + from unshackle.core.api.download_manager import get_download_manager + + try: + manager = get_download_manager() + job = manager.get_job(job_id) + + if not job: + return web.json_response({"status": "error", "message": "Job not found"}, status=404) + + return web.json_response(job.to_dict(include_full_details=True)) + + except Exception as e: + log.exception(f"Error getting download job {job_id}") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def cancel_download_job_handler(job_id: str) -> web.Response: + """Handle cancel download job request.""" + from unshackle.core.api.download_manager import get_download_manager + + try: + manager = get_download_manager() + + if not manager.get_job(job_id): + return web.json_response({"status": "error", "message": "Job not found"}, status=404) + + success = manager.cancel_job(job_id) + + if success: + return web.json_response({"status": "success", "message": "Job cancelled"}) + else: + return web.json_response({"status": "error", "message": "Job cannot be cancelled"}, status=400) + + except Exception as e: + log.exception(f"Error cancelling download job {job_id}") + return web.json_response({"status": "error", "message": str(e)}, status=500) diff --git a/unshackle/core/api/routes.py b/unshackle/core/api/routes.py new file mode 100644 index 0000000..c8dfa7a --- /dev/null +++ b/unshackle/core/api/routes.py @@ -0,0 +1,375 @@ +import logging + +from aiohttp import web +from aiohttp_swagger3 import SwaggerDocs, SwaggerInfo, SwaggerUiSettings + +from unshackle.core import __version__ +from unshackle.core.api.handlers import ( + download_handler, + list_titles_handler, + list_tracks_handler, + list_download_jobs_handler, + get_download_job_handler, + cancel_download_job_handler, +) +from unshackle.core.services import Services +from unshackle.core.update_checker import UpdateChecker + +log = logging.getLogger("api") + + +async def health(request: web.Request) -> web.Response: + """ + Health check endpoint. + --- + summary: Health check + description: Get server health status, version info, and update availability + responses: + '200': + description: Health status + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: ok + version: + type: string + example: "2.0.0" + update_check: + type: object + properties: + update_available: + type: boolean + nullable: true + current_version: + type: string + latest_version: + type: string + nullable: true + """ + try: + latest_version = await UpdateChecker.check_for_updates(__version__) + update_info = { + "update_available": latest_version is not None, + "current_version": __version__, + "latest_version": latest_version, + } + except Exception as e: + log.warning(f"Failed to check for updates: {e}") + update_info = {"update_available": None, "current_version": __version__, "latest_version": None} + + return web.json_response({"status": "ok", "version": __version__, "update_check": update_info}) + + +async def services(request: web.Request) -> web.Response: + """ + List available services. + --- + summary: List services + description: Get all available streaming services with their details + responses: + '200': + description: List of services + content: + application/json: + schema: + type: object + properties: + services: + type: array + items: + type: object + properties: + tag: + type: string + aliases: + type: array + items: + type: string + geofence: + type: array + items: + type: string + title_regex: + type: string + nullable: true + help: + type: string + nullable: true + '500': + description: Server error + """ + try: + service_tags = Services.get_tags() + services_info = [] + + for tag in service_tags: + service_data = {"tag": tag, "aliases": [], "geofence": [], "title_regex": None, "help": None} + + try: + service_module = Services.load(tag) + + if hasattr(service_module, "ALIASES"): + service_data["aliases"] = list(service_module.ALIASES) + + if hasattr(service_module, "GEOFENCE"): + service_data["geofence"] = list(service_module.GEOFENCE) + + if hasattr(service_module, "TITLE_RE"): + service_data["title_regex"] = service_module.TITLE_RE + + if service_module.__doc__: + service_data["help"] = service_module.__doc__.strip() + + except Exception as e: + log.warning(f"Could not load details for service {tag}: {e}") + + services_info.append(service_data) + + return web.json_response({"services": services_info}) + except Exception as e: + log.exception("Error listing services") + return web.json_response({"status": "error", "message": str(e)}, status=500) + + +async def list_titles(request: web.Request) -> web.Response: + """ + List titles for a service and title ID. + --- + summary: List titles + description: Get available titles for a service and title ID + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - service + - title_id + properties: + service: + type: string + description: Service tag + title_id: + type: string + description: Title identifier + responses: + '200': + description: List of titles + '400': + description: Invalid request + """ + try: + data = await request.json() + except Exception: + return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400) + + return await list_titles_handler(data) + + +async def list_tracks(request: web.Request) -> web.Response: + """ + List tracks for a title, separated by type. + --- + summary: List tracks + description: Get available video, audio, and subtitle tracks for a title + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - service + - title_id + properties: + service: + type: string + description: Service tag + title_id: + type: string + description: Title identifier + wanted: + type: string + description: Specific episode/season (optional) + proxy: + type: string + description: Proxy configuration (optional) + responses: + '200': + description: Track information + '400': + description: Invalid request + """ + try: + data = await request.json() + except Exception: + return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400) + + return await list_tracks_handler(data) + + +async def download(request: web.Request) -> web.Response: + """ + Download content based on provided parameters. + --- + summary: Download content + description: Download video content based on specified parameters + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - service + - title_id + properties: + service: + type: string + description: Service tag + title_id: + type: string + description: Title identifier + responses: + '200': + description: Download started + '400': + description: Invalid request + """ + try: + data = await request.json() + except Exception: + return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400) + + return await download_handler(data) + + +async def download_jobs(request: web.Request) -> web.Response: + """ + List all download jobs. + --- + summary: List download jobs + description: Get list of all download jobs with their status + responses: + '200': + description: List of download jobs + content: + application/json: + schema: + type: object + properties: + jobs: + type: array + items: + type: object + properties: + job_id: + type: string + status: + type: string + created_time: + type: string + service: + type: string + title_id: + type: string + progress: + type: number + '500': + description: Server error + """ + return await list_download_jobs_handler({}) + + +async def download_job_detail(request: web.Request) -> web.Response: + """ + Get download job details. + --- + summary: Get download job + description: Get detailed information about a specific download job + parameters: + - name: job_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Download job details + '404': + description: Job not found + '500': + description: Server error + """ + job_id = request.match_info["job_id"] + return await get_download_job_handler(job_id) + + +async def cancel_download_job(request: web.Request) -> web.Response: + """ + Cancel download job. + --- + summary: Cancel download job + description: Cancel a queued or running download job + parameters: + - name: job_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Job cancelled successfully + '400': + description: Job cannot be cancelled + '404': + description: Job not found + '500': + description: Server error + """ + job_id = request.match_info["job_id"] + return await cancel_download_job_handler(job_id) + + +def setup_routes(app: web.Application) -> None: + """Setup all API routes.""" + app.router.add_get("/api/health", health) + app.router.add_get("/api/services", services) + app.router.add_post("/api/list-titles", list_titles) + app.router.add_post("/api/list-tracks", list_tracks) + app.router.add_post("/api/download", download) + app.router.add_get("/api/download/jobs", download_jobs) + app.router.add_get("/api/download/jobs/{job_id}", download_job_detail) + app.router.add_delete("/api/download/jobs/{job_id}", cancel_download_job) + + +def setup_swagger(app: web.Application) -> None: + """Setup Swagger UI documentation.""" + swagger = SwaggerDocs( + app, + swagger_ui_settings=SwaggerUiSettings(path="/api/docs/"), + info=SwaggerInfo( + title="Unshackle REST API", + version=__version__, + description="REST API for Unshackle - Modular Movie, TV, and Music Archival Software", + ), + ) + + # Add routes with OpenAPI documentation + swagger.add_routes( + [ + web.get("/api/health", health), + web.get("/api/services", services), + web.post("/api/list-titles", list_titles), + web.post("/api/list-tracks", list_tracks), + web.post("/api/download", download), + web.get("/api/download/jobs", download_jobs), + web.get("/api/download/jobs/{job_id}", download_job_detail), + web.delete("/api/download/jobs/{job_id}", cancel_download_job), + ] + ) diff --git a/unshackle/core/titles/episode.py b/unshackle/core/titles/episode.py index f66bcb7..16ecab6 100644 --- a/unshackle/core/titles/episode.py +++ b/unshackle/core/titles/episode.py @@ -89,7 +89,17 @@ class Episode(Title): def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: primary_video_track = next(iter(media_info.video_tracks), None) - primary_audio_track = next(iter(media_info.audio_tracks), None) + primary_audio_track = None + if media_info.audio_tracks: + sorted_audio = sorted( + media_info.audio_tracks, + key=lambda x: ( + float(x.bit_rate) if x.bit_rate else 0, + bool(x.format_additionalfeatures and "JOC" in x.format_additionalfeatures) + ), + reverse=True + ) + primary_audio_track = sorted_audio[0] unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language}) # Title [Year] SXXEXX Name (or Title [Year] SXX if folder) diff --git a/unshackle/core/titles/movie.py b/unshackle/core/titles/movie.py index 3d552d2..2e1d8bb 100644 --- a/unshackle/core/titles/movie.py +++ b/unshackle/core/titles/movie.py @@ -52,7 +52,17 @@ class Movie(Title): def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str: primary_video_track = next(iter(media_info.video_tracks), None) - primary_audio_track = next(iter(media_info.audio_tracks), None) + primary_audio_track = None + if media_info.audio_tracks: + sorted_audio = sorted( + media_info.audio_tracks, + key=lambda x: ( + float(x.bit_rate) if x.bit_rate else 0, + bool(x.format_additionalfeatures and "JOC" in x.format_additionalfeatures) + ), + reverse=True + ) + primary_audio_track = sorted_audio[0] unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language}) # Name (Year) diff --git a/unshackle/core/tracks/audio.py b/unshackle/core/tracks/audio.py index ff5de9f..0069efa 100644 --- a/unshackle/core/tracks/audio.py +++ b/unshackle/core/tracks/audio.py @@ -12,6 +12,7 @@ class Audio(Track): AAC = "AAC" # https://wikipedia.org/wiki/Advanced_Audio_Coding AC3 = "DD" # https://wikipedia.org/wiki/Dolby_Digital EC3 = "DD+" # https://wikipedia.org/wiki/Dolby_Digital_Plus + AC4 = "AC-4" # https://wikipedia.org/wiki/Dolby_AC-4 OPUS = "OPUS" # https://wikipedia.org/wiki/Opus_(audio_format) OGG = "VORB" # https://wikipedia.org/wiki/Vorbis DTS = "DTS" # https://en.wikipedia.org/wiki/DTS_(company)#DTS_Digital_Surround @@ -31,6 +32,8 @@ class Audio(Track): return Audio.Codec.AC3 if mime == "ec-3": return Audio.Codec.EC3 + if mime == "ac-4": + return Audio.Codec.AC4 if mime == "opus": return Audio.Codec.OPUS if mime == "dtsc": @@ -60,6 +63,8 @@ class Audio(Track): return Audio.Codec.AC3 if profile.startswith("ddplus"): return Audio.Codec.EC3 + if profile.startswith("ac4"): + return Audio.Codec.AC4 if profile.startswith("playready-oggvorbis"): return Audio.Codec.OGG raise ValueError(f"The Content Profile '{profile}' is not a supported Audio Codec") diff --git a/unshackle/core/tracks/tracks.py b/unshackle/core/tracks/tracks.py index 34c9da6..cf691b7 100644 --- a/unshackle/core/tracks/tracks.py +++ b/unshackle/core/tracks/tracks.py @@ -202,17 +202,16 @@ class Tracks: """Sort audio tracks by bitrate, descriptive, and optionally language.""" if not self.audio: return - # bitrate - self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True) # descriptive - self.audio.sort(key=lambda x: str(x.language) if x.descriptive else "") + self.audio.sort(key=lambda x: x.descriptive) + # bitrate (within each descriptive group) + self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True) # language for language in reversed(by_language or []): if str(language) in ("all", "best"): language = next((x.language for x in self.audio if x.is_original_lang), "") if not language: continue - self.audio.sort(key=lambda x: str(x.language)) self.audio.sort(key=lambda x: not is_close_match(language, [x.language])) def sort_subtitles(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None: diff --git a/unshackle/unshackle-example.yaml b/unshackle/unshackle-example.yaml index 4ad46ff..ca4f031 100644 --- a/unshackle/unshackle-example.yaml +++ b/unshackle/unshackle-example.yaml @@ -259,6 +259,7 @@ subtitle: # Configuration for pywidevine's serve functionality serve: + api_secret: "your-secret-key-here" users: secret_key_for_user: devices: diff --git a/uv.lock b/uv.lock index 1cea081..9ae2600 100644 --- a/uv.lock +++ b/uv.lock @@ -80,6 +80,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2b/d8/fa65d2a349fe938b76d309db1a56a75c4fb8cc7b17a398b698488a939903/aiohttp-3.12.15-cp312-cp312-win_amd64.whl", hash = "sha256:b390ef5f62bb508a9d67cb3bba9b8356e23b3996da7062f1a57ce1a79d2b3d34", size = 450266, upload-time = "2025-07-29T05:51:17.239Z" }, ] +[[package]] +name = "aiohttp-swagger3" +version = "0.10.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "attrs" }, + { name = "fastjsonschema" }, + { name = "pyyaml" }, + { name = "rfc3339-validator" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a1/06/00ccb2c8afdde4ca7c3cac424d54715c7d90cdd4e13e1ca71d68f5b2e665/aiohttp_swagger3-0.10.0.tar.gz", hash = "sha256:a333c59328f64dd64587e5f276ee84dc256f587d09f2da6ddaae3812fa4d4f33", size = 1839028, upload-time = "2025-02-11T10:51:26.974Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0a/8f/db4cb843999a3088846d170f38eda2182b50b5733387be8102fed171c53f/aiohttp_swagger3-0.10.0-py3-none-any.whl", hash = "sha256:0ae2d2ba7dbd8ea8fe1cffe8f0197db5d0aa979eb9679bd699ecd87923912509", size = 1826491, upload-time = "2025-02-11T10:51:25.174Z" }, +] + [[package]] name = "aiosignal" version = "1.4.0" @@ -468,6 +484,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/36/f4/c6e662dade71f56cd2f3735141b265c3c79293c109549c1e6933b0651ffc/exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10", size = 16674, upload-time = "2025-05-10T17:42:49.33Z" }, ] +[[package]] +name = "fastjsonschema" +version = "2.19.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ba/7f/cedf77ace50aa60c566deaca9066750f06e1fcf6ad24f254d255bb976dd6/fastjsonschema-2.19.1.tar.gz", hash = "sha256:e3126a94bdc4623d3de4485f8d468a12f02a67921315ddc87836d6e456dc789d", size = 372732, upload-time = "2023-12-28T14:02:06.823Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9c/b9/79691036d4a8f9857e74d1728b23f34f583b81350a27492edda58d5604e1/fastjsonschema-2.19.1-py3-none-any.whl", hash = "sha256:3672b47bc94178c9f23dbb654bf47440155d4db9df5f7bc47643315f9c405cd0", size = 23388, upload-time = "2023-12-28T14:02:04.512Z" }, +] + [[package]] name = "filelock" version = "3.19.1" @@ -1252,6 +1277,18 @@ socks = [ { name = "pysocks" }, ] +[[package]] +name = "rfc3339-validator" +version = "0.1.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/28/ea/a9387748e2d111c3c2b275ba970b735e04e15cdb1eb30693b6b5708c4dbd/rfc3339_validator-0.1.4.tar.gz", hash = "sha256:138a2abdf93304ad60530167e51d2dfb9549521a836871b88d7f4695d0022f6b", size = 5513, upload-time = "2021-05-12T16:37:54.178Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/44/4e421b96b67b2daff264473f7465db72fbdf36a07e05494f50300cc7b0c6/rfc3339_validator-0.1.4-py2.py3-none-any.whl", hash = "sha256:24f6ec1eda14ef823da9e36ec7113124b39c04d50a4d3d3a3c2859577e7791fa", size = 3490, upload-time = "2021-05-12T16:37:52.536Z" }, +] + [[package]] name = "rich" version = "13.9.4" @@ -1358,6 +1395,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a3/dc/17031897dae0efacfea57dfd3a82fdd2a2aeb58e0ff71b77b87e44edc772/setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922", size = 1201486, upload-time = "2025-05-27T00:56:49.664Z" }, ] +[[package]] +name = "six" +version = "1.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, +] + [[package]] name = "sniffio" version = "1.3.1" @@ -1502,6 +1548,7 @@ name = "unshackle" version = "1.4.6" source = { editable = "." } dependencies = [ + { name = "aiohttp-swagger3" }, { name = "appdirs" }, { name = "brotli" }, { name = "chardet" }, @@ -1551,6 +1598,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiohttp-swagger3", specifier = ">=0.9.0,<1" }, { name = "appdirs", specifier = ">=1.4.4,<2" }, { name = "brotli", specifier = ">=1.1.0,<2" }, { name = "chardet", specifier = ">=5.2.0,<6" },