25 Commits
1.1.0 ... 1.3.0

Author SHA1 Message Date
Andy
63e9a78b2a chore: Bump version to 1.3.0 and update changelog with mp4decrypt support and enhancements 2025-08-03 06:26:24 +00:00
Andy
a2bfe47993 feat(drm): Add support for mp4decrypt as a decryption method
* Introduced a new configuration option for DRM decryption in `unshackle.yaml`.
* Updated the `decrypt` methods in `PlayReady` and `Widevine` classes to allow using `mp4decrypt`.
* Enhanced the `Config` class to manage decryption methods per service.
* Added `mp4decrypt` binary detection in the binaries module.
2025-08-03 06:23:43 +00:00
Andy
cf4dc1ce76 feat: Add unshackle-example.yaml to replace the unshackle.yaml file, you can now make changes to the unshackle.yaml file and pull from the the repo without issues. 2025-08-03 00:54:29 +00:00
Andy
40028c81d7 Merge branch 'feature/scene-naming-option' 2025-08-03 00:48:22 +00:00
Andy
06df10cb58 fix: rename 'servers' to 'server_map' for proxy configuration in unshackle.yaml to resolve nord/surfshark incorrect named config 2025-08-01 20:23:03 +00:00
Andy
d61bec4a8c feat: Add scene naming option to configuration and update naming logic in titles 2025-08-01 18:40:40 +00:00
Andy
058bb60502 feat: update path of update_check.json to .gitignore 2025-08-01 17:44:11 +00:00
Andy
7583129e8f feat: Enhance credential management and CDM configuration in unshackle.yaml 2025-08-01 17:41:19 +00:00
Andy
4691694d2e feat: Add Unspecified_Image option to Transfer enum in Video class.
The Transfer enum was missing value 2, which according to ITU-T H.Sup19 standards represents "Unspecified (Image
  characteristics are unknown or are determined by the application)". This value is often used for still image coding systems.
2025-08-01 17:10:55 +00:00
Andy
a07345a0a2 refactor: Replace log.exit calls with ValueError exceptions for error handling in Hybrid class 2025-07-31 23:48:22 +00:00
Andy
091d7335a3 feat: Implement terminal cleanup on exit and signal handling in ComfyConsole 2025-07-31 18:25:18 +00:00
Andy
8c798b95c4 fix: Correct URL handling and improve key retrieval logic in HTTP vault 2025-07-31 15:45:12 +00:00
Andy
46c28fe943 feat: Add update check interval configuration and implement rate limiting for update checks 2025-07-30 23:36:59 +00:00
Andy
22c9aa195e feat: Bump version to 1.2.0 and update changelog, I'll eventually learn symantic versioning. 2025-07-30 23:15:20 +00:00
Andy
776d8f3df0 feat: Update version to 1.1.1 and add update checking functionality 2025-07-30 23:12:13 +00:00
Andy
67caf71295 Merge branch 'hdr10ptest' 2025-07-30 22:49:01 +00:00
Andy
3ed76d199c chore(workflow): 🗑️ Remove Docker build and publish workflow, its too messy at the moment doing manual builds for now. 2025-07-30 22:48:00 +00:00
Andy
4de9251f95 feat(tracks): Add duration fix handling for video and hybrid tracks 2025-07-30 21:39:34 +00:00
Andy
d2fb409ad9 feat(hybrid): Add HDR10+ support for conversion to Dolby Vision and enhance metadata extraction 2025-07-30 21:14:50 +00:00
Andy
fdff3a1c56 refactor(env): Enhance dependency check with detailed categorization and status summary 2025-07-30 20:12:43 +00:00
Andy
5d1f2eb458 feat(attachment): Ensure temporary directory is created for downloads 2025-07-30 18:52:36 +00:00
Andy
3efac3d474 feat(vaults): Enhance vault loading with success status 2025-07-30 17:29:06 +00:00
Andy
f578904b76 feat(subtitle): Add information into unshackle.yaml on how to use new Subby subtitle conversion. 2025-07-30 02:18:35 +00:00
Andy
9f20159605 feat(hybrid): Display resolution of HDR10 track in hybrid mode console output and clean up unused code 2025-07-30 02:08:07 +00:00
Andy
4decb0d107 feat(dl): Enhance hybrid processing to handle HDR10 and DV tracks separately by resolution, Hotfix for -q 2160,1080 both tracks will have Hybrid correctly now. 2025-07-30 01:09:59 +00:00
27 changed files with 1133 additions and 541 deletions

View File

@@ -1,99 +0,0 @@
name: Build and Publish Docker Image
on:
push:
branches: [main, master]
paths: # run only when this file changed at all
- "unshackle/core/__init__.py"
pull_request: {} # optional delete if you dont build on PRs
workflow_dispatch: {} # manual override
jobs:
detect-version-change:
runs-on: ubuntu-latest
outputs:
changed: ${{ steps.vdiff.outputs.changed }}
version: ${{ steps.vdiff.outputs.version }}
steps:
- uses: actions/checkout@v4
with: { fetch-depth: 2 } # we need the previous commit :contentReference[oaicite:1]{index=1}
- name: Extract & compare version
id: vdiff
shell: bash
run: |
current=$(grep -oP '__version__ = "\K[^"]+' unshackle/core/__init__.py)
prev=$(git show HEAD^:unshackle/core/__init__.py \
| grep -oP '__version__ = "\K[^"]+' || echo '')
echo "version=$current" >>"$GITHUB_OUTPUT"
echo "changed=$([ "$current" != "$prev" ] && echo true || echo false)" >>"$GITHUB_OUTPUT"
echo "Current=$current Previous=$prev"
build-and-push:
needs: detect-version-change
if: needs.detect-version-change.outputs.changed == 'true' # only run when bumped :contentReference[oaicite:2]{index=2}
runs-on: ubuntu-latest
permissions: { contents: read, packages: write }
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Extract version from __init__.py
id: version
run: |
VERSION=$(grep -oP '__version__ = "\K[^"]+' unshackle/core/__init__.py)
echo "version=$VERSION" >> $GITHUB_OUTPUT
echo "major_minor=$(echo $VERSION | cut -d. -f1-2)" >> $GITHUB_OUTPUT
echo "major=$(echo $VERSION | cut -d. -f1)" >> $GITHUB_OUTPUT
echo "Extracted version: $VERSION"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=raw,value=latest,enable={{is_default_branch}}
type=raw,value=v${{ steps.version.outputs.version }},enable={{is_default_branch}}
type=raw,value=${{ steps.version.outputs.version }},enable={{is_default_branch}}
type=raw,value=${{ steps.version.outputs.major_minor }},enable={{is_default_branch}}
type=raw,value=${{ steps.version.outputs.major }},enable={{is_default_branch}}
- name: Show planned tags
run: |
echo "Planning to create the following tags:"
echo "${{ steps.meta.outputs.tags }}"
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Test Docker image
if: github.event_name != 'pull_request'
run: |
docker run --rm ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest env check

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# unshackle
unshackle.yaml
unshackle.yml
update_check.json
*.mkv
*.mp4
*.exe

View File

