feat: add REST API server with download management

Very early development work; further changes will land on this branch.

- Implement download queue management and worker system
- Add OpenAPI/Swagger documentation
- Include download progress tracking and status endpoints
- Add API authentication and error handling
- Update core components to support API integration
Sp5rky committed on 2025-09-28 21:49:00 -06:00
commit 2afc59624d (parent bc26bf3046)
15 changed files with 1902 additions and 12 deletions

View File

@@ -547,9 +547,12 @@ Configuration data for pywidevine's serve functionality run through unshackle.
This effectively allows you to run `unshackle serve` to start serving pywidevine Serve-compliant CDMs right from your
local widevine device files.
- `api_secret` - Secret key for REST API authentication. When set, enables the REST API server alongside the CDM serve functionality. This key is required for authenticating API requests.
For example,
```yaml
api_secret: "your-secret-key-here"
users:
  secret_key_for_jane:  # 32bit hex recommended, case-sensitive
    devices:  # list of allowed devices for this user
```
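Since the REST API reuses pywidevine's serve authentication, a client passes the secret the same way a pywidevine serve client does. A minimal sketch, assuming the `X-Secret-Key` header and the default port from this commit's serve command:

```python
import httpx  # already a project dependency

API = "http://127.0.0.1:8786"
HEADERS = {"X-Secret-Key": "your-secret-key-here"}  # must match api_secret

resp = httpx.get(f"{API}/api/health", headers=HEADERS)
resp.raise_for_status()
print(resp.json())  # {"status": "ok", "version": "...", "update_check": {...}}
```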

View File

@@ -58,6 +58,7 @@ dependencies = [
"httpx>=0.28.1,<0.29",
"cryptography>=45.0.0",
"subby",
"aiohttp-swagger3>=0.9.0,<1",
]
[project.urls]

View File

@@ -1,19 +1,26 @@
import logging
import subprocess
import click
from aiohttp import web
from unshackle.core import binaries
from unshackle.core.api import setup_routes, setup_swagger
from unshackle.core.config import config
from unshackle.core.constants import context_settings
@click.command(
short_help="Serve your Local Widevine Devices and REST API for Remote Access.", context_settings=context_settings
)
@click.option("-h", "--host", type=str, default="0.0.0.0", help="Host to serve from.")
@click.option("-p", "--port", type=int, default=8786, help="Port to serve from.")
@click.option("--caddy", is_flag=True, default=False, help="Also serve with Caddy.")
@click.option("--api-only", is_flag=True, default=False, help="Serve only the REST API, not pywidevine CDM.")
@click.option("--no-key", is_flag=True, default=False, help="Disable API key authentication (allows all requests).")
def serve(host: str, port: int, caddy: bool, api_only: bool, no_key: bool) -> None:
"""
Serve your Local Widevine Devices and REST API for Remote Access.
\b
Host as 127.0.0.1 may block remote access even if port-forwarded.
@@ -23,8 +30,25 @@ def serve(host: str, port: int, caddy: bool) -> None:
You may serve with Caddy at the same time with --caddy. You can use Caddy
as a reverse-proxy to serve with HTTPS. The config used will be the Caddyfile
next to the unshackle config.
\b
The REST API provides programmatic access to unshackle functionality.
Configure authentication in your config under serve.users and serve.api_secret.
"""
from pywidevine import serve as pywidevine_serve
log = logging.getLogger("serve")
# Validate API secret for REST API routes (unless --no-key is used)
if not no_key:
api_secret = config.serve.get("api_secret")
if not api_secret:
raise click.ClickException(
"API secret key is not configured. Please add 'api_secret' to the 'serve' section in your config."
)
else:
api_secret = None
log.warning("Running with --no-key: Authentication is DISABLED for all API endpoints!")
if caddy:
if not binaries.Caddy:
@@ -39,7 +63,51 @@ def serve(host: str, port: int, caddy: bool) -> None:
if not config.serve.get("devices"):
config.serve["devices"] = []
config.serve["devices"].extend(list(config.directories.wvds.glob("*.wvd")))
if api_only:
# API-only mode: serve just the REST API
log.info("Starting REST API server (pywidevine CDM disabled)")
if no_key:
app = web.Application()
app["config"] = {"users": []}
else:
app = web.Application(middlewares=[pywidevine_serve.authentication])
app["config"] = {"users": [api_secret]}
setup_routes(app)
setup_swagger(app)
log.info(f"REST API endpoints available at http://{host}:{port}/api/")
log.info(f"Swagger UI available at http://{host}:{port}/api/docs/")
log.info("(Press CTRL+C to quit)")
web.run_app(app, host=host, port=port, print=None)
else:
# Integrated mode: serve both pywidevine + REST API
log.info("Starting integrated server (pywidevine CDM + REST API)")
# Create integrated app with both pywidevine and API routes
if no_key:
app = web.Application()
app["config"] = dict(config.serve)
app["config"]["users"] = []
else:
app = web.Application(middlewares=[pywidevine_serve.authentication])
# Setup config - add API secret to users for authentication
serve_config = dict(config.serve)
if not serve_config.get("users"):
serve_config["users"] = []
if api_secret not in serve_config["users"]:
serve_config["users"].append(api_secret)
app["config"] = serve_config
app.on_startup.append(pywidevine_serve._startup)
app.on_cleanup.append(pywidevine_serve._cleanup)
app.add_routes(pywidevine_serve.routes)
setup_routes(app)
setup_swagger(app)
log.info(f"REST API endpoints available at http://{host}:{port}/api/")
log.info(f"Swagger UI available at http://{host}:{port}/api/docs/")
log.info("(Press CTRL+C to quit)")
web.run_app(app, host=host, port=port, print=None)
finally:
if caddy_p:
caddy_p.kill()

View File

@@ -1 +1 @@
__version__ = "1.4.7"
__version__ = "2.0.0"

View File

@@ -0,0 +1,3 @@
from unshackle.core.api.routes import setup_routes, setup_swagger
__all__ = ["setup_routes", "setup_swagger"]

View File

@@ -0,0 +1,630 @@
import asyncio
import json
import logging
import os
import sys
import tempfile
import threading
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from datetime import datetime, timedelta
from contextlib import suppress
log = logging.getLogger("download_manager")
class JobStatus(Enum):
QUEUED = "queued"
DOWNLOADING = "downloading"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class DownloadJob:
"""Represents a download job with all its parameters and status."""
job_id: str
status: JobStatus
created_time: datetime
service: str
title_id: str
parameters: Dict[str, Any]
# Progress tracking
started_time: Optional[datetime] = None
completed_time: Optional[datetime] = None
progress: float = 0.0
# Results and error info
output_files: List[str] = field(default_factory=list)
error_message: Optional[str] = None
error_details: Optional[str] = None
# Cancellation support
cancel_event: threading.Event = field(default_factory=threading.Event)
def to_dict(self, include_full_details: bool = False) -> Dict[str, Any]:
"""Convert job to dictionary for JSON response."""
result = {
"job_id": self.job_id,
"status": self.status.value,
"created_time": self.created_time.isoformat(),
"service": self.service,
"title_id": self.title_id,
"progress": self.progress,
}
if include_full_details:
result.update(
{
"parameters": self.parameters,
"started_time": self.started_time.isoformat() if self.started_time else None,
"completed_time": self.completed_time.isoformat() if self.completed_time else None,
"output_files": self.output_files,
"error_message": self.error_message,
"error_details": self.error_details,
}
)
return result
def _perform_download(
job_id: str,
service: str,
title_id: str,
params: Dict[str, Any],
cancel_event: Optional[threading.Event] = None,
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> List[str]:
"""Execute the synchronous download logic for a job."""
def _check_cancel(stage: str):
if cancel_event and cancel_event.is_set():
raise Exception(f"Job was cancelled {stage}")
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
_check_cancel("before execution started")
# Import dl.py components lazily to avoid circular deps during module import
import click
import yaml
from unshackle.commands.dl import dl
from unshackle.core.config import config
from unshackle.core.services import Services
from unshackle.core.utils.click_types import ContextData
from unshackle.core.utils.collections import merge_dict
log.info(f"Starting sync download for job {job_id}")
# Load service configuration
service_config_path = Services.get_path(service) / config.filenames.config
if service_config_path.exists():
service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
else:
service_config = {}
merge_dict(config.services.get(service), service_config)
from unshackle.commands.dl import dl as dl_command
ctx = click.Context(dl_command.cli)
ctx.invoked_subcommand = service
ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=[], profile=params.get("profile"))
ctx.params = {
"proxy": params.get("proxy"),
"no_proxy": params.get("no_proxy", False),
"profile": params.get("profile"),
"tag": params.get("tag"),
"tmdb_id": params.get("tmdb_id"),
"tmdb_name": params.get("tmdb_name", False),
"tmdb_year": params.get("tmdb_year", False),
}
dl_instance = dl(
ctx=ctx,
no_proxy=params.get("no_proxy", False),
profile=params.get("profile"),
proxy=params.get("proxy"),
tag=params.get("tag"),
tmdb_id=params.get("tmdb_id"),
tmdb_name=params.get("tmdb_name", False),
tmdb_year=params.get("tmdb_year", False),
)
service_module = Services.load(service)
_check_cancel("before service instantiation")
try:
import inspect
service_init_params = inspect.signature(service_module.__init__).parameters
service_ctx = click.Context(click.Command(service))
service_ctx.parent = ctx
service_ctx.obj = ctx.obj
service_kwargs = {}
if "title" in service_init_params:
service_kwargs["title"] = title_id
for key, value in params.items():
if key in service_init_params and key not in ["service", "title_id"]:
service_kwargs[key] = value
for param_name, param_info in service_init_params.items():
if param_name not in service_kwargs and param_name not in ["self", "ctx"]:
if param_info.default is inspect.Parameter.empty:
if param_name == "movie":
service_kwargs[param_name] = "/movies/" in title_id
elif param_name == "meta_lang":
service_kwargs[param_name] = None
else:
log.warning(f"Unknown required parameter '{param_name}' for service {service}, using None")
service_kwargs[param_name] = None
service_instance = service_module(service_ctx, **service_kwargs)
except Exception as exc: # noqa: BLE001 - propagate meaningful failure
log.error(f"Failed to create service instance: {exc}")
raise
original_download_dir = config.directories.downloads
_check_cancel("before download execution")
stdout_capture = StringIO()
stderr_capture = StringIO()
# Simple progress tracking if callback provided
if progress_callback:
# Report initial progress
progress_callback({"progress": 0.0, "status": "starting"})
# Simple approach: report progress at key points
original_result = dl_instance.result
def result_with_progress(*args, **kwargs):
try:
# Report that download started
progress_callback({"progress": 5.0, "status": "downloading"})
# Call original method
result = original_result(*args, **kwargs)
# Report completion
progress_callback({"progress": 100.0, "status": "completed"})
return result
except Exception as e:
progress_callback({"progress": 0.0, "status": "failed", "error": str(e)})
raise
dl_instance.result = result_with_progress
try:
with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
dl_instance.result(
service=service_instance,
quality=params.get("quality", []),
vcodec=params.get("vcodec"),
acodec=params.get("acodec"),
vbitrate=params.get("vbitrate"),
abitrate=params.get("abitrate"),
range_=params.get("range", []),
channels=params.get("channels"),
no_atmos=params.get("no_atmos", False),
wanted=params.get("wanted", []),
lang=params.get("lang", ["orig"]),
v_lang=params.get("v_lang", []),
a_lang=params.get("a_lang", []),
s_lang=params.get("s_lang", ["all"]),
require_subs=params.get("require_subs", []),
forced_subs=params.get("forced_subs", False),
sub_format=params.get("sub_format"),
video_only=params.get("video_only", False),
audio_only=params.get("audio_only", False),
subs_only=params.get("subs_only", False),
chapters_only=params.get("chapters_only", False),
no_subs=params.get("no_subs", False),
no_audio=params.get("no_audio", False),
no_chapters=params.get("no_chapters", False),
slow=params.get("slow", False),
list_=False,
list_titles=False,
skip_dl=params.get("skip_dl", False),
export=params.get("export"),
cdm_only=params.get("cdm_only"),
no_proxy=params.get("no_proxy", False),
no_folder=params.get("no_folder", False),
no_source=params.get("no_source", False),
workers=params.get("workers"),
downloads=params.get("downloads", 1),
best_available=params.get("best_available", False),
)
except SystemExit as exc:
if exc.code != 0:
stdout_str = stdout_capture.getvalue()
stderr_str = stderr_capture.getvalue()
log.error(f"Download exited with code {exc.code}")
log.error(f"Stdout: {stdout_str}")
log.error(f"Stderr: {stderr_str}")
raise Exception(f"Download failed with exit code {exc.code}")
except Exception as exc: # noqa: BLE001 - propagate to caller
stdout_str = stdout_capture.getvalue()
stderr_str = stderr_capture.getvalue()
log.error(f"Download execution failed: {exc}")
log.error(f"Stdout: {stdout_str}")
log.error(f"Stderr: {stderr_str}")
raise
log.info(f"Download completed for job {job_id}, files in {original_download_dir}")
return []
class DownloadQueueManager:
"""Manages download job queue with configurable concurrency limits."""
def __init__(self, max_concurrent_downloads: int = 2, job_retention_hours: int = 24):
self.max_concurrent_downloads = max_concurrent_downloads
self.job_retention_hours = job_retention_hours
self._jobs: Dict[str, DownloadJob] = {}
self._job_queue: asyncio.Queue = asyncio.Queue()
self._active_downloads: Dict[str, asyncio.Task] = {}
self._download_processes: Dict[str, asyncio.subprocess.Process] = {}
self._job_temp_files: Dict[str, Dict[str, str]] = {}
self._workers_started = False
self._shutdown_event = asyncio.Event()
log.info(
f"Initialized download queue manager: max_concurrent={max_concurrent_downloads}, retention_hours={job_retention_hours}"
)
def create_job(self, service: str, title_id: str, **parameters) -> DownloadJob:
"""Create a new download job and add it to the queue."""
job_id = str(uuid.uuid4())
job = DownloadJob(
job_id=job_id,
status=JobStatus.QUEUED,
created_time=datetime.now(),
service=service,
title_id=title_id,
parameters=parameters,
)
self._jobs[job_id] = job
self._job_queue.put_nowait(job)
log.info(f"Created download job {job_id} for {service}:{title_id}")
return job
def get_job(self, job_id: str) -> Optional[DownloadJob]:
"""Get job by ID."""
return self._jobs.get(job_id)
def list_jobs(self) -> List[DownloadJob]:
"""List all jobs."""
return list(self._jobs.values())
def cancel_job(self, job_id: str) -> bool:
"""Cancel a job if it's queued or downloading."""
job = self._jobs.get(job_id)
if not job:
return False
if job.status == JobStatus.QUEUED:
job.status = JobStatus.CANCELLED
job.cancel_event.set() # Signal cancellation
log.info(f"Cancelled queued job {job_id}")
return True
elif job.status == JobStatus.DOWNLOADING:
# Set the cancellation event first - this will be checked by the download thread
job.cancel_event.set()
job.status = JobStatus.CANCELLED
log.info(f"Signaled cancellation for downloading job {job_id}")
# Cancel the active download task
task = self._active_downloads.get(job_id)
if task:
task.cancel()
log.info(f"Cancelled download task for job {job_id}")
process = self._download_processes.get(job_id)
if process:
try:
process.terminate()
log.info(f"Terminated worker process for job {job_id}")
except ProcessLookupError:
log.debug(f"Worker process for job {job_id} already exited")
return True
return False
def cleanup_old_jobs(self) -> int:
"""Remove jobs older than retention period."""
cutoff_time = datetime.now() - timedelta(hours=self.job_retention_hours)
jobs_to_remove = []
for job_id, job in self._jobs.items():
if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
if job.completed_time and job.completed_time < cutoff_time:
jobs_to_remove.append(job_id)
elif not job.completed_time and job.created_time < cutoff_time:
jobs_to_remove.append(job_id)
for job_id in jobs_to_remove:
del self._jobs[job_id]
if jobs_to_remove:
log.info(f"Cleaned up {len(jobs_to_remove)} old jobs")
return len(jobs_to_remove)
async def start_workers(self):
"""Start worker tasks to process the download queue."""
if self._workers_started:
return
self._workers_started = True
# Start worker tasks
for i in range(self.max_concurrent_downloads):
asyncio.create_task(self._download_worker(f"worker-{i}"))
# Start cleanup task
asyncio.create_task(self._cleanup_worker())
log.info(f"Started {self.max_concurrent_downloads} download workers")
async def shutdown(self):
"""Shutdown the queue manager and cancel all active downloads."""
log.info("Shutting down download queue manager")
self._shutdown_event.set()
# Cancel all active downloads
for task in self._active_downloads.values():
task.cancel()
# Terminate worker processes
for job_id, process in list(self._download_processes.items()):
try:
process.terminate()
except ProcessLookupError:
log.debug(f"Worker process for job {job_id} already exited during shutdown")
for job_id, process in list(self._download_processes.items()):
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
log.warning(f"Worker process for job {job_id} did not exit, killing")
process.kill()
await process.wait()
finally:
self._download_processes.pop(job_id, None)
# Clean up any remaining temp files
for paths in self._job_temp_files.values():
for path in paths.values():
try:
os.remove(path)
except OSError:
pass
self._job_temp_files.clear()
# Wait for workers to finish
if self._active_downloads:
await asyncio.gather(*self._active_downloads.values(), return_exceptions=True)
async def _download_worker(self, worker_name: str):
"""Worker task that processes jobs from the queue."""
log.debug(f"Download worker {worker_name} started")
while not self._shutdown_event.is_set():
try:
# Wait for a job or shutdown signal
job = await asyncio.wait_for(self._job_queue.get(), timeout=1.0)
if job.status == JobStatus.CANCELLED:
continue
# Start processing the job
job.status = JobStatus.DOWNLOADING
job.started_time = datetime.now()
log.info(f"Worker {worker_name} starting job {job.job_id}")
# Create download task
download_task = asyncio.create_task(self._execute_download(job))
self._active_downloads[job.job_id] = download_task
try:
await download_task
except asyncio.CancelledError:
job.status = JobStatus.CANCELLED
log.info(f"Job {job.job_id} was cancelled")
except Exception as e:
job.status = JobStatus.FAILED
job.error_message = str(e)
log.error(f"Job {job.job_id} failed: {e}")
finally:
job.completed_time = datetime.now()
if job.job_id in self._active_downloads:
del self._active_downloads[job.job_id]
except asyncio.TimeoutError:
continue
except Exception as e:
log.error(f"Worker {worker_name} error: {e}")
async def _execute_download(self, job: DownloadJob):
"""Execute the actual download for a job."""
log.info(f"Executing download for job {job.job_id}")
try:
output_files = await self._run_download_async(job)
job.status = JobStatus.COMPLETED
job.output_files = output_files
job.progress = 100.0
log.info(f"Download completed for job {job.job_id}: {len(output_files)} files")
except Exception as e:
job.status = JobStatus.FAILED
job.error_message = str(e)
job.error_details = str(e)
log.error(f"Download failed for job {job.job_id}: {e}")
raise
async def _run_download_async(self, job: DownloadJob) -> List[str]:
"""Invoke a worker subprocess to execute the download."""
payload = {
"job_id": job.job_id,
"service": job.service,
"title_id": job.title_id,
"parameters": job.parameters,
}
payload_fd, payload_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_payload.json")
os.close(payload_fd)
result_fd, result_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_result.json")
os.close(result_fd)
progress_fd, progress_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_progress.json")
os.close(progress_fd)
with open(payload_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle)
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"unshackle.core.api.download_worker",
payload_path,
result_path,
progress_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._download_processes[job.job_id] = process
self._job_temp_files[job.job_id] = {"payload": payload_path, "result": result_path, "progress": progress_path}
communicate_task = asyncio.create_task(process.communicate())
stdout_bytes = b""
stderr_bytes = b""
try:
while True:
done, _ = await asyncio.wait({communicate_task}, timeout=0.5)
if communicate_task in done:
stdout_bytes, stderr_bytes = communicate_task.result()
break
# Check for progress updates
try:
if os.path.exists(progress_path):
with open(progress_path, "r", encoding="utf-8") as handle:
progress_data = json.load(handle)
if "progress" in progress_data:
new_progress = float(progress_data["progress"])
if new_progress != job.progress:
job.progress = new_progress
log.info(f"Job {job.job_id} progress updated: {job.progress}%")
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
log.debug(f"Could not read progress for job {job.job_id}: {e}")
if job.cancel_event.is_set() or job.status == JobStatus.CANCELLED:
log.info(f"Cancellation detected for job {job.job_id}, terminating worker process")
process.terminate()
try:
await asyncio.wait_for(communicate_task, timeout=5)
except asyncio.TimeoutError:
log.warning(f"Worker process for job {job.job_id} did not terminate, killing")
process.kill()
await asyncio.wait_for(communicate_task, timeout=5)
raise asyncio.CancelledError("Job was cancelled")
returncode = process.returncode
stdout = stdout_bytes.decode("utf-8", errors="ignore")
stderr = stderr_bytes.decode("utf-8", errors="ignore")
if stdout.strip():
log.debug(f"Worker stdout for job {job.job_id}: {stdout.strip()}")
if stderr.strip():
log.warning(f"Worker stderr for job {job.job_id}: {stderr.strip()}")
result_data: Optional[Dict[str, Any]] = None
try:
with open(result_path, "r", encoding="utf-8") as handle:
result_data = json.load(handle)
except FileNotFoundError:
log.error(f"Result file missing for job {job.job_id}")
except json.JSONDecodeError as exc:
log.error(f"Failed to parse worker result for job {job.job_id}: {exc}")
if returncode != 0:
message = result_data.get("message") if result_data else "unknown error"
raise Exception(f"Worker exited with code {returncode}: {message}")
if not result_data or result_data.get("status") != "success":
message = result_data.get("message") if result_data else "worker did not report success"
raise Exception(f"Worker failure: {message}")
return result_data.get("output_files", [])
finally:
if not communicate_task.done():
communicate_task.cancel()
with suppress(asyncio.CancelledError):
await communicate_task
self._download_processes.pop(job.job_id, None)
temp_paths = self._job_temp_files.pop(job.job_id, {})
for path in temp_paths.values():
try:
os.remove(path)
except OSError:
pass
def _execute_download_sync(self, job: DownloadJob) -> List[str]:
"""Execute download synchronously using existing dl.py logic."""
return _perform_download(job.job_id, job.service, job.title_id, job.parameters.copy(), job.cancel_event)
async def _cleanup_worker(self):
"""Worker that periodically cleans up old jobs."""
while not self._shutdown_event.is_set():
try:
await asyncio.sleep(3600) # Run every hour
self.cleanup_old_jobs()
except Exception as e:
log.error(f"Cleanup worker error: {e}")
# Global instance
download_manager: Optional[DownloadQueueManager] = None
def get_download_manager() -> DownloadQueueManager:
"""Get the global download manager instance."""
global download_manager
if download_manager is None:
# Load configuration from unshackle config
from unshackle.core.config import config
max_concurrent = getattr(config, "max_concurrent_downloads", 2)
retention_hours = getattr(config, "download_job_retention_hours", 24)
download_manager = DownloadQueueManager(max_concurrent, retention_hours)
return download_manager
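A minimal usage sketch of the queue manager on its own, assuming an asyncio event loop (it normally lives inside the aiohttp app) and a configured service; the `"EXAMPLE"` tag and title ID are placeholders. `max_concurrent_downloads` and `download_job_retention_hours` are read from the unshackle config when present:

```python
import asyncio

from unshackle.core.api.download_manager import JobStatus, get_download_manager

async def main() -> None:
    manager = get_download_manager()
    await manager.start_workers()
    # placeholder service tag and title ID
    job = manager.create_job("EXAMPLE", "some-title-id", quality=[1080])
    while job.status in (JobStatus.QUEUED, JobStatus.DOWNLOADING):
        await asyncio.sleep(5)
        print(job.to_dict())
    await manager.shutdown()

asyncio.run(main())
```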

View File

@@ -0,0 +1,84 @@
"""Standalone worker process entry point for executing download jobs."""
from __future__ import annotations
import json
import logging
import sys
import traceback
from pathlib import Path
from typing import Any, Dict
from .download_manager import _perform_download
log = logging.getLogger("download_worker")
def _read_payload(path: Path) -> Dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def _write_result(path: Path, payload: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle)
def main(argv: list[str]) -> int:
if len(argv) not in [3, 4]:
print(
"Usage: python -m unshackle.core.api.download_worker <payload_path> <result_path> [progress_path]",
file=sys.stderr,
)
return 2
payload_path = Path(argv[1])
result_path = Path(argv[2])
progress_path = Path(argv[3]) if len(argv) > 3 else None
result: Dict[str, Any] = {}
exit_code = 0
try:
payload = _read_payload(payload_path)
job_id = payload["job_id"]
service = payload["service"]
title_id = payload["title_id"]
params = payload.get("parameters", {})
log.info(f"Worker starting job {job_id} ({service}:{title_id})")
def progress_callback(progress_data: Dict[str, Any]) -> None:
"""Write progress updates to file for main process to read."""
if progress_path:
try:
log.info(f"Writing progress update: {progress_data}")
_write_result(progress_path, progress_data)
log.info(f"Progress update written to {progress_path}")
except Exception as e:
log.error(f"Failed to write progress update: {e}")
output_files = _perform_download(
job_id, service, title_id, params, cancel_event=None, progress_callback=progress_callback
)
result = {"status": "success", "output_files": output_files}
except Exception as exc: # noqa: BLE001 - capture for parent process
exit_code = 1
tb = traceback.format_exc()
log.error(f"Worker failed with error: {exc}")
result = {"status": "error", "message": str(exc), "traceback": tb}
finally:
try:
_write_result(result_path, result)
except Exception as exc: # noqa: BLE001 - last resort logging
log.error(f"Failed to write worker result file: {exc}")
return exit_code
if __name__ == "__main__":
sys.exit(main(sys.argv))
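Because the worker is a plain subprocess that communicates through JSON temp files, the protocol can be exercised by hand. A sketch with placeholder service/title values:

```python
import json
import subprocess
import sys
import tempfile
from pathlib import Path

payload = {"job_id": "manual-test", "service": "EXAMPLE",
           "title_id": "some-title-id", "parameters": {"skip_dl": True}}

tmp = Path(tempfile.mkdtemp())
(tmp / "payload.json").write_text(json.dumps(payload), encoding="utf-8")

subprocess.run(
    [sys.executable, "-m", "unshackle.core.api.download_worker",
     str(tmp / "payload.json"), str(tmp / "result.json"), str(tmp / "progress.json")],
    check=False,
)
print(json.loads((tmp / "result.json").read_text()))  # {"status": "success"|"error", ...}
```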

View File

@@ -0,0 +1,653 @@
import logging
from typing import Any, Dict, List, Optional
from aiohttp import web
from unshackle.core.constants import AUDIO_CODEC_MAP, DYNAMIC_RANGE_MAP, VIDEO_CODEC_MAP
from unshackle.core.proxies.basic import Basic
from unshackle.core.proxies.hola import Hola
from unshackle.core.proxies.nordvpn import NordVPN
from unshackle.core.proxies.surfsharkvpn import SurfsharkVPN
from unshackle.core.services import Services
from unshackle.core.titles import Episode, Movie, Title_T
from unshackle.core.tracks import Audio, Subtitle, Video
log = logging.getLogger("api")
def initialize_proxy_providers() -> List[Any]:
"""Initialize and return available proxy providers."""
proxy_providers = []
try:
from unshackle.core import binaries
# Load the main unshackle config to get proxy provider settings
from unshackle.core.config import config as main_config
log.debug(f"Main config proxy providers: {getattr(main_config, 'proxy_providers', {})}")
log.debug(f"Available proxy provider configs: {list(getattr(main_config, 'proxy_providers', {}).keys())}")
# Use main_config instead of the service-specific config for proxy providers
proxy_config = getattr(main_config, "proxy_providers", {})
if proxy_config.get("basic"):
log.debug("Loading Basic proxy provider")
proxy_providers.append(Basic(**proxy_config["basic"]))
if proxy_config.get("nordvpn"):
log.debug("Loading NordVPN proxy provider")
proxy_providers.append(NordVPN(**proxy_config["nordvpn"]))
if proxy_config.get("surfsharkvpn"):
log.debug("Loading SurfsharkVPN proxy provider")
proxy_providers.append(SurfsharkVPN(**proxy_config["surfsharkvpn"]))
if hasattr(binaries, "HolaProxy") and binaries.HolaProxy:
log.debug("Loading Hola proxy provider")
proxy_providers.append(Hola())
for proxy_provider in proxy_providers:
log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}")
if not proxy_providers:
log.warning("No proxy providers were loaded. Check your proxy provider configuration in unshackle.yaml")
except Exception as e:
log.warning(f"Failed to initialize some proxy providers: {e}")
return proxy_providers
def resolve_proxy(proxy: str, proxy_providers: List[Any]) -> str:
"""Resolve proxy parameter to actual proxy URI."""
import re
if not proxy:
return proxy
# Check if explicit proxy URI
if re.match(r"^https?://", proxy):
return proxy
# Handle provider:country format (e.g., "nordvpn:us")
requested_provider = None
if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE):
requested_provider, proxy = proxy.split(":", maxsplit=1)
# Handle country code format (e.g., "us", "uk")
if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE):
proxy = proxy.lower()
if requested_provider:
# Find specific provider (case-insensitive matching)
proxy_provider = next(
(x for x in proxy_providers if x.__class__.__name__.lower() == requested_provider.lower()),
None,
)
if not proxy_provider:
available_providers = [x.__class__.__name__ for x in proxy_providers]
raise ValueError(
f"The proxy provider '{requested_provider}' was not recognized. Available providers: {available_providers}"
)
proxy_uri = proxy_provider.get_proxy(proxy)
if not proxy_uri:
raise ValueError(f"The proxy provider {requested_provider} had no proxy for {proxy}")
log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy_uri}")
return proxy_uri
else:
# Try all providers
for proxy_provider in proxy_providers:
proxy_uri = proxy_provider.get_proxy(proxy)
if proxy_uri:
log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy_uri}")
return proxy_uri
raise ValueError(f"No proxy provider had a proxy for {proxy}")
# Return as-is if not recognized format
log.info(f"Using explicit Proxy: {proxy}")
return proxy
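For reference, the three input shapes resolve_proxy() accepts, as hypothetical calls (providers assumed configured; the URIs and country codes are placeholders):

```python
providers = initialize_proxy_providers()

resolve_proxy("http://user:pass@host:8080", providers)  # explicit URI, returned as-is
resolve_proxy("nordvpn:us", providers)                  # provider:country, that provider only
resolve_proxy("us", providers)                          # bare country, first provider with a match
```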
def validate_service(service_tag: str) -> Optional[str]:
"""Validate and normalize service tag."""
try:
normalized = Services.get_tag(service_tag)
service_path = Services.get_path(normalized)
if not service_path.exists():
return None
return normalized
except Exception:
return None
def serialize_title(title: Title_T) -> Dict[str, Any]:
"""Convert a title object to JSON-serializable dict."""
if isinstance(title, Episode):
episode_name = title.name if title.name else f"Episode {title.number:02d}"
result = {
"type": "episode",
"name": episode_name,
"series_title": str(title.title),
"season": title.season,
"number": title.number,
"year": title.year,
"id": str(title.id) if hasattr(title, "id") else None,
}
elif isinstance(title, Movie):
result = {
"type": "movie",
"name": str(title.name) if hasattr(title, "name") else str(title),
"year": title.year,
"id": str(title.id) if hasattr(title, "id") else None,
}
else:
result = {
"type": "other",
"name": str(title.name) if hasattr(title, "name") else str(title),
"id": str(title.id) if hasattr(title, "id") else None,
}
return result
def serialize_video_track(track: Video) -> Dict[str, Any]:
"""Convert video track to JSON-serializable dict."""
codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec)
range_name = track.range.name if hasattr(track.range, "name") else str(track.range)
return {
"id": str(track.id),
"codec": codec_name,
"codec_display": VIDEO_CODEC_MAP.get(codec_name, codec_name),
"bitrate": int(track.bitrate / 1000) if track.bitrate else None,
"width": track.width,
"height": track.height,
"resolution": f"{track.width}x{track.height}" if track.width and track.height else None,
"fps": track.fps if track.fps else None,
"range": range_name,
"range_display": DYNAMIC_RANGE_MAP.get(range_name, range_name),
"language": str(track.language) if track.language else None,
"drm": str(track.drm) if hasattr(track, "drm") and track.drm else None,
}
def serialize_audio_track(track: Audio) -> Dict[str, Any]:
"""Convert audio track to JSON-serializable dict."""
codec_name = track.codec.name if hasattr(track.codec, "name") else str(track.codec)
return {
"id": str(track.id),
"codec": codec_name,
"codec_display": AUDIO_CODEC_MAP.get(codec_name, codec_name),
"bitrate": int(track.bitrate / 1000) if track.bitrate else None,
"channels": track.channels if track.channels else None,
"language": str(track.language) if track.language else None,
"atmos": track.atmos if hasattr(track, "atmos") else False,
"descriptive": track.descriptive if hasattr(track, "descriptive") else False,
"drm": str(track.drm) if hasattr(track, "drm") and track.drm else None,
}
def serialize_subtitle_track(track: Subtitle) -> Dict[str, Any]:
"""Convert subtitle track to JSON-serializable dict."""
return {
"id": str(track.id),
"codec": track.codec.name if hasattr(track.codec, "name") else str(track.codec),
"language": str(track.language) if track.language else None,
"forced": track.forced if hasattr(track, "forced") else False,
"sdh": track.sdh if hasattr(track, "sdh") else False,
"cc": track.cc if hasattr(track, "cc") else False,
}
async def list_titles_handler(data: Dict[str, Any]) -> web.Response:
"""Handle list-titles request."""
service_tag = data.get("service")
title_id = data.get("title_id")
profile = data.get("profile")
if not service_tag:
return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400)
if not title_id:
return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400)
normalized_service = validate_service(service_tag)
if not normalized_service:
return web.json_response(
{"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400
)
try:
import inspect
import click
import yaml
from unshackle.commands.dl import dl
from unshackle.core.config import config
from unshackle.core.utils.click_types import ContextData
from unshackle.core.utils.collections import merge_dict
service_config_path = Services.get_path(normalized_service) / config.filenames.config
if service_config_path.exists():
service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
else:
service_config = {}
merge_dict(config.services.get(normalized_service), service_config)
@click.command()
@click.pass_context
def dummy_service(ctx: click.Context) -> None:
pass
# Handle proxy configuration
proxy_param = data.get("proxy")
no_proxy = data.get("no_proxy", False)
proxy_providers = []
if not no_proxy:
proxy_providers = initialize_proxy_providers()
if proxy_param and not no_proxy:
try:
resolved_proxy = resolve_proxy(proxy_param, proxy_providers)
proxy_param = resolved_proxy
except ValueError as e:
return web.json_response({"status": "error", "message": f"Proxy error: {e}"}, status=400)
ctx = click.Context(dummy_service)
ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=proxy_providers, profile=profile)
ctx.params = {"proxy": proxy_param, "no_proxy": no_proxy}
service_module = Services.load(normalized_service)
dummy_service.name = normalized_service
dummy_service.params = [click.Argument([title_id], type=str)]
ctx.invoked_subcommand = normalized_service
service_ctx = click.Context(dummy_service, parent=ctx)
service_ctx.obj = ctx.obj
service_kwargs = {"title": title_id}
# Add additional parameters from request data
for key, value in data.items():
if key not in ["service", "title_id", "profile", "season", "episode", "wanted", "proxy", "no_proxy"]:
service_kwargs[key] = value
# Get service parameter info and click command defaults
service_init_params = inspect.signature(service_module.__init__).parameters
# Extract default values from the click command
if hasattr(service_module, "cli") and hasattr(service_module.cli, "params"):
for param in service_module.cli.params:
if hasattr(param, "name") and param.name not in service_kwargs:
# Add default value if parameter is not already provided
if hasattr(param, "default") and param.default is not None:
service_kwargs[param.name] = param.default
# Handle required parameters that don't have click defaults
for param_name, param_info in service_init_params.items():
if param_name not in service_kwargs and param_name not in ["self", "ctx"]:
# Check if parameter is required (no default value in signature)
if param_info.default is inspect.Parameter.empty:
# Provide sensible defaults for common required parameters
if param_name == "meta_lang":
service_kwargs[param_name] = None
elif param_name == "movie":
service_kwargs[param_name] = False
else:
# Log warning for unknown required parameters
log.warning(f"Unknown required parameter '{param_name}' for service {normalized_service}")
# Filter out any parameters that the service doesn't accept
filtered_kwargs = {}
for key, value in service_kwargs.items():
if key in service_init_params:
filtered_kwargs[key] = value
service_instance = service_module(service_ctx, **filtered_kwargs)
cookies = dl.get_cookie_jar(normalized_service, profile)
credential = dl.get_credentials(normalized_service, profile)
service_instance.authenticate(cookies, credential)
titles = service_instance.get_titles()
if hasattr(titles, "__iter__") and not isinstance(titles, str):
title_list = [serialize_title(t) for t in titles]
else:
title_list = [serialize_title(titles)]
return web.json_response({"titles": title_list})
except Exception as e:
log.exception("Error listing titles")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def list_tracks_handler(data: Dict[str, Any]) -> web.Response:
"""Handle list-tracks request."""
service_tag = data.get("service")
title_id = data.get("title_id")
profile = data.get("profile")
if not service_tag:
return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400)
if not title_id:
return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400)
normalized_service = validate_service(service_tag)
if not normalized_service:
return web.json_response(
{"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400
)
try:
import inspect
import click
import yaml
from unshackle.commands.dl import dl
from unshackle.core.config import config
from unshackle.core.utils.click_types import ContextData
from unshackle.core.utils.collections import merge_dict
service_config_path = Services.get_path(normalized_service) / config.filenames.config
if service_config_path.exists():
service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
else:
service_config = {}
merge_dict(config.services.get(normalized_service), service_config)
@click.command()
@click.pass_context
def dummy_service(ctx: click.Context) -> None:
pass
# Handle proxy configuration
proxy_param = data.get("proxy")
no_proxy = data.get("no_proxy", False)
proxy_providers = []
if not no_proxy:
proxy_providers = initialize_proxy_providers()
if proxy_param and not no_proxy:
try:
resolved_proxy = resolve_proxy(proxy_param, proxy_providers)
proxy_param = resolved_proxy
except ValueError as e:
return web.json_response({"status": "error", "message": f"Proxy error: {e}"}, status=400)
ctx = click.Context(dummy_service)
ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=proxy_providers, profile=profile)
ctx.params = {"proxy": proxy_param, "no_proxy": no_proxy}
service_module = Services.load(normalized_service)
dummy_service.name = normalized_service
dummy_service.params = [click.Argument([title_id], type=str)]
ctx.invoked_subcommand = normalized_service
service_ctx = click.Context(dummy_service, parent=ctx)
service_ctx.obj = ctx.obj
service_kwargs = {"title": title_id}
# Add additional parameters from request data
for key, value in data.items():
if key not in ["service", "title_id", "profile", "season", "episode", "wanted", "proxy", "no_proxy"]:
service_kwargs[key] = value
# Get service parameter info and click command defaults
service_init_params = inspect.signature(service_module.__init__).parameters
# Extract default values from the click command
if hasattr(service_module, "cli") and hasattr(service_module.cli, "params"):
for param in service_module.cli.params:
if hasattr(param, "name") and param.name not in service_kwargs:
# Add default value if parameter is not already provided
if hasattr(param, "default") and param.default is not None:
service_kwargs[param.name] = param.default
# Handle required parameters that don't have click defaults
for param_name, param_info in service_init_params.items():
if param_name not in service_kwargs and param_name not in ["self", "ctx"]:
# Check if parameter is required (no default value in signature)
if param_info.default is inspect.Parameter.empty:
# Provide sensible defaults for common required parameters
if param_name == "meta_lang":
service_kwargs[param_name] = None
elif param_name == "movie":
service_kwargs[param_name] = False
else:
# Log warning for unknown required parameters
log.warning(f"Unknown required parameter '{param_name}' for service {normalized_service}")
# Filter out any parameters that the service doesn't accept
filtered_kwargs = {}
for key, value in service_kwargs.items():
if key in service_init_params:
filtered_kwargs[key] = value
service_instance = service_module(service_ctx, **filtered_kwargs)
cookies = dl.get_cookie_jar(normalized_service, profile)
credential = dl.get_credentials(normalized_service, profile)
service_instance.authenticate(cookies, credential)
titles = service_instance.get_titles()
wanted_param = data.get("wanted")
season = data.get("season")
episode = data.get("episode")
if hasattr(titles, "__iter__") and not isinstance(titles, str):
titles_list = list(titles)
wanted = None
if wanted_param:
from unshackle.core.utils.click_types import SeasonRange
try:
season_range = SeasonRange()
wanted = season_range.parse_tokens(wanted_param)
log.debug(f"Parsed wanted '{wanted_param}' into {len(wanted)} episodes: {wanted[:10]}...")
except Exception as e:
return web.json_response(
{"status": "error", "message": f"Invalid wanted parameter: {e}"}, status=400
)
elif season is not None and episode is not None:
wanted = [f"{season}x{episode}"]
if wanted:
# Filter titles based on wanted episodes, similar to how dl.py does it
matching_titles = []
log.debug(f"Filtering {len(titles_list)} titles with {len(wanted)} wanted episodes")
for title in titles_list:
if isinstance(title, Episode):
episode_key = f"{title.season}x{title.number}"
if episode_key in wanted:
log.debug(f"Episode {episode_key} matches wanted list")
matching_titles.append(title)
else:
log.debug(f"Episode {episode_key} not in wanted list")
else:
matching_titles.append(title)
log.debug(f"Found {len(matching_titles)} matching titles")
if not matching_titles:
return web.json_response(
{"status": "error", "message": "No episodes found matching wanted criteria"}, status=404
)
# If multiple episodes match, return tracks for all episodes
if len(matching_titles) > 1 and all(isinstance(t, Episode) for t in matching_titles):
episodes_data = []
failed_episodes = []
# Sort matching titles by season and episode number for consistent ordering
sorted_titles = sorted(matching_titles, key=lambda t: (t.season, t.number))
for title in sorted_titles:
try:
tracks = service_instance.get_tracks(title)
video_tracks = sorted(tracks.videos, key=lambda t: t.bitrate or 0, reverse=True)
audio_tracks = sorted(tracks.audio, key=lambda t: t.bitrate or 0, reverse=True)
episode_data = {
"title": serialize_title(title),
"video": [serialize_video_track(t) for t in video_tracks],
"audio": [serialize_audio_track(t) for t in audio_tracks],
"subtitles": [serialize_subtitle_track(t) for t in tracks.subtitles],
}
episodes_data.append(episode_data)
log.debug(f"Successfully got tracks for {title.season}x{title.number}")
except SystemExit:
# Service calls sys.exit() for unavailable episodes - catch and skip
failed_episodes.append(f"S{title.season}E{title.number:02d}")
log.debug(f"Episode {title.season}x{title.number} not available, skipping")
continue
except Exception as e:
# Handle other errors gracefully
failed_episodes.append(f"S{title.season}E{title.number:02d}")
log.debug(f"Error getting tracks for {title.season}x{title.number}: {e}")
continue
if episodes_data:
response = {"episodes": episodes_data}
if failed_episodes:
response["unavailable_episodes"] = failed_episodes
return web.json_response(response)
else:
return web.json_response(
{
"status": "error",
"message": f"No available episodes found. Unavailable: {', '.join(failed_episodes)}",
},
status=404,
)
else:
# Single episode or movie
first_title = matching_titles[0]
else:
first_title = titles_list[0]
else:
first_title = titles
tracks = service_instance.get_tracks(first_title)
video_tracks = sorted(tracks.videos, key=lambda t: t.bitrate or 0, reverse=True)
audio_tracks = sorted(tracks.audio, key=lambda t: t.bitrate or 0, reverse=True)
response = {
"title": serialize_title(first_title),
"video": [serialize_video_track(t) for t in video_tracks],
"audio": [serialize_audio_track(t) for t in audio_tracks],
"subtitles": [serialize_subtitle_track(t) for t in tracks.subtitles],
}
return web.json_response(response)
except Exception as e:
log.exception("Error listing tracks")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def download_handler(data: Dict[str, Any]) -> web.Response:
"""Handle download request - create and queue a download job."""
from unshackle.core.api.download_manager import get_download_manager
service_tag = data.get("service")
title_id = data.get("title_id")
if not service_tag:
return web.json_response({"status": "error", "message": "Missing required parameter: service"}, status=400)
if not title_id:
return web.json_response({"status": "error", "message": "Missing required parameter: title_id"}, status=400)
normalized_service = validate_service(service_tag)
if not normalized_service:
return web.json_response(
{"status": "error", "message": f"Invalid or unavailable service: {service_tag}"}, status=400
)
try:
# Get download manager and start workers if needed
manager = get_download_manager()
await manager.start_workers()
# Create download job with filtered parameters (exclude service and title_id as they're already passed)
filtered_params = {k: v for k, v in data.items() if k not in ["service", "title_id"]}
job = manager.create_job(normalized_service, title_id, **filtered_params)
return web.json_response(
{"job_id": job.job_id, "status": job.status.value, "created_time": job.created_time.isoformat()}, status=202
)
except Exception as e:
log.exception("Error creating download job")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def list_download_jobs_handler(data: Dict[str, Any]) -> web.Response:
"""Handle list download jobs request."""
from unshackle.core.api.download_manager import get_download_manager
try:
manager = get_download_manager()
jobs = manager.list_jobs()
job_list = [job.to_dict(include_full_details=False) for job in jobs]
return web.json_response({"jobs": job_list})
except Exception as e:
log.exception("Error listing download jobs")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def get_download_job_handler(job_id: str) -> web.Response:
"""Handle get specific download job request."""
from unshackle.core.api.download_manager import get_download_manager
try:
manager = get_download_manager()
job = manager.get_job(job_id)
if not job:
return web.json_response({"status": "error", "message": "Job not found"}, status=404)
return web.json_response(job.to_dict(include_full_details=True))
except Exception as e:
log.exception(f"Error getting download job {job_id}")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def cancel_download_job_handler(job_id: str) -> web.Response:
"""Handle cancel download job request."""
from unshackle.core.api.download_manager import get_download_manager
try:
manager = get_download_manager()
if not manager.get_job(job_id):
return web.json_response({"status": "error", "message": "Job not found"}, status=404)
success = manager.cancel_job(job_id)
if success:
return web.json_response({"status": "success", "message": "Job cancelled"})
else:
return web.json_response({"status": "error", "message": "Job cannot be cancelled"}, status=400)
except Exception as e:
log.exception(f"Error cancelling download job {job_id}")
return web.json_response({"status": "error", "message": str(e)}, status=500)
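Taken together, the handlers give the job lifecycle below; a hedged client-side sketch (service tag, title ID, and secret are placeholders, and the `X-Secret-Key` header assumes pywidevine-style auth):

```python
import time

import httpx

API = "http://127.0.0.1:8786"
HEADERS = {"X-Secret-Key": "your-secret-key-here"}

# download_handler responds 202 with the queued job's ID
job = httpx.post(
    f"{API}/api/download",
    json={"service": "EXAMPLE", "title_id": "some-title-id", "quality": [1080]},
    headers=HEADERS,
).json()

# poll get_download_job_handler until the job settles
while True:
    detail = httpx.get(f"{API}/api/download/jobs/{job['job_id']}", headers=HEADERS).json()
    if detail["status"] in ("completed", "failed", "cancelled"):
        break
    print(f"{detail['status']}: {detail['progress']}%")
    time.sleep(5)

# cancel_download_job_handler is wired to DELETE on the same path
# httpx.delete(f"{API}/api/download/jobs/{job['job_id']}", headers=HEADERS)
```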

View File

@@ -0,0 +1,375 @@
import logging
from aiohttp import web
from aiohttp_swagger3 import SwaggerDocs, SwaggerInfo, SwaggerUiSettings
from unshackle.core import __version__
from unshackle.core.api.handlers import (
download_handler,
list_titles_handler,
list_tracks_handler,
list_download_jobs_handler,
get_download_job_handler,
cancel_download_job_handler,
)
from unshackle.core.services import Services
from unshackle.core.update_checker import UpdateChecker
log = logging.getLogger("api")
async def health(request: web.Request) -> web.Response:
"""
Health check endpoint.
---
summary: Health check
description: Get server health status, version info, and update availability
responses:
'200':
description: Health status
content:
application/json:
schema:
type: object
properties:
status:
type: string
example: ok
version:
type: string
example: "2.0.0"
update_check:
type: object
properties:
update_available:
type: boolean
nullable: true
current_version:
type: string
latest_version:
type: string
nullable: true
"""
try:
latest_version = await UpdateChecker.check_for_updates(__version__)
update_info = {
"update_available": latest_version is not None,
"current_version": __version__,
"latest_version": latest_version,
}
except Exception as e:
log.warning(f"Failed to check for updates: {e}")
update_info = {"update_available": None, "current_version": __version__, "latest_version": None}
return web.json_response({"status": "ok", "version": __version__, "update_check": update_info})
async def services(request: web.Request) -> web.Response:
"""
List available services.
---
summary: List services
description: Get all available streaming services with their details
responses:
'200':
description: List of services
content:
application/json:
schema:
type: object
properties:
services:
type: array
items:
type: object
properties:
tag:
type: string
aliases:
type: array
items:
type: string
geofence:
type: array
items:
type: string
title_regex:
type: string
nullable: true
help:
type: string
nullable: true
'500':
description: Server error
"""
try:
service_tags = Services.get_tags()
services_info = []
for tag in service_tags:
service_data = {"tag": tag, "aliases": [], "geofence": [], "title_regex": None, "help": None}
try:
service_module = Services.load(tag)
if hasattr(service_module, "ALIASES"):
service_data["aliases"] = list(service_module.ALIASES)
if hasattr(service_module, "GEOFENCE"):
service_data["geofence"] = list(service_module.GEOFENCE)
if hasattr(service_module, "TITLE_RE"):
service_data["title_regex"] = service_module.TITLE_RE
if service_module.__doc__:
service_data["help"] = service_module.__doc__.strip()
except Exception as e:
log.warning(f"Could not load details for service {tag}: {e}")
services_info.append(service_data)
return web.json_response({"services": services_info})
except Exception as e:
log.exception("Error listing services")
return web.json_response({"status": "error", "message": str(e)}, status=500)
async def list_titles(request: web.Request) -> web.Response:
"""
List titles for a service and title ID.
---
summary: List titles
description: Get available titles for a service and title ID
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- service
- title_id
properties:
service:
type: string
description: Service tag
title_id:
type: string
description: Title identifier
responses:
'200':
description: List of titles
'400':
description: Invalid request
"""
try:
data = await request.json()
except Exception:
return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400)
return await list_titles_handler(data)
async def list_tracks(request: web.Request) -> web.Response:
"""
List tracks for a title, separated by type.
---
summary: List tracks
description: Get available video, audio, and subtitle tracks for a title
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- service
- title_id
properties:
service:
type: string
description: Service tag
title_id:
type: string
description: Title identifier
wanted:
type: string
description: Specific episode/season (optional)
proxy:
type: string
description: Proxy configuration (optional)
responses:
'200':
description: Track information
'400':
description: Invalid request
"""
try:
data = await request.json()
except Exception:
return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400)
return await list_tracks_handler(data)
async def download(request: web.Request) -> web.Response:
"""
Download content based on provided parameters.
---
summary: Download content
description: Download video content based on specified parameters
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- service
- title_id
properties:
service:
type: string
description: Service tag
title_id:
type: string
description: Title identifier
responses:
'200':
description: Download started
'400':
description: Invalid request
"""
try:
data = await request.json()
except Exception:
return web.json_response({"status": "error", "message": "Invalid JSON request body"}, status=400)
return await download_handler(data)
async def download_jobs(request: web.Request) -> web.Response:
"""
List all download jobs.
---
summary: List download jobs
description: Get list of all download jobs with their status
responses:
'200':
description: List of download jobs
content:
application/json:
schema:
type: object
properties:
jobs:
type: array
items:
type: object
properties:
job_id:
type: string
status:
type: string
created_time:
type: string
service:
type: string
title_id:
type: string
progress:
type: number
'500':
description: Server error
"""
return await list_download_jobs_handler({})
async def download_job_detail(request: web.Request) -> web.Response:
"""
Get download job details.
---
summary: Get download job
description: Get detailed information about a specific download job
parameters:
- name: job_id
in: path
required: true
schema:
type: string
responses:
'200':
description: Download job details
'404':
description: Job not found
'500':
description: Server error
"""
job_id = request.match_info["job_id"]
return await get_download_job_handler(job_id)
async def cancel_download_job(request: web.Request) -> web.Response:
"""
Cancel download job.
---
summary: Cancel download job
description: Cancel a queued or running download job
parameters:
- name: job_id
in: path
required: true
schema:
type: string
responses:
'200':
description: Job cancelled successfully
'400':
description: Job cannot be cancelled
'404':
description: Job not found
'500':
description: Server error
"""
job_id = request.match_info["job_id"]
return await cancel_download_job_handler(job_id)
def setup_routes(app: web.Application) -> None:
"""Setup all API routes."""
app.router.add_get("/api/health", health)
app.router.add_get("/api/services", services)
app.router.add_post("/api/list-titles", list_titles)
app.router.add_post("/api/list-tracks", list_tracks)
app.router.add_post("/api/download", download)
app.router.add_get("/api/download/jobs", download_jobs)
app.router.add_get("/api/download/jobs/{job_id}", download_job_detail)
app.router.add_delete("/api/download/jobs/{job_id}", cancel_download_job)
def setup_swagger(app: web.Application) -> None:
"""Setup Swagger UI documentation."""
swagger = SwaggerDocs(
app,
swagger_ui_settings=SwaggerUiSettings(path="/api/docs/"),
info=SwaggerInfo(
title="Unshackle REST API",
version=__version__,
description="REST API for Unshackle - Modular Movie, TV, and Music Archival Software",
),
)
# Add routes with OpenAPI documentation
swagger.add_routes(
[
web.get("/api/health", health),
web.get("/api/services", services),
web.post("/api/list-titles", list_titles),
web.post("/api/list-tracks", list_tracks),
web.post("/api/download", download),
web.get("/api/download/jobs", download_jobs),
web.get("/api/download/jobs/{job_id}", download_job_detail),
web.delete("/api/download/jobs/{job_id}", cancel_download_job),
]
)
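A minimal sketch of standing the API up outside the serve command, mirroring what `--api-only --no-key` does without the authentication middleware:

```python
from aiohttp import web

from unshackle.core.api import setup_routes, setup_swagger

app = web.Application()
app["config"] = {"users": []}  # no auth, as with --no-key
setup_routes(app)
setup_swagger(app)
web.run_app(app, host="127.0.0.1", port=8786)
```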

View File

@@ -89,7 +89,17 @@ class Episode(Title):
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
primary_audio_track = None
if media_info.audio_tracks:
sorted_audio = sorted(
media_info.audio_tracks,
key=lambda x: (
float(x.bit_rate) if x.bit_rate else 0,
bool(x.format_additionalfeatures and "JOC" in x.format_additionalfeatures)
),
reverse=True
)
primary_audio_track = sorted_audio[0]
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Title [Year] SXXEXX Name (or Title [Year] SXX if folder)

View File

@@ -52,7 +52,17 @@ class Movie(Title):
def get_filename(self, media_info: MediaInfo, folder: bool = False, show_service: bool = True) -> str:
primary_video_track = next(iter(media_info.video_tracks), None)
primary_audio_track = next(iter(media_info.audio_tracks), None)
primary_audio_track = None
if media_info.audio_tracks:
sorted_audio = sorted(
media_info.audio_tracks,
key=lambda x: (
float(x.bit_rate) if x.bit_rate else 0,
bool(x.format_additionalfeatures and "JOC" in x.format_additionalfeatures)
),
reverse=True
)
primary_audio_track = sorted_audio[0]
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Name (Year)
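Both the Episode and Movie hunks apply the same selection rule: prefer the highest-bitrate audio track, with Atmos (JOC) breaking ties. A self-contained sketch of that key function, using SimpleNamespace stand-ins for the MediaInfo track objects (the values are illustrative):

```python
from types import SimpleNamespace

# Stand-ins for MediaInfo audio tracks; bit_rate is a string, as MediaInfo reports it.
tracks = [
    SimpleNamespace(bit_rate="448000", format_additionalfeatures=None),
    SimpleNamespace(bit_rate="768000", format_additionalfeatures=None),
    SimpleNamespace(bit_rate="768000", format_additionalfeatures="16-ch JOC"),
]

primary = sorted(
    tracks,
    key=lambda x: (
        float(x.bit_rate) if x.bit_rate else 0,  # highest bitrate wins...
        bool(x.format_additionalfeatures and "JOC" in x.format_additionalfeatures),  # ...JOC breaks ties
    ),
    reverse=True,
)[0]
print(primary.format_additionalfeatures)  # 16-ch JOC
```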

View File

@@ -12,6 +12,7 @@ class Audio(Track):
AAC = "AAC" # https://wikipedia.org/wiki/Advanced_Audio_Coding
AC3 = "DD" # https://wikipedia.org/wiki/Dolby_Digital
EC3 = "DD+" # https://wikipedia.org/wiki/Dolby_Digital_Plus
AC4 = "AC-4" # https://wikipedia.org/wiki/Dolby_AC-4
OPUS = "OPUS" # https://wikipedia.org/wiki/Opus_(audio_format)
OGG = "VORB" # https://wikipedia.org/wiki/Vorbis
DTS = "DTS" # https://en.wikipedia.org/wiki/DTS_(company)#DTS_Digital_Surround
@@ -31,6 +32,8 @@ class Audio(Track):
return Audio.Codec.AC3
if mime == "ec-3":
return Audio.Codec.EC3
if mime == "ac-4":
return Audio.Codec.AC4
if mime == "opus":
return Audio.Codec.OPUS
if mime == "dtsc":
@@ -60,6 +63,8 @@ class Audio(Track):
return Audio.Codec.AC3
if profile.startswith("ddplus"):
return Audio.Codec.EC3
if profile.startswith("ac4"):
return Audio.Codec.AC4
if profile.startswith("playready-oggvorbis"):
return Audio.Codec.OGG
raise ValueError(f"The Content Profile '{profile}' is not a supported Audio Codec")

View File

@@ -202,17 +202,16 @@ class Tracks:
"""Sort audio tracks by bitrate, descriptive, and optionally language."""
if not self.audio:
return
# bitrate
self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True)
# descriptive
self.audio.sort(key=lambda x: str(x.language) if x.descriptive else "")
# bitrate first, so the stable sort below keeps bitrate order within each descriptive group
self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True)
self.audio.sort(key=lambda x: x.descriptive)
# language
for language in reversed(by_language or []):
if str(language) in ("all", "best"):
language = next((x.language for x in self.audio if x.is_original_lang), "")
if not language:
continue
self.audio.sort(key=lambda x: str(x.language))
self.audio.sort(key=lambda x: not is_close_match(language, [x.language]))
def sort_subtitles(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None:
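The grouping above relies on Python's sort stability: the last sort decides the primary grouping, and the order from earlier sorts survives within equal keys. A tiny illustration:

```python
# (label, bitrate) pairs; labels ending in "-desc" stand for descriptive tracks.
tracks = [("en", 640), ("en-desc", 448), ("en", 768)]

tracks.sort(key=lambda t: t[1], reverse=True)      # bitrate first
tracks.sort(key=lambda t: t[0].endswith("-desc"))  # then group descriptive last

print(tracks)  # [('en', 768), ('en', 640), ('en-desc', 448)]
```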

View File

@@ -259,6 +259,7 @@ subtitle:
# Configuration for pywidevine's serve functionality
serve:
api_secret: "your-secret-key-here"
users:
secret_key_for_user:
devices:

uv.lock generated
View File

@@ -80,6 +80,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2b/d8/fa65d2a349fe938b76d309db1a56a75c4fb8cc7b17a398b698488a939903/aiohttp-3.12.15-cp312-cp312-win_amd64.whl", hash = "sha256:b390ef5f62bb508a9d67cb3bba9b8356e23b3996da7062f1a57ce1a79d2b3d34", size = 450266, upload-time = "2025-07-29T05:51:17.239Z" },
]
[[package]]
name = "aiohttp-swagger3"
version = "0.10.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
{ name = "attrs" },
{ name = "fastjsonschema" },
{ name = "pyyaml" },
{ name = "rfc3339-validator" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a1/06/00ccb2c8afdde4ca7c3cac424d54715c7d90cdd4e13e1ca71d68f5b2e665/aiohttp_swagger3-0.10.0.tar.gz", hash = "sha256:a333c59328f64dd64587e5f276ee84dc256f587d09f2da6ddaae3812fa4d4f33", size = 1839028, upload-time = "2025-02-11T10:51:26.974Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0a/8f/db4cb843999a3088846d170f38eda2182b50b5733387be8102fed171c53f/aiohttp_swagger3-0.10.0-py3-none-any.whl", hash = "sha256:0ae2d2ba7dbd8ea8fe1cffe8f0197db5d0aa979eb9679bd699ecd87923912509", size = 1826491, upload-time = "2025-02-11T10:51:25.174Z" },
]
[[package]]
name = "aiosignal"
version = "1.4.0"
@@ -468,6 +484,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/36/f4/c6e662dade71f56cd2f3735141b265c3c79293c109549c1e6933b0651ffc/exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10", size = 16674, upload-time = "2025-05-10T17:42:49.33Z" },
]
[[package]]
name = "fastjsonschema"
version = "2.19.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ba/7f/cedf77ace50aa60c566deaca9066750f06e1fcf6ad24f254d255bb976dd6/fastjsonschema-2.19.1.tar.gz", hash = "sha256:e3126a94bdc4623d3de4485f8d468a12f02a67921315ddc87836d6e456dc789d", size = 372732, upload-time = "2023-12-28T14:02:06.823Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9c/b9/79691036d4a8f9857e74d1728b23f34f583b81350a27492edda58d5604e1/fastjsonschema-2.19.1-py3-none-any.whl", hash = "sha256:3672b47bc94178c9f23dbb654bf47440155d4db9df5f7bc47643315f9c405cd0", size = 23388, upload-time = "2023-12-28T14:02:04.512Z" },
]
[[package]]
name = "filelock"
version = "3.19.1"
@@ -1252,6 +1277,18 @@ socks = [
{ name = "pysocks" },
]
[[package]]
name = "rfc3339-validator"
version = "0.1.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
]
sdist = { url = "https://files.pythonhosted.org/packages/28/ea/a9387748e2d111c3c2b275ba970b735e04e15cdb1eb30693b6b5708c4dbd/rfc3339_validator-0.1.4.tar.gz", hash = "sha256:138a2abdf93304ad60530167e51d2dfb9549521a836871b88d7f4695d0022f6b", size = 5513, upload-time = "2021-05-12T16:37:54.178Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7b/44/4e421b96b67b2daff264473f7465db72fbdf36a07e05494f50300cc7b0c6/rfc3339_validator-0.1.4-py2.py3-none-any.whl", hash = "sha256:24f6ec1eda14ef823da9e36ec7113124b39c04d50a4d3d3a3c2859577e7791fa", size = 3490, upload-time = "2021-05-12T16:37:52.536Z" },
]
[[package]]
name = "rich"
version = "13.9.4"
@@ -1358,6 +1395,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a3/dc/17031897dae0efacfea57dfd3a82fdd2a2aeb58e0ff71b77b87e44edc772/setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922", size = 1201486, upload-time = "2025-05-27T00:56:49.664Z" },
]
[[package]]
name = "six"
version = "1.17.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
]
[[package]]
name = "sniffio"
version = "1.3.1"
@@ -1502,6 +1548,7 @@ name = "unshackle"
version = "1.4.6"
source = { editable = "." }
dependencies = [
{ name = "aiohttp-swagger3" },
{ name = "appdirs" },
{ name = "brotli" },
{ name = "chardet" },
@@ -1551,6 +1598,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiohttp-swagger3", specifier = ">=0.9.0,<1" },
{ name = "appdirs", specifier = ">=1.4.4,<2" },
{ name = "brotli", specifier = ">=1.1.0,<2" },
{ name = "chardet", specifier = ">=5.2.0,<6" },