7 Commits
1.4.6 ... 1.4.7

Author SHA1 Message Date
Andy
bc26bf3046 feat: update changelog for version 1.4.7 2025-09-25 06:29:46 +00:00
Andy
35efdbff6d feat: add curl_cffi session support with browser impersonation
Add new session utility with curl_cffi support for anti-bot protection
Update all manifest parsers (DASH, HLS, ISM, M3U8) to accept curl_cffi sessions
Add browser impersonation support (Chrome, Firefox, Safari)
Fix cookie handling compatibility between requests and curl_cffi
Suppress HTTPS proxy warnings for better UX
Maintain full backward compatibility with requests.Session
2025-09-25 06:27:14 +00:00
Andy
63b7a49c1a feat: Add decrypt_labs_api_key to Config initialization and change duplicate track log level to debug 2025-09-25 06:22:50 +00:00
Andy
98ecf6f876 feat: Add download retry count option to download function 2025-09-23 01:32:00 +00:00
Andy
5df6914536 feat: Add options for required subtitles and best available quality in download command 2025-09-23 01:28:55 +00:00
Andy
c1df074965 Change new dynamic CDM selection text to be in Debug only 2025-09-14 04:25:57 +00:00
Andy
da60a396dd Fix: Prevent KeyError when reusing remote CDMs in dynamic selection
Creates a copy of the CDM dictionary before modification to prevent the original configuration from being mutated, allowing the same CDM to be selected multiple times within a session without errors.
2025-09-14 01:14:01 +00:00
13 changed files with 233 additions and 52 deletions

View File

@@ -5,6 +5,41 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.7] - 2025-09-25
### Added
- **curl_cffi Session Support**: Enhanced anti-bot protection with browser impersonation
- Added new session utility with curl_cffi support for bypassing anti-bot measures
- Browser impersonation support for Chrome, Firefox, and Safari user agents
- Full backward compatibility with requests.Session maintained
- Suppressed HTTPS proxy warnings for improved user experience
- **Download Retry Functionality**: Configurable retry mechanism for failed downloads
- Added retry count option to download function for improved reliability
- **Subtitle Requirements Options**: Enhanced subtitle download control
- Added options for required subtitles in download command
- Better control over subtitle track selection and requirements
- **Quality Selection Enhancement**: Improved quality selection options
- Added best available quality option in download command for optimal track selection
- **DecryptLabs API Integration**: Enhanced remote CDM configuration
- Added decrypt_labs_api_key to Config initialization for better API integration
### Changed
- **Manifest Parser Updates**: Enhanced compatibility across all parsers
- Updated DASH, HLS, ISM, and M3U8 parsers to accept curl_cffi sessions
- Improved cookie handling compatibility between requests and curl_cffi
- **Logging Improvements**: Reduced log verbosity for better user experience
- Changed duplicate track log level to debug to reduce console noise
- Dynamic CDM selection messages moved to debug-only output
### Fixed
- **Remote CDM Reuse**: Fixed KeyError in dynamic CDM selection
- Prevents KeyError when reusing remote CDMs in dynamic selection process
- Creates copy of CDM dictionary before modification to prevent configuration mutation
- Allows same CDM to be selected multiple times within session without errors
## [1.4.6] - 2025-09-13 ## [1.4.6] - 2025-09-13
### Added ### Added

View File