@@ -5,7 +5,64 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [1.3.0] - 2025-08-03
### Added
- **mp4decrypt Support**: Alternative DRM decryption method using mp4decrypt from Bento4
- Added `mp4decrypt` binary detection and support in binaries module
- New `decryption` configuration option in unshackle.yaml for service-specific decryption methods
- Enhanced PlayReady and Widevine DRM classes with mp4decrypt decryption support
- Service-specific decryption mapping allows choosing between `shaka` and `mp4decrypt` per service
- Improved error handling and progress reporting for mp4decrypt operations
### Changed
- **DRM Decryption Architecture**: Enhanced decryption system with dual method support
- Updated `dl.py` to handle service-specific decryption method selection
- Refactored `Config` class to manage decryption method mapping per service
- Enhanced DRM decrypt methods with `use_mp4decrypt` parameter for method selection
### Fixed
- **Service Track Filtering**: Cleaned up ATVP service to remove unnecessary track filtering
- Simplified track return logic to pass all tracks to dl.py for centralized filtering
- Removed unused codec and quality filter parameters from service initialization
## [1.2.0] - 2025-07-30
### Added
- **Update Checker**: Automatic GitHub release version checking on startup
- Configurable update notifications via `update_checks` setting in unshackle.yaml
- Non-blocking HTTP requests with 5-second timeout for performance
- Smart semantic version comparison supporting all version formats (x.y.z, x.y, x)
- Graceful error handling for network issues and API failures
- User-friendly update notifications with current → latest version display
- Direct links to GitHub releases page for easy updates
- **HDR10+ Support**: Enhanced HDR10+ metadata processing for hybrid tracks
- HDR10+ tool binary support (`hdr10plus_tool`) added to binaries module
- HDR10+ to Dolby Vision conversion capabilities in hybrid processing
- Enhanced metadata extraction for HDR10+ content
- **Duration Fix Handling**: Added duration correction for video and hybrid tracks
- **Temporary Directory Management**: Automatic creation of temp directories for attachment downloads
### Changed
- Enhanced configuration system with new `update_checks` boolean option (defaults to true)
- Updated sample unshackle.yaml with update checker configuration documentation
- Improved console styling consistency using `bright_black` for dimmed text
- **Environment Dependency Check**: Complete overhaul with detailed categorization and status summary
- Organized dependencies by category (Core, HDR, Download, Subtitle, Player, Network)
- Enhanced status reporting with compact summary display
- Improved tool requirement tracking and missing dependency alerts
- **Hybrid Track Processing**: Significant improvements to HDR10+ and Dolby Vision handling
- Enhanced metadata extraction and processing workflows
- Better integration with HDR processing tools
### Removed
- **Docker Workflow**: Removed Docker build and publish GitHub Actions workflow for manual builds
## [1.1.0] - 2025-07-29

View File

@@ -213,6 +213,37 @@ downloader:
The `default` entry is optional. If omitted, `requests` will be used for services not listed.
## decryption (str | dict)
Choose what software to use to decrypt DRM-protected content throughout unshackle where needed.
You may provide a single decryption method globally or a mapping of service tags to
decryption methods.
Options:
- `shaka` (default) - Shaka Packager - <https://github.com/shaka-project/shaka-packager>
- `mp4decrypt` - mp4decrypt from Bento4 - <https://github.com/axiomatic-systems/Bento4>
Note that Shaka Packager is the traditional method and works with most services. mp4decrypt
is an alternative that may work better with certain services that have specific encryption formats.
Example mapping:
```yaml
decryption:
ATVP: mp4decrypt
AMZN: shaka
default: shaka
```
The `default` entry is optional. If omitted, `shaka` will be used for services not listed.
Simple configuration (single method for all services):
```yaml
decryption: mp4decrypt
```
## filenames (dict)
Override the default filenames used across unshackle.

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "unshackle"
version = "1.1.0"
version = "1.3.0"
description = "Modular Movie, TV, and Music Archival Software."
authors = [{ name = "unshackle team" }]
requires-python = ">=3.10,<3.13"

View File