@@ -173,6 +173,12 @@ class dl:
help="Language wanted for Audio, overrides -l/--lang for audio tracks.", help="Language wanted for Audio, overrides -l/--lang for audio tracks.",
) )
@click.option("-sl", "--s-lang", type=LANGUAGE_RANGE, default=["all"], help="Language wanted for Subtitles.") @click.option("-sl", "--s-lang", type=LANGUAGE_RANGE, default=["all"], help="Language wanted for Subtitles.")
@click.option(
"--require-subs",
type=LANGUAGE_RANGE,
default=[],
help="Required subtitle languages. Downloads all subtitles only if these languages exist. Cannot be used with --s-lang.",
)
@click.option("-fs", "--forced-subs", is_flag=True, default=False, help="Include forced subtitle tracks.") @click.option("-fs", "--forced-subs", is_flag=True, default=False, help="Include forced subtitle tracks.")
@click.option( @click.option(
"--proxy", "--proxy",
@@ -263,6 +269,13 @@ class dl:
@click.option( @click.option(
"--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching." "--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching."
) )
@click.option(
"--best-available",
"best_available",
is_flag=True,
default=False,
help="Continue with best available quality if requested resolutions are not available.",
)
@click.pass_context @click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> dl: def cli(ctx: click.Context, **kwargs: Any) -> dl:
return dl(ctx, **kwargs) return dl(ctx, **kwargs)
@@ -322,6 +335,16 @@ class dl:
vault_copy = vault.copy() vault_copy = vault.copy()
del vault_copy["type"] del vault_copy["type"]
if vault_type.lower() == "api" and "decrypt_labs" in vault_name.lower():
if "token" not in vault_copy or not vault_copy["token"]:
if config.decrypt_labs_api_key:
vault_copy["token"] = config.decrypt_labs_api_key
else:
self.log.warning(
f"No token provided for DecryptLabs vault '{vault_name}' and no global "
"decrypt_labs_api_key configured"
)
if vault_type.lower() == "sqlite": if vault_type.lower() == "sqlite":
try: try:
self.vaults.load_critical(vault_type, **vault_copy) self.vaults.load_critical(vault_type, **vault_copy)
@@ -442,6 +465,7 @@ class dl:
v_lang: list[str], v_lang: list[str],
a_lang: list[str], a_lang: list[str],
s_lang: list[str], s_lang: list[str],
require_subs: list[str],
forced_subs: bool, forced_subs: bool,
sub_format: Optional[Subtitle.Codec], sub_format: Optional[Subtitle.Codec],
video_only: bool, video_only: bool,
@@ -462,6 +486,7 @@ class dl:
no_source: bool, no_source: bool,
workers: Optional[int], workers: Optional[int],
downloads: int, downloads: int,
best_available: bool,
*_: Any, *_: Any,
**__: Any, **__: Any,
) -> None: ) -> None:
@@ -469,6 +494,10 @@ class dl:
self.search_source = None self.search_source = None
start_time = time.time() start_time = time.time()
if require_subs and s_lang != ["all"]:
self.log.error("--require-subs and --s-lang cannot be used together")
sys.exit(1)
# Check if dovi_tool is available when hybrid mode is requested # Check if dovi_tool is available when hybrid mode is requested
if any(r == Video.Range.HYBRID for r in range_): if any(r == Video.Range.HYBRID for r in range_):
from unshackle.core.binaries import DoviTool from unshackle.core.binaries import DoviTool
@@ -703,6 +732,12 @@ class dl:
res_list = ", ".join([f"{x}p" for x in missing_resolutions[:-1]]) + " or " res_list = ", ".join([f"{x}p" for x in missing_resolutions[:-1]]) + " or "
res_list = f"{res_list}{missing_resolutions[-1]}p" res_list = f"{res_list}{missing_resolutions[-1]}p"
plural = "s" if len(missing_resolutions) > 1 else "" plural = "s" if len(missing_resolutions) > 1 else ""
if best_available:
self.log.warning(
f"There's no {res_list} Video Track{plural}, continuing with available qualities..."
)
else:
self.log.error(f"There's no {res_list} Video Track{plural}...") self.log.error(f"There's no {res_list} Video Track{plural}...")
sys.exit(1) sys.exit(1)
@@ -740,7 +775,21 @@ class dl:
title.tracks.videos = selected_videos title.tracks.videos = selected_videos
# filter subtitle tracks # filter subtitle tracks
if s_lang and "all" not in s_lang: if require_subs:
missing_langs = [
lang
for lang in require_subs
if not any(is_close_match(lang, [sub.language]) for sub in title.tracks.subtitles)
]
if missing_langs:
self.log.error(f"Required subtitle language(s) not found: {', '.join(missing_langs)}")
sys.exit(1)
self.log.info(
f"Required languages found ({', '.join(require_subs)}), downloading all available subtitles"
)
elif s_lang and "all" not in s_lang:
missing_langs = [ missing_langs = [
lang_ lang_
for lang_ in s_lang for lang_ in s_lang
@@ -880,7 +929,7 @@ class dl:
self.service, self.profile, drm="widevine", quality=highest_quality self.service, self.profile, drm="widevine", quality=highest_quality
) )
if quality_based_cdm and quality_based_cdm != self.cdm: if quality_based_cdm and quality_based_cdm != self.cdm:
self.log.info( self.log.debug(
f"Pre-selecting Widevine CDM based on highest quality {highest_quality}p across all video tracks" f"Pre-selecting Widevine CDM based on highest quality {highest_quality}p across all video tracks"
) )
self.cdm = quality_based_cdm self.cdm = quality_based_cdm
@@ -891,7 +940,7 @@ class dl:
self.service, self.profile, drm="playready", quality=highest_quality self.service, self.profile, drm="playready", quality=highest_quality
) )
if quality_based_cdm and quality_based_cdm != self.cdm: if quality_based_cdm and quality_based_cdm != self.cdm:
self.log.info( self.log.debug(
f"Pre-selecting PlayReady CDM based on highest quality {highest_quality}p across all video tracks" f"Pre-selecting PlayReady CDM based on highest quality {highest_quality}p across all video tracks"
) )
self.cdm = quality_based_cdm self.cdm = quality_based_cdm
@@ -1515,6 +1564,9 @@ class dl:
@staticmethod @staticmethod
def save_cookies(path: Path, cookies: CookieJar): def save_cookies(path: Path, cookies: CookieJar):
if hasattr(cookies, 'jar'):
cookies = cookies.jar
cookie_jar = MozillaCookieJar(path) cookie_jar = MozillaCookieJar(path)
cookie_jar.load() cookie_jar.load()
for cookie in cookies: for cookie in cookies:
@@ -1583,31 +1635,31 @@ class dl:
for key in quality_keys: for key in quality_keys:
if key.isdigit() and quality == int(key): if key.isdigit() and quality == int(key):
quality_match = cdm_name[key] quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on exact quality match {quality}p: {quality_match}") self.log.debug(f"Selected CDM based on exact quality match {quality}p: {quality_match}")
break break
elif key.startswith(">="): elif key.startswith(">="):
threshold = int(key[2:]) threshold = int(key[2:])
if quality >= threshold: if quality >= threshold:
quality_match = cdm_name[key] quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p >= {threshold}p: {quality_match}") self.log.debug(f"Selected CDM based on quality {quality}p >= {threshold}p: {quality_match}")
break break
elif key.startswith(">"): elif key.startswith(">"):
threshold = int(key[1:]) threshold = int(key[1:])
if quality > threshold: if quality > threshold:
quality_match = cdm_name[key] quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p > {threshold}p: {quality_match}") self.log.debug(f"Selected CDM based on quality {quality}p > {threshold}p: {quality_match}")
break break
elif key.startswith("<="): elif key.startswith("<="):
threshold = int(key[2:]) threshold = int(key[2:])
if quality <= threshold: if quality <= threshold:
quality_match = cdm_name[key] quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p <= {threshold}p: {quality_match}") self.log.debug(f"Selected CDM based on quality {quality}p <= {threshold}p: {quality_match}")
break break
elif key.startswith("<"): elif key.startswith("<"):
threshold = int(key[1:]) threshold = int(key[1:])
if quality < threshold: if quality < threshold:
quality_match = cdm_name[key] quality_match = cdm_name[key]
self.log.info(f"Selected CDM based on quality {quality}p < {threshold}p: {quality_match}") self.log.debug(f"Selected CDM based on quality {quality}p < {threshold}p: {quality_match}")
break break
if quality_match: if quality_match:
@@ -1630,13 +1682,22 @@ class dl:
if not cdm_name: if not cdm_name:
return None return None
cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) cdm_api = next(iter(x.copy() for x in config.remote_cdm if x["name"] == cdm_name), None)
if cdm_api: if cdm_api:
is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False
if is_decrypt_lab: if is_decrypt_lab:
del cdm_api["name"] del cdm_api["name"]
del cdm_api["type"] del cdm_api["type"]
if "secret" not in cdm_api or not cdm_api["secret"]:
if config.decrypt_labs_api_key:
cdm_api["secret"] = config.decrypt_labs_api_key
else:
raise ValueError(
f"No secret provided for DecryptLabs CDM '{cdm_name}' and no global "
"decrypt_labs_api_key configured"
)
# All DecryptLabs CDMs use DecryptLabsRemoteCDM # All DecryptLabs CDMs use DecryptLabsRemoteCDM
return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api)
else: else:

View File

@@ -1 +1 @@
__version__ = "1.4.6" __version__ = "1.4.7"

View File

@@ -88,6 +88,7 @@ class Config:
self.tag_group_name: bool = kwargs.get("tag_group_name", True) self.tag_group_name: bool = kwargs.get("tag_group_name", True)
self.tag_imdb_tmdb: bool = kwargs.get("tag_imdb_tmdb", True) self.tag_imdb_tmdb: bool = kwargs.get("tag_imdb_tmdb", True)
self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or "" self.tmdb_api_key: str = kwargs.get("tmdb_api_key") or ""
self.decrypt_labs_api_key: str = kwargs.get("decrypt_labs_api_key") or ""
self.update_checks: bool = kwargs.get("update_checks", True) self.update_checks: bool = kwargs.get("update_checks", True)
self.update_check_interval: int = kwargs.get("update_check_interval", 24) self.update_check_interval: int = kwargs.get("update_check_interval", 24)
self.scene_naming: bool = kwargs.get("scene_naming", True) self.scene_naming: bool = kwargs.get("scene_naming", True)

View File

@@ -150,6 +150,7 @@ def download(
track_type = track.__class__.__name__ track_type = track.__class__.__name__
thread_count = str(config.n_m3u8dl_re.get("thread_count", max_workers)) thread_count = str(config.n_m3u8dl_re.get("thread_count", max_workers))
retry_count = str(config.n_m3u8dl_re.get("retry_count", max_workers))
ad_keyword = config.n_m3u8dl_re.get("ad_keyword") ad_keyword = config.n_m3u8dl_re.get("ad_keyword")
arguments = [ arguments = [
@@ -160,6 +161,8 @@ def download(
output_dir, output_dir,
"--thread-count", "--thread-count",
thread_count, thread_count,
"--download-retry-count",
retry_count,
"--no-log", "--no-log",
"--write-meta-json", "--write-meta-json",
"false", "false",

View File

@@ -8,6 +8,7 @@ from urllib.parse import urljoin
from Cryptodome.Cipher import AES from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad from Cryptodome.Util.Padding import unpad
from curl_cffi.requests import Session as CurlSession
from m3u8.model import Key from m3u8.model import Key
from requests import Session from requests import Session
@@ -69,8 +70,8 @@ class ClearKey:
""" """
if not isinstance(m3u_key, Key): if not isinstance(m3u_key, Key):
raise ValueError(f"Provided M3U Key is in an unexpected type {m3u_key!r}") raise ValueError(f"Provided M3U Key is in an unexpected type {m3u_key!r}")
if not isinstance(session, (Session, type(None))): if not isinstance(session, (Session, CurlSession, type(None))):
raise TypeError(f"Expected session to be a {Session}, not a {type(session)}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not a {type(session)}")
if not m3u_key.method.startswith("AES"): if not m3u_key.method.startswith("AES"):
raise ValueError(f"Provided M3U Key is not an AES Clear Key, {m3u_key.method}") raise ValueError(f"Provided M3U Key is not an AES Clear Key, {m3u_key.method}")

View File

@@ -15,6 +15,7 @@ from uuid import UUID
from zlib import crc32 from zlib import crc32
import requests import requests
from curl_cffi.requests import Session as CurlSession
from langcodes import Language, tag_is_valid from langcodes import Language, tag_is_valid
from lxml.etree import Element, ElementTree from lxml.etree import Element, ElementTree
from pyplayready.system.pssh import PSSH as PR_PSSH from pyplayready.system.pssh import PSSH as PR_PSSH
@@ -47,7 +48,7 @@ class DASH:
self.url = url self.url = url
@classmethod @classmethod
def from_url(cls, url: str, session: Optional[Session] = None, **args: Any) -> DASH: def from_url(cls, url: str, session: Optional[Union[Session, CurlSession]] = None, **args: Any) -> DASH:
if not url: if not url:
raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.") raise requests.URLRequired("DASH manifest URL must be provided for relative path computations.")
if not isinstance(url, str): if not isinstance(url, str):
@@ -55,8 +56,8 @@ class DASH:
if not session: if not session:
session = Session() session = Session()
elif not isinstance(session, Session): elif not isinstance(session, (Session, CurlSession)):
raise TypeError(f"Expected session to be a {Session}, not {session!r}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
res = session.get(url, **args) res = session.get(url, **args)
if res.url != url: if res.url != url:
@@ -103,6 +104,10 @@ class DASH:
continue continue
if next(iter(period.xpath("SegmentType/@value")), "content") != "content": if next(iter(period.xpath("SegmentType/@value")), "content") != "content":
continue continue
if "urn:amazon:primevideo:cachingBreadth" in [
x.get("schemeIdUri") for x in period.findall("SupplementalProperty")
]:
continue
for adaptation_set in period.findall("AdaptationSet"): for adaptation_set in period.findall("AdaptationSet"):
if self.is_trick_mode(adaptation_set): if self.is_trick_mode(adaptation_set):

View File

@@ -14,9 +14,10 @@ from typing import Any, Callable, Optional, Union
from urllib.parse import urljoin from urllib.parse import urljoin
from zlib import crc32 from zlib import crc32
import httpx
import m3u8 import m3u8
import requests import requests
from curl_cffi.requests import Response as CurlResponse
from curl_cffi.requests import Session as CurlSession
from langcodes import Language, tag_is_valid from langcodes import Language, tag_is_valid
from m3u8 import M3U8 from m3u8 import M3U8
from pyplayready.cdm import Cdm as PlayReadyCdm from pyplayready.cdm import Cdm as PlayReadyCdm
@@ -35,7 +36,7 @@ from unshackle.core.utilities import get_extension, is_close_match, try_ensure_u
class HLS: class HLS:
def __init__(self, manifest: M3U8, session: Optional[Union[Session, httpx.Client]] = None): def __init__(self, manifest: M3U8, session: Optional[Union[Session, CurlSession]] = None):
if not manifest: if not manifest:
raise ValueError("HLS manifest must be provided.") raise ValueError("HLS manifest must be provided.")
if not isinstance(manifest, M3U8): if not isinstance(manifest, M3U8):
@@ -47,7 +48,7 @@ class HLS:
self.session = session or Session() self.session = session or Session()
@classmethod @classmethod
def from_url(cls, url: str, session: Optional[Union[Session, httpx.Client]] = None, **args: Any) -> HLS: def from_url(cls, url: str, session: Optional[Union[Session, CurlSession]] = None, **args: Any) -> HLS:
if not url: if not url:
raise requests.URLRequired("HLS manifest URL must be provided.") raise requests.URLRequired("HLS manifest URL must be provided.")
if not isinstance(url, str): if not isinstance(url, str):
@@ -55,22 +56,22 @@ class HLS:
if not session: if not session:
session = Session() session = Session()
elif not isinstance(session, (Session, httpx.Client)): elif not isinstance(session, (Session, CurlSession)):
raise TypeError(f"Expected session to be a {Session} or {httpx.Client}, not {session!r}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
res = session.get(url, **args) res = session.get(url, **args)
# Handle both requests and httpx response objects # Handle requests and curl_cffi response objects
if isinstance(res, requests.Response): if isinstance(res, requests.Response):
if not res.ok: if not res.ok:
raise requests.ConnectionError("Failed to request the M3U(8) document.", response=res) raise requests.ConnectionError("Failed to request the M3U(8) document.", response=res)
content = res.text content = res.text
elif isinstance(res, httpx.Response): elif isinstance(res, CurlResponse):
if res.status_code >= 400: if not res.ok:
raise requests.ConnectionError("Failed to request the M3U(8) document.", response=res) raise requests.ConnectionError("Failed to request the M3U(8) document.", response=res)
content = res.text content = res.text
else: else:
raise TypeError(f"Expected response to be a requests.Response or httpx.Response, not {type(res)}") raise TypeError(f"Expected response to be a requests.Response or curl_cffi.Response, not {type(res)}")
master = m3u8.loads(content, uri=url) master = m3u8.loads(content, uri=url)
@@ -229,7 +230,7 @@ class HLS:
save_path: Path, save_path: Path,
save_dir: Path, save_dir: Path,
progress: partial, progress: partial,
session: Optional[Union[Session, httpx.Client]] = None, session: Optional[Union[Session, CurlSession]] = None,
proxy: Optional[str] = None, proxy: Optional[str] = None,
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
license_widevine: Optional[Callable] = None, license_widevine: Optional[Callable] = None,
@@ -238,15 +239,13 @@ class HLS:
) -> None: ) -> None:
if not session: if not session:
session = Session() session = Session()
elif not isinstance(session, (Session, httpx.Client)): elif not isinstance(session, (Session, CurlSession)):
raise TypeError(f"Expected session to be a {Session} or {httpx.Client}, not {session!r}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
if proxy: if proxy:
# Handle proxies differently based on session type # Handle proxies differently based on session type
if isinstance(session, Session): if isinstance(session, Session):
session.proxies.update({"all": proxy}) session.proxies.update({"all": proxy})
elif isinstance(session, httpx.Client):
session.proxies = {"http://": proxy, "https://": proxy}
log = logging.getLogger("HLS") log = logging.getLogger("HLS")
@@ -257,13 +256,8 @@ class HLS:
log.error(f"Failed to request the invariant M3U8 playlist: {response.status_code}") log.error(f"Failed to request the invariant M3U8 playlist: {response.status_code}")
sys.exit(1) sys.exit(1)
playlist_text = response.text playlist_text = response.text
elif isinstance(response, httpx.Response):
if response.status_code >= 400:
log.error(f"Failed to request the invariant M3U8 playlist: {response.status_code}")
sys.exit(1)
playlist_text = response.text
else: else:
raise TypeError(f"Expected response to be a requests.Response or httpx.Response, not {type(response)}") raise TypeError(f"Expected response to be a requests.Response or curl_cffi.Response, not {type(response)}")
master = m3u8.loads(playlist_text, uri=track.url) master = m3u8.loads(playlist_text, uri=track.url)
@@ -533,13 +527,9 @@ class HLS:
if isinstance(res, requests.Response): if isinstance(res, requests.Response):
res.raise_for_status() res.raise_for_status()
init_content = res.content init_content = res.content
elif isinstance(res, httpx.Response):
if res.status_code >= 400:
raise requests.HTTPError(f"HTTP Error: {res.status_code}", response=res)
init_content = res.content
else: else:
raise TypeError( raise TypeError(
f"Expected response to be requests.Response or httpx.Response, not {type(res)}" f"Expected response to be requests.Response or curl_cffi.Response, not {type(res)}"
) )
map_data = (segment.init_section, init_content) map_data = (segment.init_section, init_content)
@@ -707,7 +697,7 @@ class HLS:
@staticmethod @staticmethod
def parse_session_data_keys( def parse_session_data_keys(
manifest: M3U8, session: Optional[Union[Session, httpx.Client]] = None manifest: M3U8, session: Optional[Union[Session, CurlSession]] = None
) -> list[m3u8.model.Key]: ) -> list[m3u8.model.Key]:
"""Parse `com.apple.hls.keys` session data and return Key objects.""" """Parse `com.apple.hls.keys` session data and return Key objects."""
keys: list[m3u8.model.Key] = [] keys: list[m3u8.model.Key] = []
@@ -798,7 +788,8 @@ class HLS:
@staticmethod @staticmethod
def get_drm( def get_drm(
key: Union[m3u8.model.SessionKey, m3u8.model.Key], session: Optional[Union[Session, httpx.Client]] = None key: Union[m3u8.model.SessionKey, m3u8.model.Key],
session: Optional[Union[Session, CurlSession]] = None,
) -> DRM_T: ) -> DRM_T:
""" """
Convert HLS EXT-X-KEY data to an initialized DRM object. Convert HLS EXT-X-KEY data to an initialized DRM object.
@@ -810,8 +801,8 @@ class HLS:
Raises a NotImplementedError if the key system is not supported. Raises a NotImplementedError if the key system is not supported.
""" """
if not isinstance(session, (Session, httpx.Client, type(None))): if not isinstance(session, (Session, CurlSession, type(None))):
raise TypeError(f"Expected session to be a {Session} or {httpx.Client}, not {type(session)}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {type(session)}")
if not session: if not session:
session = Session() session = Session()

View File

@@ -10,6 +10,7 @@ from pathlib import Path
from typing import Any, Callable, Optional, Union from typing import Any, Callable, Optional, Union
import requests import requests
from curl_cffi.requests import Session as CurlSession
from langcodes import Language, tag_is_valid from langcodes import Language, tag_is_valid
from lxml.etree import Element from lxml.etree import Element
from pyplayready.system.pssh import PSSH as PR_PSSH from pyplayready.system.pssh import PSSH as PR_PSSH
@@ -34,11 +35,13 @@ class ISM:
self.url = url self.url = url
@classmethod @classmethod
def from_url(cls, url: str, session: Optional[Session] = None, **kwargs: Any) -> "ISM": def from_url(cls, url: str, session: Optional[Union[Session, CurlSession]] = None, **kwargs: Any) -> "ISM":
if not url: if not url:
raise requests.URLRequired("ISM manifest URL must be provided") raise requests.URLRequired("ISM manifest URL must be provided")
if not session: if not session:
session = Session() session = Session()
elif not isinstance(session, (Session, CurlSession)):
raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {session!r}")
res = session.get(url, **kwargs) res = session.get(url, **kwargs)
if res.url != url: if res.url != url:
url = res.url url = res.url

View File

@@ -2,9 +2,10 @@
from __future__ import annotations from __future__ import annotations
from typing import Optional from typing import Optional, Union
import m3u8 import m3u8
from curl_cffi.requests import Session as CurlSession
from requests import Session from requests import Session
from unshackle.core.manifests.hls import HLS from unshackle.core.manifests.hls import HLS
@@ -15,7 +16,7 @@ def parse(
master: m3u8.M3U8, master: m3u8.M3U8,
language: str, language: str,
*, *,
session: Optional[Session] = None, session: Optional[Union[Session, CurlSession]] = None,
) -> Tracks: ) -> Tracks:
"""Parse a variant playlist to ``Tracks`` with basic information, defer DRM loading.""" """Parse a variant playlist to ``Tracks`` with basic information, defer DRM loading."""
tracks = HLS(master, session=session).to_tracks(language) tracks = HLS(master, session=session).to_tracks(language)

79
unshackle/core/session.py Normal file
View File

@@ -0,0 +1,79 @@
"""Session utilities for creating HTTP sessions with different backends."""
from __future__ import annotations
import warnings
from curl_cffi.requests import Session as CurlSession
from unshackle.core.config import config
# Globally suppress curl_cffi HTTPS proxy warnings since some proxy providers
# (like NordVPN) require HTTPS URLs but curl_cffi expects HTTP format
warnings.filterwarnings(
"ignore", message="Make sure you are using https over https proxy.*", category=RuntimeWarning, module="curl_cffi.*"
)
class Session(CurlSession):
"""curl_cffi Session with warning suppression."""
def request(self, method, url, **kwargs):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="Make sure you are using https over https proxy.*", category=RuntimeWarning
)
return super().request(method, url, **kwargs)
def session(browser: str | None = None, **kwargs) -> Session:
"""
Create a curl_cffi session that impersonates a browser.
This is a full replacement for requests.Session with browser impersonation
and anti-bot capabilities. The session uses curl-impersonate under the hood
to mimic real browser behavior.
Args:
browser: Browser to impersonate (e.g. "chrome124", "firefox", "safari").
Uses the configured default from curl_impersonate.browser if not specified.
See https://github.com/lexiforest/curl_cffi#sessions for available options.
**kwargs: Additional arguments passed to CurlSession constructor:
- headers: Additional headers (dict)
- cookies: Cookie jar or dict
- auth: HTTP basic auth tuple (username, password)
- proxies: Proxy configuration dict
- verify: SSL certificate verification (bool, default True)
- timeout: Request timeout in seconds (float or tuple)
- allow_redirects: Follow redirects (bool, default True)
- max_redirects: Maximum redirect count (int)
- cert: Client certificate (str or tuple)
Returns:
curl_cffi.requests.Session configured with browser impersonation, common headers,
and equivalent retry behavior to requests.Session.
Example:
from unshackle.core.session import session
class MyService(Service):
@staticmethod
def get_session():
return session() # Uses config default browser
"""
if browser is None:
browser = config.curl_impersonate.get("browser", "chrome124")
session_config = {
"impersonate": browser,
"timeout": 30.0,
"allow_redirects": True,
"max_redirects": 15,
"verify": True,
}
session_config.update(kwargs)
session_obj = Session(**session_config)
session_obj.headers.update(config.headers)
return session_obj

View File

@@ -13,6 +13,7 @@ from typing import Any, Callable, Iterable, Optional, Union
from uuid import UUID from uuid import UUID
from zlib import crc32 from zlib import crc32
from curl_cffi.requests import Session as CurlSession
from langcodes import Language from langcodes import Language
from pyplayready.cdm import Cdm as PlayReadyCdm from pyplayready.cdm import Cdm as PlayReadyCdm
from pywidevine.cdm import Cdm as WidevineCdm from pywidevine.cdm import Cdm as WidevineCdm
@@ -585,8 +586,8 @@ class Track:
raise TypeError(f"Expected url to be a {str}, not {type(url)}") raise TypeError(f"Expected url to be a {str}, not {type(url)}")
if not isinstance(byte_range, (str, type(None))): if not isinstance(byte_range, (str, type(None))):
raise TypeError(f"Expected byte_range to be a {str}, not {type(byte_range)}") raise TypeError(f"Expected byte_range to be a {str}, not {type(byte_range)}")
if not isinstance(session, (Session, type(None))): if not isinstance(session, (Session, CurlSession, type(None))):
raise TypeError(f"Expected session to be a {Session}, not {type(session)}") raise TypeError(f"Expected session to be a {Session} or {CurlSession}, not {type(session)}")
if not url: if not url:
if self.descriptor != self.Descriptor.URL: if self.descriptor != self.Descriptor.URL:

View File

@@ -181,7 +181,7 @@ class Tracks:
log = logging.getLogger("Tracks") log = logging.getLogger("Tracks")
if duplicates: if duplicates:
log.warning(f" - Found and skipped {duplicates} duplicate tracks...") log.debug(f" - Found and skipped {duplicates} duplicate tracks...")
def sort_videos(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None: def sort_videos(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None:
"""Sort video tracks by bitrate, and optionally language.""" """Sort video tracks by bitrate, and optionally language."""