@@ -295,11 +295,41 @@ class dl:
with console.status("Loading Key Vaults...", spinner="dots"):
self.vaults = Vaults(self.service)
total_vaults = len(config.key_vaults)
failed_vaults = []
for vault in config.key_vaults:
vault_type = vault["type"]
del vault["type"]
self.vaults.load(vault_type, **vault)
self.log.info(f"Loaded {len(self.vaults)} Vaults")
vault_name = vault.get("name", vault_type)
vault_copy = vault.copy()
del vault_copy["type"]
if vault_type.lower() == "sqlite":
try:
self.vaults.load_critical(vault_type, **vault_copy)
self.log.debug(f"Successfully loaded vault: {vault_name} ({vault_type})")
except Exception as e:
self.log.error(f"vault failure: {vault_name} ({vault_type}) - {e}")
raise
else:
# Other vaults (MySQL, HTTP, API) - soft fail
if not self.vaults.load(vault_type, **vault_copy):
failed_vaults.append(vault_name)
self.log.debug(f"Failed to load vault: {vault_name} ({vault_type})")
else:
self.log.debug(f"Successfully loaded vault: {vault_name} ({vault_type})")
loaded_count = len(self.vaults)
if failed_vaults:
self.log.warning(f"Failed to load {len(failed_vaults)} vault(s): {', '.join(failed_vaults)}")
self.log.info(f"Loaded {loaded_count}/{total_vaults} Vaults")
# Debug: Show detailed vault status
if loaded_count > 0:
vault_names = [vault.name for vault in self.vaults]
self.log.debug(f"Active vaults: {', '.join(vault_names)}")
else:
self.log.debug("No vaults are currently active")
self.proxy_providers = []
if no_proxy:
@@ -403,6 +433,7 @@ class dl:
# Check if dovi_tool is available when hybrid mode is requested
if any(r == Video.Range.HYBRID for r in range_):
from unshackle.core.binaries import DoviTool
if not DoviTool:
self.log.error("Unable to run hybrid mode: dovi_tool not detected")
self.log.error("Please install dovi_tool from https://github.com/quietvoid/dovi_tool")
@@ -734,7 +765,8 @@ class dl:
DOWNLOAD_LICENCE_ONLY.set()
try:
with Live(Padding(download_table, (1, 5)), console=console, refresh_per_second=5):
# Use transient mode to prevent display remnants
with Live(Padding(download_table, (1, 5)), console=console, refresh_per_second=5, transient=True):
with ThreadPoolExecutor(downloads) as pool:
for download in futures.as_completed(
(
@@ -880,6 +912,31 @@ class dl:
if font_count:
self.log.info(f"Attached {font_count} fonts for the Subtitles")
# Handle DRM decryption BEFORE repacking (must decrypt first!)
service_name = service.__class__.__name__.upper()
decryption_method = config.decryption_map.get(service_name, config.decryption)
use_mp4decrypt = decryption_method.lower() == "mp4decrypt"
if use_mp4decrypt:
decrypt_tool = "mp4decrypt"
else:
decrypt_tool = "Shaka Packager"
drm_tracks = [track for track in title.tracks if track.drm]
if drm_tracks:
with console.status(f"Decrypting tracks with {decrypt_tool}..."):
has_decrypted = False
for track in drm_tracks:
for drm in track.drm:
if hasattr(drm, "decrypt"):
drm.decrypt(track.path, use_mp4decrypt=use_mp4decrypt)
has_decrypted = True
events.emit(events.Types.TRACK_REPACKED, track=track)
break
if has_decrypted:
self.log.info(f"Decrypted tracks with {decrypt_tool}")
# Now repack the decrypted tracks
with console.status("Repackaging tracks with FFMPEG..."):
has_repacked = False
for track in title.tracks:
@@ -907,32 +964,59 @@ class dl:
# Check if we're in hybrid mode
if any(r == Video.Range.HYBRID for r in range_) and title.tracks.videos:
# Hybrid mode: process DV and HDR10 tracks together
# Hybrid mode: process DV and HDR10 tracks separately for each resolution
self.log.info("Processing Hybrid HDR10+DV tracks...")
# Run the hybrid processing
Hybrid(title.tracks.videos, self.service)
# Group video tracks by resolution
resolutions_processed = set()
hdr10_tracks = [v for v in title.tracks.videos if v.range == Video.Range.HDR10]
dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV]
# After hybrid processing, the output file should be in temp directory
hybrid_output_path = config.directories.temp / "HDR10-DV.hevc"
for hdr10_track in hdr10_tracks:
resolution = hdr10_track.height
if resolution in resolutions_processed:
continue
resolutions_processed.add(resolution)
# Create a single mux task for the hybrid output
task_description = "Multiplexing Hybrid HDR10+DV"
task_id = progress.add_task(f"{task_description}...", total=None, start=False)
# Find matching DV track for this resolution (use the lowest DV resolution)
matching_dv = min(dv_tracks, key=lambda v: v.height) if dv_tracks else None
# Create tracks with the hybrid video output
task_tracks = Tracks(title.tracks) + title.tracks.chapters + title.tracks.attachments
if matching_dv:
# Create track pair for this resolution
resolution_tracks = [hdr10_track, matching_dv]
# Create a new video track for the hybrid output
# Use the HDR10 track as a template but update its path
hdr10_track = next((v for v in title.tracks.videos if v.range == Video.Range.HDR10), None)
if hdr10_track:
hybrid_track = deepcopy(hdr10_track)
hybrid_track.path = hybrid_output_path
hybrid_track.range = Video.Range.DV # It's now a DV track
task_tracks.videos = [hybrid_track]
for track in resolution_tracks:
track.needs_duration_fix = True
multiplex_tasks.append((task_id, task_tracks))
# Run the hybrid processing for this resolution
Hybrid(resolution_tracks, self.service)
# Create unique output filename for this resolution
hybrid_filename = f"HDR10-DV-{resolution}p.hevc"
hybrid_output_path = config.directories.temp / hybrid_filename
# The Hybrid class creates HDR10-DV.hevc, rename it for this resolution
default_output = config.directories.temp / "HDR10-DV.hevc"
if default_output.exists():
shutil.move(str(default_output), str(hybrid_output_path))
# Create a mux task for this resolution
task_description = f"Multiplexing Hybrid HDR10+DV {resolution}p"
task_id = progress.add_task(f"{task_description}...", total=None, start=False)
# Create tracks with the hybrid video output for this resolution
task_tracks = Tracks(title.tracks) + title.tracks.chapters + title.tracks.attachments
# Create a new video track for the hybrid output
hybrid_track = deepcopy(hdr10_track)
hybrid_track.path = hybrid_output_path
hybrid_track.range = Video.Range.DV # It's now a DV track
hybrid_track.needs_duration_fix = True
task_tracks.videos = [hybrid_track]
multiplex_tasks.append((task_id, task_tracks))
console.print()
else:
# Normal mode: process each video track separately
for video_track in title.tracks.videos or [None]:
@@ -951,7 +1035,7 @@ class dl:
multiplex_tasks.append((task_id, task_tracks))
with Live(Padding(progress, (0, 5, 1, 5)), console=console):
with Live(Padding(progress, (0, 5, 1, 5)), console=console, transient=True):
for task_id, task_tracks in multiplex_tasks:
progress.start_task(task_id) # TODO: Needed?
muxed_path, return_code, errors = task_tracks.mux(

View File

@@ -25,52 +25,134 @@ def env() -> None:
@env.command()
def check() -> None:
"""Checks environment for the required dependencies."""
table = Table(title="Dependencies", expand=True)
table.add_column("Name", no_wrap=True)
table.add_column("Required", justify="center")
table.add_column("Installed", justify="center")
table.add_column("Path", no_wrap=False, overflow="fold")
# Define all dependencies with their binary objects and required status
dependencies = [
{"name": "FFMpeg", "binary": binaries.FFMPEG, "required": True},
{"name": "FFProbe", "binary": binaries.FFProbe, "required": True},
{"name": "shaka-packager", "binary": binaries.ShakaPackager, "required": True},
{"name": "MKVToolNix", "binary": binaries.MKVToolNix, "required": True},
{"name": "Mkvpropedit", "binary": binaries.Mkvpropedit, "required": True},
{"name": "CCExtractor", "binary": binaries.CCExtractor, "required": False},
{"name": "FFPlay", "binary": binaries.FFPlay, "required": False},
{"name": "SubtitleEdit", "binary": binaries.SubtitleEdit, "required": False},
{"name": "Aria2(c)", "binary": binaries.Aria2, "required": False},
{"name": "HolaProxy", "binary": binaries.HolaProxy, "required": False},
{"name": "MPV", "binary": binaries.MPV, "required": False},
{"name": "Caddy", "binary": binaries.Caddy, "required": False},
{"name": "N_m3u8DL-RE", "binary": binaries.N_m3u8DL_RE, "required": False},
{"name": "dovi_tool", "binary": binaries.DoviTool, "required": False},
# Define all dependencies
all_deps = [
# Core Media Tools
{"name": "FFmpeg", "binary": binaries.FFMPEG, "required": True, "desc": "Media processing", "cat": "Core"},
{"name": "FFprobe", "binary": binaries.FFProbe, "required": True, "desc": "Media analysis", "cat": "Core"},
{"name": "MKVToolNix", "binary": binaries.MKVToolNix, "required": True, "desc": "MKV muxing", "cat": "Core"},
{
"name": "mkvpropedit",
"binary": binaries.Mkvpropedit,
"required": True,
"desc": "MKV metadata",
"cat": "Core",
},
{
"name": "shaka-packager",
"binary": binaries.ShakaPackager,
"required": True,
"desc": "DRM decryption",
"cat": "DRM",
},
{
"name": "mp4decrypt",
"binary": binaries.Mp4decrypt,
"required": False,
"desc": "DRM decryption",
"cat": "DRM",
},
# HDR Processing
{"name": "dovi_tool", "binary": binaries.DoviTool, "required": False, "desc": "Dolby Vision", "cat": "HDR"},
{
"name": "HDR10Plus_tool",
"binary": binaries.HDR10PlusTool,
"required": False,
"desc": "HDR10+ metadata",
"cat": "HDR",
},
# Downloaders
{"name": "aria2c", "binary": binaries.Aria2, "required": False, "desc": "Multi-thread DL", "cat": "Download"},
{
"name": "N_m3u8DL-RE",
"binary": binaries.N_m3u8DL_RE,
"required": False,
"desc": "HLS/DASH/ISM",
"cat": "Download",
},
# Subtitle Tools
{
"name": "SubtitleEdit",
"binary": binaries.SubtitleEdit,
"required": False,
"desc": "Sub conversion",
"cat": "Subtitle",
},
{
"name": "CCExtractor",
"binary": binaries.CCExtractor,
"required": False,
"desc": "CC extraction",
"cat": "Subtitle",
},
# Media Players
{"name": "FFplay", "binary": binaries.FFPlay, "required": False, "desc": "Simple player", "cat": "Player"},
{"name": "MPV", "binary": binaries.MPV, "required": False, "desc": "Advanced player", "cat": "Player"},
# Network Tools
{
"name": "HolaProxy",
"binary": binaries.HolaProxy,
"required": False,
"desc": "Proxy service",
"cat": "Network",
},
{"name": "Caddy", "binary": binaries.Caddy, "required": False, "desc": "Web server", "cat": "Network"},
]
for dep in dependencies:
# Track overall status
all_required_installed = True
total_installed = 0
total_required = 0
missing_required = []
# Create a single table
table = Table(
title="Environment Dependencies", title_style="bold", show_header=True, header_style="bold", expand=False
)
table.add_column("Category", style="bold cyan", width=10)
table.add_column("Tool", width=16)
table.add_column("Status", justify="center", width=10)
table.add_column("Req", justify="center", width=4)
table.add_column("Purpose", style="bright_black", width=20)
last_cat = None
for dep in all_deps:
path = dep["binary"]
# Required column
if dep["required"]:
required = "[red]Yes[/red]"
else:
required = "No"
# Category column (only show when it changes)
category = dep["cat"] if dep["cat"] != last_cat else ""
last_cat = dep["cat"]
# Installed column
# Status
if path:
installed = "[green]:heavy_check_mark:[/green]"
path_output = str(path)
status = "[green][/green]"
total_installed += 1
else:
installed = "[red]:x:[/red]"
path_output = "Not Found"
status = "[red][/red]"
if dep["required"]:
all_required_installed = False
missing_required.append(dep["name"])
# Add to the table
table.add_row(dep["name"], required, installed, path_output)
if dep["required"]:
total_required += 1
# Display the result
console.print(Padding(table, (1, 5)))
# Required column (compact)
req = "[red]Y[/red]" if dep["required"] else "[bright_black]-[/bright_black]"
# Add row
table.add_row(category, dep["name"], status, req, dep["desc"])
console.print(Padding(table, (1, 2)))
# Compact summary
summary_parts = [f"[bold]Total:[/bold] {total_installed}/{len(all_deps)}"]
if all_required_installed:
summary_parts.append("[green]All required tools installed ✓[/green]")
else:
summary_parts.append(f"[red]Missing required: {', '.join(missing_required)}[/red]")
console.print(Padding(" ".join(summary_parts), (1, 2)))
@env.command()
@@ -86,7 +168,7 @@ def info() -> None:
tree.add(f"[repr.number]{i}.[/] [text2]{path.resolve()}[/]")
console.print(Padding(tree, (0, 5)))
table = Table(title="Directories", expand=True)
table = Table(title="Directories", title_style="bold", expand=True)
table.add_column("Name", no_wrap=True)
table.add_column("Path", no_wrap=False, overflow="fold")

View File

@@ -46,7 +46,8 @@ def copy(to_vault: str, from_vaults: list[str], service: Optional[str] = None) -
vault_type = vault["type"]
vault_args = vault.copy()
del vault_args["type"]
vaults.load(vault_type, **vault_args)
if not vaults.load(vault_type, **vault_args):
raise click.ClickException(f"Failed to load vault ({vault_name}).")
to_vault: Vault = vaults.vaults[0]
from_vaults: list[Vault] = vaults.vaults[1:]

View File

@@ -1 +1 @@
__version__ = "1.1.0"
__version__ = "1.3.0"

View File

@@ -15,6 +15,7 @@ from unshackle.core.commands import Commands
from unshackle.core.config import config
from unshackle.core.console import ComfyRichHandler, console
from unshackle.core.constants import context_settings
from unshackle.core.update_checker import UpdateChecker
from unshackle.core.utilities import rotate_log_file
LOGGING_PATH = None
@@ -79,6 +80,22 @@ def main(version: bool, debug: bool, log_path: Path) -> None:
if version:
return
if config.update_checks:
try:
latest_version = UpdateChecker.check_for_updates_sync(__version__)
if latest_version:
console.print(
f"\n[yellow]⚠️ Update available![/yellow] "
f"Current: {__version__} → Latest: [green]{latest_version}[/green]",
justify="center",
)
console.print(
"Visit: https://github.com/unshackle-dl/unshackle/releases/latest\n",
justify="center",
)
except Exception:
pass
@atexit.register
def save_log():

View File

@@ -52,6 +52,8 @@ N_m3u8DL_RE = find("N_m3u8DL-RE", "n-m3u8dl-re")
MKVToolNix = find("mkvmerge")
Mkvpropedit = find("mkvpropedit")
DoviTool = find("dovi_tool")
HDR10PlusTool = find("hdr10plus_tool", "HDR10Plus_tool")
Mp4decrypt = find("mp4decrypt")
__all__ = (
@@ -69,5 +71,7 @@ __all__ = (
"MKVToolNix",
"Mkvpropedit",
"DoviTool",
"HDR10PlusTool",
"Mp4decrypt",
"find",
)

View File

@@ -75,9 +75,20 @@ class Config:
self.proxy_providers: dict = kwargs.get("proxy_providers") or {}
self.serve: dict = kwargs.get("serve") or {}
self.services: dict = kwargs.get("services") or {}
decryption_cfg = kwargs.get("decryption") or {}
if isinstance(decryption_cfg, dict):
self.decryption_map = {k.upper(): v for k, v in decryption_cfg.items()}
self.decryption = self.decryption_map.get("DEFAULT", "shaka")
else:
self.decryption_map = {}
self.decryption = decryption_cfg or "shaka"
self.set_terminal_bg: bool = kwargs.get("set_terminal_bg", False)
self.tag: str = kwargs.get("tag") or ""
self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or ""
self.update_checks: bool = kwargs.get("update_checks", True)
self.update_check_interval: int = kwargs.get("update_check_interval", 24)
self.scene_naming: bool = kwargs.get("scene_naming", True)
@classmethod
def from_yaml(cls, path: Path) -> Config:

View File

@@ -1,4 +1,7 @@
import atexit
import logging
import signal
import sys
from datetime import datetime
from types import ModuleType
from typing import IO, Callable, Iterable, List, Literal, Mapping, Optional, Union
@@ -167,6 +170,8 @@ class ComfyConsole(Console):
time.monotonic.
"""
_cleanup_registered = False
def __init__(
self,
*,
@@ -233,6 +238,9 @@ class ComfyConsole(Console):
if log_renderer:
self._log_render = log_renderer
# Register terminal cleanup handlers
self._register_cleanup()
def status(
self,
status: RenderableType,
@@ -283,6 +291,38 @@ class ComfyConsole(Console):
return status_renderable
def _register_cleanup(self):
"""Register terminal cleanup handlers."""
if not ComfyConsole._cleanup_registered:
ComfyConsole._cleanup_registered = True
# Register cleanup on normal exit
atexit.register(self._cleanup_terminal)
# Register cleanup on signals
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _cleanup_terminal(self):
"""Restore terminal to a clean state."""
try:
# Show cursor using ANSI escape codes
sys.stdout.write("\x1b[?25h") # Show cursor
sys.stdout.write("\x1b[0m") # Reset attributes
sys.stdout.flush()
# Also use Rich's method
self.show_cursor(True)
except Exception:
# Silently fail if cleanup fails
pass
def _signal_handler(self, signum, frame):
"""Handle signals with cleanup."""
self._cleanup_terminal()
# Exit after cleanup
sys.exit(1)
catppuccin_mocha = {
# Colors based on "CatppuccinMocha" from Gogh themes

View File

@@ -187,14 +187,69 @@ class PlayReady:
if not self.content_keys:
raise PlayReady.Exceptions.EmptyLicense("No Content Keys were within the License")
def decrypt(self, path: Path) -> None:
def decrypt(self, path: Path, use_mp4decrypt: bool = False) -> None:
"""
Decrypt a Track with PlayReady DRM.
Args:
path: Path to the encrypted file to decrypt
use_mp4decrypt: If True, use mp4decrypt instead of Shaka Packager
Raises:
EnvironmentError if the required decryption executable could not be found.
ValueError if the track has not yet been downloaded.
SubprocessError if the decryption process returned a non-zero exit code.
"""
if not self.content_keys:
raise ValueError("Cannot decrypt a Track without any Content Keys...")
if not binaries.ShakaPackager:
raise EnvironmentError("Shaka Packager executable not found but is required.")
if not path or not path.exists():
raise ValueError("Tried to decrypt a file that does not exist.")
if use_mp4decrypt:
return self._decrypt_with_mp4decrypt(path)
else:
return self._decrypt_with_shaka_packager(path)
def _decrypt_with_mp4decrypt(self, path: Path) -> None:
"""Decrypt using mp4decrypt"""
if not binaries.Mp4decrypt:
raise EnvironmentError("mp4decrypt executable not found but is required.")
output_path = path.with_stem(f"{path.stem}_decrypted")
# Build key arguments
key_args = []
for kid, key in self.content_keys.items():
kid_hex = kid.hex if hasattr(kid, "hex") else str(kid).replace("-", "")
key_hex = key if isinstance(key, str) else key.hex()
key_args.extend(["--key", f"{kid_hex}:{key_hex}"])
cmd = [
str(binaries.Mp4decrypt),
"--show-progress",
*key_args,
str(path),
str(output_path),
]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else f"mp4decrypt failed with exit code {e.returncode}"
raise subprocess.CalledProcessError(e.returncode, cmd, output=e.stdout, stderr=error_msg)
if not output_path.exists():
raise RuntimeError(f"mp4decrypt failed: output file {output_path} was not created")
if output_path.stat().st_size == 0:
raise RuntimeError(f"mp4decrypt failed: output file {output_path} is empty")
path.unlink()
shutil.move(output_path, path)
def _decrypt_with_shaka_packager(self, path: Path) -> None:
"""Decrypt using Shaka Packager (original method)"""
if not binaries.ShakaPackager:
raise EnvironmentError("Shaka Packager executable not found but is required.")
output_path = path.with_stem(f"{path.stem}_decrypted")
config.directories.temp.mkdir(parents=True, exist_ok=True)

View File

@@ -227,22 +227,69 @@ class Widevine:
finally:
cdm.close(session_id)
def decrypt(self, path: Path) -> None:
def decrypt(self, path: Path, use_mp4decrypt: bool = False) -> None:
"""
Decrypt a Track with Widevine DRM.
Args:
path: Path to the encrypted file to decrypt
use_mp4decrypt: If True, use mp4decrypt instead of Shaka Packager
Raises:
EnvironmentError if the Shaka Packager executable could not be found.
EnvironmentError if the required decryption executable could not be found.
ValueError if the track has not yet been downloaded.
SubprocessError if Shaka Packager returned a non-zero exit code.
SubprocessError if the decryption process returned a non-zero exit code.
"""
if not self.content_keys:
raise ValueError("Cannot decrypt a Track without any Content Keys...")
if not binaries.ShakaPackager:
raise EnvironmentError("Shaka Packager executable not found but is required.")
if not path or not path.exists():
raise ValueError("Tried to decrypt a file that does not exist.")
if use_mp4decrypt:
return self._decrypt_with_mp4decrypt(path)
else:
return self._decrypt_with_shaka_packager(path)
def _decrypt_with_mp4decrypt(self, path: Path) -> None:
"""Decrypt using mp4decrypt"""
if not binaries.Mp4decrypt:
raise EnvironmentError("mp4decrypt executable not found but is required.")
output_path = path.with_stem(f"{path.stem}_decrypted")
# Build key arguments
key_args = []
for kid, key in self.content_keys.items():
kid_hex = kid.hex if hasattr(kid, "hex") else str(kid).replace("-", "")
key_hex = key if isinstance(key, str) else key.hex()
key_args.extend(["--key", f"{kid_hex}:{key_hex}"])
cmd = [
str(binaries.Mp4decrypt),
"--show-progress",
*key_args,
str(path),
str(output_path),
]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else f"mp4decrypt failed with exit code {e.returncode}"
raise subprocess.CalledProcessError(e.returncode, cmd, output=e.stdout, stderr=error_msg)
if not output_path.exists():
raise RuntimeError(f"mp4decrypt failed: output file {output_path} was not created")
if output_path.stat().st_size == 0:
raise RuntimeError(f"mp4decrypt failed: output file {output_path} is empty")
path.unlink()
shutil.move(output_path, path)
def _decrypt_with_shaka_packager(self, path: Path) -> None:
"""Decrypt using Shaka Packager (original method)"""
if not binaries.ShakaPackager:
raise EnvironmentError("Shaka Packager executable not found but is required.")
output_path = path.with_stem(f"{path.stem}_decrypted")
config.directories.temp.mkdir(parents=True, exist_ok=True)

View File

@@ -107,75 +107,86 @@ class Episode(Title):
name=self.name or "",
).strip()
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# 'WEB-DL'
name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
if (primary_video_track.hdr_format_commercial) != "Dolby Vision":
name += f" DV {DYNAMIC_RANGE_MAP.get(hdr_format)} "
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
if config.tag:
name += f"-{config.tag}"
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
return sanitize_filename(name)
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
if (primary_video_track.hdr_format_commercial) != "Dolby Vision":
name += f" DV {DYNAMIC_RANGE_MAP.get(hdr_format)} "
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
class Series(SortedKeyList, ABC):

View File

@@ -58,75 +58,86 @@ class Movie(Title):
# Name (Year)
name = str(self).replace("$", "S") # e.g., Arli$$
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
if config.scene_naming:
# Resolution
if primary_video_track:
resolution = primary_video_track.height
aspect_ratio = [
int(float(plane)) for plane in primary_video_track.other_display_aspect_ratio[0].split(":")
]
if len(aspect_ratio) == 1:
# e.g., aspect ratio of 2 (2.00:1) would end up as `(2.0,)`, add 1
aspect_ratio.append(1)
if aspect_ratio[0] / aspect_ratio[1] not in (16 / 9, 4 / 3):
# We want the resolution represented in a 4:3 or 16:9 canvas.
# If it's not 4:3 or 16:9, calculate as if it's inside a 16:9 canvas,
# otherwise the track's height value is fine.
# We are assuming this title is some weird aspect ratio so most
# likely a movie or HD source, so it's most likely widescreen so
# 16:9 canvas makes the most sense.
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# 'WEB-DL'
name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" ")))
else:
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = primary_video_track.transfer_characteristics or primary_video_track.transfer_characteristics_original
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
if (primary_video_track.hdr_format_commercial) != "Dolby Vision":
name += f" DV {DYNAMIC_RANGE_MAP.get(hdr_format)} "
# Audio Codec + Channels (+ feature)
if primary_audio_track:
codec = primary_audio_track.format
channel_layout = primary_audio_track.channel_layout or primary_audio_track.channellayout_original
if channel_layout:
channels = float(
sum({"LFE": 0.1}.get(position.upper(), 1) for position in channel_layout.split(" "))
)
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
channel_count = primary_audio_track.channel_s or primary_audio_track.channels or 0
channels = float(channel_count)
if config.tag:
name += f"-{config.tag}"
features = primary_audio_track.format_additionalfeatures or ""
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or primary_audio_track.joc:
name += " Atmos"
return sanitize_filename(name)
# Video (dynamic range + hfr +) Codec
if primary_video_track:
codec = primary_video_track.format
hdr_format = primary_video_track.hdr_format_commercial
trc = (
primary_video_track.transfer_characteristics
or primary_video_track.transfer_characteristics_original
)
frame_rate = float(primary_video_track.frame_rate)
if hdr_format:
if (primary_video_track.hdr_format or "").startswith("Dolby Vision"):
if (primary_video_track.hdr_format_commercial) != "Dolby Vision":
name += f" DV {DYNAMIC_RANGE_MAP.get(hdr_format)} "
else:
name += f" {DYNAMIC_RANGE_MAP.get(hdr_format)} "
elif trc and "HLG" in trc:
name += " HLG"
if frame_rate > 30:
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name)
else:
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
class Movies(SortedKeyList, ABC):

View File

@@ -100,22 +100,26 @@ class Song(Title):
# NN. Song Name
name = str(self).split(" / ")[1]
# Service
if show_service:
name += f" {self.service.__name__}"
if config.scene_naming:
# Service
if show_service:
name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# 'WEB-DL'
name += " WEB-DL"
# Audio Codec + Channels (+ feature)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or audio_track.joc:
name += " Atmos"
# Audio Codec + Channels (+ feature)
name += f" {AUDIO_CODEC_MAP.get(codec, codec)}{channels:.1f}"
if "JOC" in features or audio_track.joc:
name += " Atmos"
if config.tag:
name += f"-{config.tag}"
if config.tag:
name += f"-{config.tag}"
return sanitize_filename(name, " ")
return sanitize_filename(name, " ")
else:
# Simple naming style without technical details
return sanitize_filename(name, " ")
class Album(SortedKeyList, ABC):

View File

@@ -62,6 +62,7 @@ class Attachment:
session = session or requests.Session()
response = session.get(url, stream=True)
response.raise_for_status()
config.directories.temp.mkdir(parents=True, exist_ok=True)
download_path.parent.mkdir(parents=True, exist_ok=True)
with open(download_path, "wb") as f:

View File

@@ -8,7 +8,7 @@ from pathlib import Path
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import DoviTool
from unshackle.core.binaries import DoviTool, HDR10PlusTool
from unshackle.core.config import config
from unshackle.core.console import console
@@ -20,6 +20,7 @@ class Hybrid:
"""
Takes the Dolby Vision and HDR10(+) streams out of the VideoTracks.
It will then attempt to inject the Dolby Vision metadata layer to the HDR10(+) stream.
If no DV track is available but HDR10+ is present, it will convert HDR10+ to DV.
"""
global directories
from unshackle.core.tracks import Video
@@ -29,17 +30,35 @@ class Hybrid:
self.rpu_file = "RPU.bin"
self.hdr_type = "HDR10"
self.hevc_file = f"{self.hdr_type}-DV.hevc"
self.hdr10plus_to_dv = False
self.hdr10plus_file = "HDR10Plus.json"
console.print(Padding(Rule("[rule.text]HDR10+DV Hybrid"), (1, 2)))
# Get resolution info from HDR10 track for display
hdr10_track = next((v for v in videos if v.range == Video.Range.HDR10), None)
hdr10p_track = next((v for v in videos if v.range == Video.Range.HDR10P), None)
track_for_res = hdr10_track or hdr10p_track
self.resolution = f"{track_for_res.height}p" if track_for_res and track_for_res.height else "Unknown"
console.print(Padding(Rule(f"[rule.text]HDR10+DV Hybrid ({self.resolution})"), (1, 2)))
for video in self.videos:
if not video.path or not os.path.exists(video.path):
self.log.exit(f" - Video track {video.id} was not downloaded before injection.")
raise ValueError(f"Video track {video.id} was not downloaded before injection.")
if not any(video.range == Video.Range.DV for video in self.videos) or not any(
video.range == Video.Range.HDR10 for video in self.videos
):
self.log.exit(" - Two VideoTracks available but one of them is not DV nor HDR10(+).")
# Check if we have DV track available
has_dv = any(video.range == Video.Range.DV for video in self.videos)
has_hdr10 = any(video.range == Video.Range.HDR10 for video in self.videos)
has_hdr10p = any(video.range == Video.Range.HDR10P for video in self.videos)
if not has_hdr10:
raise ValueError("No HDR10 track available for hybrid processing.")
# If we have HDR10+ but no DV, we can convert HDR10+ to DV
if not has_dv and has_hdr10p:
self.log.info("✓ No DV track found, but HDR10+ is available. Will convert HDR10+ to DV.")
self.hdr10plus_to_dv = True
elif not has_dv:
raise ValueError("No DV track available and no HDR10+ to convert.")
if os.path.isfile(config.directories.temp / self.hevc_file):
self.log.info("✓ Already Injected")
@@ -49,22 +68,32 @@ class Hybrid:
# Use the actual path from the video track
save_path = video.path
if not save_path or not os.path.exists(save_path):
self.log.exit(f" - Video track {video.id} was not downloaded or path not found: {save_path}")
raise ValueError(f"Video track {video.id} was not downloaded or path not found: {save_path}")
if video.range == Video.Range.HDR10:
self.extract_stream(save_path, "HDR10")
elif video.range == Video.Range.HDR10P:
self.extract_stream(save_path, "HDR10")
self.hdr_type = "HDR10+"
elif video.range == Video.Range.DV:
self.extract_stream(save_path, "DV")
# self.extract_dv_stream(video, save_path)
self.extract_rpu([video for video in videos if video.range == Video.Range.DV][0])
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
self.rpu_file = "RPU_UNT.bin"
self.level_6()
# Mode 3 conversion already done during extraction when not untouched
elif os.path.isfile(config.directories.temp / "RPU.bin"):
# RPU already extracted with mode 3
pass
if self.hdr10plus_to_dv:
# Extract HDR10+ metadata and convert to DV
hdr10p_video = next(v for v in videos if v.range == Video.Range.HDR10P)
self.extract_hdr10plus(hdr10p_video)
self.convert_hdr10plus_to_dv()
else:
# Regular DV extraction
dv_video = next(v for v in videos if v.range == Video.Range.DV)
self.extract_rpu(dv_video)
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
self.rpu_file = "RPU_UNT.bin"
self.level_6()
# Mode 3 conversion already done during extraction when not untouched
elif os.path.isfile(config.directories.temp / "RPU.bin"):
# RPU already extracted with mode 3
pass
self.injecting()
@@ -72,9 +101,9 @@ class Hybrid:
if self.source == ("itunes" or "appletvplus"):
Path.unlink(config.directories.temp / "hdr10.mkv")
Path.unlink(config.directories.temp / "dv.mkv")
Path.unlink(config.directories.temp / "DV.hevc")
Path.unlink(config.directories.temp / "HDR10.hevc")
Path.unlink(config.directories.temp / f"{self.rpu_file}")
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
def ffmpeg_simple(self, save_path, output):
"""Simple ffmpeg execution without progress tracking"""
@@ -106,142 +135,6 @@ class Hybrid:
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
def ffmpeg_task(self, save_path, output, task_id):
p = subprocess.Popen(
[
"ffmpeg",
"-nostdin",
"-i",
str(save_path),
"-c:v",
"copy",
str(output),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)
self.progress.start_task(task_id)
for line in p.stderr:
if "frame=" in line:
self.progress.update(task_id, advance=0)
p.wait()
return p.returncode
def extract_hdr10_stream(self, video, save_path):
type_ = "HDR10"
if os.path.isfile(Path(config.directories.temp / f"{type_}.hevc")):
return
if self.source == "itunes" or self.source == "appletvplus":
self.log.info("+ Muxing HDR10 stream for fixing MP4 file")
subprocess.run(
[
"mkvmerge",
"-o",
Path(config.directories.temp / "hdr10.mkv"),
save_path,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self.log.info(f"+ Extracting {type_} stream")
extract_stream = subprocess.run(
[
"ffmpeg",
"-nostdin",
"-stats",
"-i",
Path(config.directories.temp / "hdr10.mkv"),
"-c:v",
"copy",
Path(config.directories.temp / f"{type_}.hevc"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if extract_stream.returncode:
Path.unlink(Path(config.directories.temp / f"{type_}.hevc"))
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
else:
extract_stream = subprocess.run(
[
"ffmpeg",
"-nostdin",
"-stats",
"-i",
save_path,
"-c:v",
"copy",
Path(config.directories.temp / f"{type_}.hevc"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if extract_stream.returncode:
Path.unlink(Path(config.directories.temp / f"{type_}.hevc"))
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
def extract_dv_stream(self, video, save_path):
type_ = "DV"
if os.path.isfile(Path(config.directories.temp / f"{type_}.hevc")):
return
if self.source == "itunes" or self.source == "appletvplus":
self.log.info("+ Muxing Dolby Vision stream for fixing MP4 file")
subprocess.run(
[
"mkvmerge",
"-o",
Path(config.directories.temp / "dv.mkv"),
save_path,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self.log.info("+ Extracting Dolby Vision stream")
extract_stream = subprocess.run(
[
"ffmpeg",
"-nostdin",
"-stats",
"-i",
Path(config.directories.temp / "dv.mkv"),
"-an",
"-c:v",
"copy",
"-f",
"hevc",
Path(config.directories.temp / "out_1.h265"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if extract_stream.returncode:
Path.unlink(Path(config.directories.temp / f"{type_}.hevc"))
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
else:
extract_stream = subprocess.run(
[
"mp4demuxer",
"--input-file",
save_path,
"--output-folder",
Path(config.directories.temp),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if extract_stream.returncode:
Path.unlink(Path(config.directories.temp / f"{type_}.hevc"))
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
def extract_rpu(self, video, untouched=False):
if os.path.isfile(config.directories.temp / "RPU.bin") or os.path.isfile(
config.directories.temp / "RPU_UNT.bin"
@@ -271,9 +164,9 @@ class Hybrid:
if b"MAX_PQ_LUMINANCE" in rpu_extraction.stderr:
self.extract_rpu(video, untouched=True)
elif b"Invalid PPS index" in rpu_extraction.stderr:
self.log.exit("x Dolby Vision VideoTrack seems to be corrupt")
raise ValueError("Dolby Vision VideoTrack seems to be corrupt")
else:
self.log.exit(f"x Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
def level_6(self):
"""Edit RPU Level 6 values"""
@@ -310,60 +203,117 @@ class Hybrid:
if level6.returncode:
Path.unlink(config.directories.temp / "RPU_L6.bin")
self.log.exit("x Failed editing RPU Level 6 values")
raise ValueError("Failed editing RPU Level 6 values")
# Update rpu_file to use the edited version
self.rpu_file = "RPU_L6.bin"
def mode_3(self):
"""Convert RPU to Mode 3"""
with open(config.directories.temp / "M3.json", "w+") as mode3_file:
json.dump({"mode": 3}, mode3_file, indent=3)
if not os.path.isfile(config.directories.temp / "RPU_M3.bin"):
self.log.info("+ Converting RPU to Mode 3")
mode3 = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
config.directories.temp / self.rpu_file,
"-j",
config.directories.temp / "M3.json",
"-o",
config.directories.temp / "RPU_M3.bin",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if mode3.returncode:
Path.unlink(config.directories.temp / "RPU_M3.bin")
self.log.exit("x Failed converting RPU to Mode 3")
self.rpu_file = "RPU_M3.bin"
def injecting(self):
if os.path.isfile(config.directories.temp / self.hevc_file):
return
self.log.info(f"+ Injecting Dolby Vision metadata into {self.hdr_type} stream")
inject_cmd = [
str(DoviTool),
"inject-rpu",
"-i",
config.directories.temp / "HDR10.hevc",
"--rpu-in",
config.directories.temp / self.rpu_file,
]
# If we converted from HDR10+, optionally remove HDR10+ metadata during injection
# Default to removing HDR10+ metadata since we're converting to DV
if self.hdr10plus_to_dv:
inject_cmd.append("--drop-hdr10plus")
self.log.info(" - Removing HDR10+ metadata during injection")
inject_cmd.extend(["-o", config.directories.temp / self.hevc_file])
inject = subprocess.run(
[
str(DoviTool),
"inject-rpu",
"-i",
config.directories.temp / f"{self.hdr_type}.hevc",
"--rpu-in",
config.directories.temp / self.rpu_file,
"-o",
config.directories.temp / self.hevc_file,
],
inject_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if inject.returncode:
Path.unlink(config.directories.temp / self.hevc_file)
self.log.exit("x Failed injecting Dolby Vision metadata into HDR10 stream")
raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream")
def extract_hdr10plus(self, _video):
"""Extract HDR10+ metadata from the video stream"""
if os.path.isfile(config.directories.temp / self.hdr10plus_file):
return
if not HDR10PlusTool:
raise ValueError("HDR10Plus_tool not found. Please install it to use HDR10+ to DV conversion.")
self.log.info("+ Extracting HDR10+ metadata")
# HDR10Plus_tool needs raw HEVC stream
extraction = subprocess.run(
[
str(HDR10PlusTool),
"extract",
str(config.directories.temp / "HDR10.hevc"),
"-o",
str(config.directories.temp / self.hdr10plus_file),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if extraction.returncode:
raise ValueError("Failed extracting HDR10+ metadata")
# Check if the extracted file has content
if os.path.getsize(config.directories.temp / self.hdr10plus_file) == 0:
raise ValueError("No HDR10+ metadata found in the stream")
def convert_hdr10plus_to_dv(self):
"""Convert HDR10+ metadata to Dolby Vision RPU"""
if os.path.isfile(config.directories.temp / "RPU.bin"):
return
self.log.info("+ Converting HDR10+ metadata to Dolby Vision")
# First create the extra metadata JSON for dovi_tool
extra_metadata = {
"cm_version": "V29",
"length": 0, # dovi_tool will figure this out
"level6": {
"max_display_mastering_luminance": 1000,
"min_display_mastering_luminance": 1,
"max_content_light_level": 0,
"max_frame_average_light_level": 0,
},
}
with open(config.directories.temp / "extra.json", "w") as f:
json.dump(extra_metadata, f, indent=2)
# Generate DV RPU from HDR10+ metadata
conversion = subprocess.run(
[
str(DoviTool),
"generate",
"-j",
str(config.directories.temp / "extra.json"),
"--hdr10plus-json",
str(config.directories.temp / self.hdr10plus_file),
"-o",
str(config.directories.temp / "RPU.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if conversion.returncode:
raise ValueError("Failed converting HDR10+ to Dolby Vision")
self.log.info("✓ HDR10+ successfully converted to Dolby Vision Profile 8")
# Clean up temporary files
Path.unlink(config.directories.temp / "extra.json")
Path.unlink(config.directories.temp / self.hdr10plus_file)

View File

@@ -331,21 +331,31 @@ class Tracks:
if not vt.path or not vt.path.exists():
raise ValueError("Video Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=vt)
cl.extend(
[
"--language",
f"0:{vt.language}",
"--default-track",
f"0:{i == 0}",
"--original-flag",
f"0:{vt.is_original_lang}",
"--compression",
"0:none", # disable extra compression
"(",
str(vt.path),
")",
]
)
# Prepare base arguments
video_args = [
"--language",
f"0:{vt.language}",
"--default-track",
f"0:{i == 0}",
"--original-flag",
f"0:{vt.is_original_lang}",
"--compression",
"0:none", # disable extra compression
]
# Add FPS fix if needed (typically for hybrid mode to prevent sync issues)
if hasattr(vt, "needs_duration_fix") and vt.needs_duration_fix and vt.fps:
video_args.extend(
[
"--default-duration",
f"0:{vt.fps}fps" if isinstance(vt.fps, str) else f"0:{vt.fps:.3f}fps",
"--fix-bitstream-timing-information",
"0:1",
]
)
cl.extend(video_args + ["(", str(vt.path), ")"])
for i, at in enumerate(self.audio):
if not at.path or not at.path.exists():

View File

@@ -116,6 +116,7 @@ class Video(Track):
class Transfer(Enum):
Unspecified = 0
BT_709 = 1
Unspecified_Image = 2
BT_601 = 6
BT_2020 = 14
BT_2100 = 15
@@ -237,6 +238,8 @@ class Video(Track):
except Exception as e:
raise ValueError("Expected fps to be a number, float, or a string as numerator/denominator form, " + str(e))
self.needs_duration_fix = False
def __str__(self) -> str:
return " | ".join(
filter(

View File

@@ -0,0 +1,188 @@
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from typing import Optional
import requests
class UpdateChecker:
"""Check for available updates from the GitHub repository."""
REPO_URL = "https://api.github.com/repos/unshackle-dl/unshackle/releases/latest"
TIMEOUT = 5
DEFAULT_CHECK_INTERVAL = 24 * 60 * 60 # 24 hours in seconds
@classmethod
def _get_cache_file(cls) -> Path:
"""Get the path to the update check cache file."""
from unshackle.core.config import config
return config.directories.cache / "update_check.json"
@classmethod
def _should_check_for_updates(cls, check_interval: int = DEFAULT_CHECK_INTERVAL) -> bool:
"""
Check if enough time has passed since the last update check.
Args:
check_interval: Time in seconds between checks (default: 24 hours)
Returns:
True if we should check for updates, False otherwise
"""
cache_file = cls._get_cache_file()
if not cache_file.exists():
return True
try:
with open(cache_file, "r") as f:
cache_data = json.load(f)
last_check = cache_data.get("last_check", 0)
current_time = time.time()
return (current_time - last_check) >= check_interval
except (json.JSONDecodeError, KeyError, OSError):
# If cache is corrupted or unreadable, allow check
return True
@classmethod
def _update_cache(cls, latest_version: Optional[str] = None) -> None:
"""
Update the cache file with the current timestamp and latest version.
Args:
latest_version: The latest version found, if any
"""
cache_file = cls._get_cache_file()
try:
# Ensure cache directory exists
cache_file.parent.mkdir(parents=True, exist_ok=True)
cache_data = {"last_check": time.time(), "latest_version": latest_version}
with open(cache_file, "w") as f:
json.dump(cache_data, f)
except (OSError, json.JSONEncodeError):
# Silently fail if we can't write cache
pass
@staticmethod
def _compare_versions(current: str, latest: str) -> bool:
"""
Simple semantic version comparison.
Args:
current: Current version string (e.g., "1.1.0")
latest: Latest version string (e.g., "1.2.0")
Returns:
True if latest > current, False otherwise
"""
try:
current_parts = [int(x) for x in current.split(".")]
latest_parts = [int(x) for x in latest.split(".")]
max_length = max(len(current_parts), len(latest_parts))
current_parts.extend([0] * (max_length - len(current_parts)))
latest_parts.extend([0] * (max_length - len(latest_parts)))
for current_part, latest_part in zip(current_parts, latest_parts):
if latest_part > current_part:
return True
elif latest_part < current_part:
return False
return False
except (ValueError, AttributeError):
return False
@classmethod
async def check_for_updates(cls, current_version: str) -> Optional[str]:
"""
Check if there's a newer version available on GitHub.
Args:
current_version: The current version string (e.g., "1.1.0")
Returns:
The latest version string if an update is available, None otherwise
"""
try:
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, lambda: requests.get(cls.REPO_URL, timeout=cls.TIMEOUT))
if response.status_code != 200:
return None
data = response.json()
latest_version = data.get("tag_name", "").lstrip("v")
if not latest_version:
return None
if cls._compare_versions(current_version, latest_version):
return latest_version
except Exception:
pass
return None
@classmethod
def check_for_updates_sync(cls, current_version: str, check_interval: Optional[int] = None) -> Optional[str]:
"""
Synchronous version of update check with rate limiting.
Args:
current_version: The current version string (e.g., "1.1.0")
check_interval: Time in seconds between checks (default: from config)
Returns:
The latest version string if an update is available, None otherwise
"""
# Use config value if not specified
if check_interval is None:
from unshackle.core.config import config
check_interval = config.update_check_interval * 60 * 60 # Convert hours to seconds
# Check if we should skip this check due to rate limiting
if not cls._should_check_for_updates(check_interval):
return None
try:
response = requests.get(cls.REPO_URL, timeout=cls.TIMEOUT)
if response.status_code != 200:
# Update cache even on failure to prevent rapid retries
cls._update_cache()
return None
data = response.json()
latest_version = data.get("tag_name", "").lstrip("v")
if not latest_version:
cls._update_cache()
return None
# Update cache with the latest version info
cls._update_cache(latest_version)
if cls._compare_versions(current_version, latest_version):
return latest_version
except Exception:
# Update cache even on exception to prevent rapid retries
cls._update_cache()
pass
return None

View File

@@ -25,8 +25,20 @@ class Vaults:
def __len__(self) -> int:
return len(self.vaults)
def load(self, type_: str, **kwargs: Any) -> None:
"""Load a Vault into the vaults list."""
def load(self, type_: str, **kwargs: Any) -> bool:
"""Load a Vault into the vaults list. Returns True if successful, False otherwise."""
module = _MODULES.get(type_)
if not module:
raise ValueError(f"Unable to find vault command by the name '{type_}'.")
try:
vault = module(**kwargs)
self.vaults.append(vault)
return True
except Exception:
return False
def load_critical(self, type_: str, **kwargs: Any) -> None:
"""Load a critical Vault that must succeed or raise an exception."""
module = _MODULES.get(type_)
if not module:
raise ValueError(f"Unable to find vault command by the name '{type_}'.")

View File

@@ -4,14 +4,40 @@ tag: user_tag
# Set terminal background color (custom option not in CONFIG.md)
set_terminal_bg: false
# Set file naming convention
# true for style - Prime.Suspect.S07E01.The.Final.Act.Part.One.1080p.ITV.WEB-DL.AAC2.0.H.264
# false for style - Prime Suspect S07E01 The Final Act - Part One
scene_naming: true
# Check for updates from GitHub repository on startup (default: true)
update_checks: true
# How often to check for updates, in hours (default: 24)
update_check_interval: 24
# Muxing configuration
muxing:
set_title: false
# Login credentials for each Service
credentials:
# Direct credentials (no profile support)
EXAMPLE: email@example.com:password
EXAMPLE2: username:password
# Per-profile credentials with default fallback
SERVICE_NAME:
default: default@email.com:password # Used when no -p/--profile is specified
profile1: user1@email.com:password1
profile2: user2@email.com:password2
# Per-profile credentials without default (requires -p/--profile)
SERVICE_NAME2:
john: john@example.com:johnspassword
jane: jane@example.com:janespassword
# You can also use list format for passwords with special characters
SERVICE_NAME3:
default: ["user@email.com", ":PasswordWith:Colons"]
# Override default directories used across unshackle
directories:
@@ -33,8 +59,17 @@ directories:
# Pre-define which Widevine or PlayReady device to use for each Service
cdm:
# Global default CDM device (fallback for all services/profiles)
default: WVD_1
EXAMPLE: PRD_1
# Direct service-specific CDM
DIFFERENT_EXAMPLE: PRD_1
# Per-profile CDM configuration
EXAMPLE:
john_sd: chromecdm_903_l3 # Profile 'john_sd' uses Chrome CDM L3
jane_uhd: nexus_5_l1 # Profile 'jane_uhd' uses Nexus 5 L1
default: generic_android_l3 # Default CDM for this service
# Use pywidevine Serve-compliant Remote CDMs
remote_cdm:
@@ -129,6 +164,15 @@ filenames:
# API key for The Movie Database (TMDB)
tmdb_api_key: ""
# conversion_method:
# - auto (default): Smart routing - subby for WebVTT/SAMI, standard for others
# - subby: Always use subby with advanced processing
# - pycaption: Use only pycaption library (no SubtitleEdit, no subby)
# - subtitleedit: Prefer SubtitleEdit when available, fall back to pycaption
subtitle:
conversion_method: auto
sdh_method: auto
# Configuration for pywidevine's serve functionality
serve:
users:
@@ -142,20 +186,45 @@ serve:
# Configuration data for each Service
services:
# Service-specific configuration goes here
# EXAMPLE:
# api_key: "service_specific_key"
# Profile-specific configurations can be nested under service names
# Example: with profile-specific device configs
EXAMPLE:
# Global service config
api_key: "service_api_key"
# Profile-specific device configurations
profiles:
john_sd:
device:
app_name: "AIV"
device_model: "SHIELD Android TV"
jane_uhd:
device:
app_name: "AIV"
device_model: "Fire TV Stick 4K"
# Example: Service with different regions per profile
SERVICE_NAME:
profiles:
us_account:
region: "US"
api_endpoint: "https://api.us.service.com"
uk_account:
region: "GB"
api_endpoint: "https://api.uk.service.com"
# External proxy provider services
proxy_providers:
nordvpn:
username: username_from_service_credentials
password: password_from_service_credentials
servers:
server_map:
- us: 12 # force US server #12 for US proxies
surfsharkvpn:
username: your_surfshark_service_username # Service credentials from https://my.surfshark.com/vpn/manual-setup/main/openvpn
password: your_surfshark_service_password # Service credentials (not your login password)
servers:
server_map:
- us: 3844 # force US server #3844 for US proxies
- gb: 2697 # force GB server #2697 for GB proxies
- au: 4621 # force AU server #4621 for AU proxies

View File

@@ -30,7 +30,7 @@ class HTTP(Vault):
api_mode: "query" for query parameters or "json" for JSON API
"""
super().__init__(name)
self.url = host.rstrip("/")
self.url = host
self.password = password
self.username = username
self.api_mode = api_mode.lower()
@@ -88,21 +88,23 @@ class HTTP(Vault):
if self.api_mode == "json":
try:
title = getattr(self, "current_title", None)
response = self.request(
"GetKey",
{
"kid": kid,
"service": service.lower(),
"title": title,
},
)
params = {
"kid": kid,
"service": service.lower(),
}
response = self.request("GetKey", params)
if response.get("status") == "not_found":
return None
keys = response.get("keys", [])
for key_entry in keys:
if key_entry["kid"] == kid:
return key_entry["key"]
if isinstance(key_entry, str) and ":" in key_entry:
entry_kid, entry_key = key_entry.split(":", 1)
if entry_kid == kid:
return entry_key
elif isinstance(key_entry, dict):
if key_entry.get("kid") == kid:
return key_entry.get("key")
except Exception as e:
print(f"Failed to get key ({e.__class__.__name__}: {e})")
return None

2
uv.lock generated
View File

@@ -1505,7 +1505,7 @@ wheels = [
[[package]]
name = "unshackle"
version = "1.1.0"
version = "1.2.0"
source = { editable = "." }
dependencies = [
{ name = "appdirs" },