mirror of
https://github.com/unshackle-dl/unshackle.git
synced 2025-10-23 15:11:08 +00:00
The Transfer enum was missing value 2, which according to ITU-T H.Sup19 standards represents "Unspecified (Image characteristics are unknown or are determined by the application)". This value is often used for still image coding systems.
456 lines
17 KiB
Python
456 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import re
|
|
import subprocess
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Union
|
|
|
|
from langcodes import Language
|
|
|
|
from unshackle.core import binaries
|
|
from unshackle.core.config import config
|
|
from unshackle.core.tracks.subtitle import Subtitle
|
|
from unshackle.core.tracks.track import Track
|
|
from unshackle.core.utilities import FPS, get_boxes
|
|
|
|
|
|
class Video(Track):
|
|
class Codec(str, Enum):
    """Video codecs known to this module; each member's value is its display name."""

    AVC = "H.264"
    HEVC = "H.265"
    VC1 = "VC-1"
    VP8 = "VP8"
    VP9 = "VP9"
    AV1 = "AV1"

    @property
    def extension(self) -> str:
        """The member's value lower-cased with every '.' and '-' removed (e.g. "h264")."""
        name = self.value.lower()
        for separator in (".", "-"):
            name = name.replace(separator, "")
        return name

    @staticmethod
    def from_mime(mime: str) -> Video.Codec:
        """
        Resolve a codec identifier to a Codec member.

        The identifier is lower-cased, stripped, and cut at the first '.', so
        a full codec string such as "avc1.64001f" is matched on "avc1" only.

        Raises:
            ValueError: If the (truncated) identifier is not recognised.
        """
        tag = mime.lower().strip().split(".")[0]
        # Member names are stored as strings and resolved only on a hit, so the
        # lookup itself never touches the enum for unknown identifiers.
        groups = (
            (
                "AVC",
                (
                    "avc1",
                    "avc2",
                    "avc3",
                    "dva1",
                    "dvav",  # Dolby Vision
                ),
            ),
            (
                "HEVC",
                (
                    "hev1",
                    "hev2",
                    "hev3",
                    "hvc1",
                    "hvc2",
                    "hvc3",
                    "dvh1",
                    "dvhe",  # Dolby Vision
                    "lhv1",
                    "lhe1",  # Layered
                ),
            ),
            ("VC1", ("vc-1",)),
            ("VP8", ("vp08", "vp8")),
            ("VP9", ("vp09", "vp9")),
            ("AV1", ("av01",)),
        )
        for member_name, tags in groups:
            if tag in tags:
                return Video.Codec[member_name]
        raise ValueError(f"The MIME '{tag}' is not a supported Video Codec")

    @staticmethod
    def from_codecs(codecs: str) -> Video.Codec:
        """
        Return the Codec for the first recognised entry of a comma-separated codecs string.

        Entries that from_mime() rejects are skipped.

        Raises:
            ValueError: If no entry is recognised.
        """
        for entry in codecs.lower().split(","):
            identifier = entry.strip().split(".")[0]
            try:
                return Video.Codec.from_mime(identifier)
            except ValueError:
                # Not a video codec we know (other entries may still match).
                continue
        raise ValueError(f"No MIME types matched any supported Video Codecs in '{codecs}'")

    @staticmethod
    def from_netflix_profile(profile: str) -> Video.Codec:
        """
        Resolve a content profile name to a Codec by its prefix.

        The profile is lower-cased and stripped before matching.

        Raises:
            ValueError: If the profile starts with none of the known prefixes.
        """
        profile = profile.lower().strip()
        prefix_table = (
            (("h264", "playready-h264"), "AVC"),
            (("hevc",), "HEVC"),
            (("vp9",), "VP9"),
            (("av1",), "AV1"),
        )
        for prefixes, member_name in prefix_table:
            if profile.startswith(prefixes):
                return Video.Codec[member_name]
        raise ValueError(f"The Content Profile '{profile}' is not a supported Video Codec")
|
|
|
|
class Range(str, Enum):
    """Dynamic range of a video track; each member's value is its display name."""

    SDR = "SDR"  # No Dynamic Range
    HLG = "HLG"  # https://en.wikipedia.org/wiki/Hybrid_log%E2%80%93gamma
    HDR10 = "HDR10"  # https://en.wikipedia.org/wiki/HDR10
    HDR10P = "HDR10+"  # https://en.wikipedia.org/wiki/HDR10%2B
    DV = "DV"  # https://en.wikipedia.org/wiki/Dolby_Vision
    HYBRID = "HYBRID"  # Selects both HDR10 and DV tracks for hybrid processing with DoviTool

    @staticmethod
    def from_cicp(primaries: int, transfer: int, matrix: int) -> Video.Range:
        """
        ISO/IEC 23001-8 Coding-independent code points to Video Range.

        Parameters:
            primaries: Colour primaries code point.
            transfer: Transfer characteristics code point. The non-standard value 5
                is treated as 6 (BT.601).
            matrix: Matrix coefficients code point. It is validated, but only takes
                part in the all-zero shortcut below.

        Returns:
            SDR when all three code points are 0 or the primaries are BT.601,
            HDR10 for the PQ transfer, HLG for the HLG transfer, otherwise SDR.
            The BT.601 primaries check runs before the transfer checks.

        Raises:
            ValueError: If a code point is not listed in the enums below.

        Sources:
        https://www.itu.int/rec/T-REC-H.Sup19-202104-I
        """

        class Primaries(Enum):
            Unspecified = 0
            BT_709 = 1
            Unspecified_Image = 2  # "Unspecified" code point, mirrors Transfer.Unspecified_Image
            BT_601_625 = 5
            BT_601_525 = 6
            BT_2020_and_2100 = 9
            SMPTE_ST_2113_and_EG_4321 = 12  # P3D65

        class Transfer(Enum):
            Unspecified = 0
            BT_709 = 1
            Unspecified_Image = 2
            BT_601 = 6
            BT_2020 = 14
            BT_2100 = 15
            BT_2100_PQ = 16
            BT_2100_HLG = 18

        class Matrix(Enum):
            RGB = 0
            YCbCr_BT_709 = 1
            Unspecified = 2  # "Unspecified" code point, mirrors Transfer.Unspecified_Image
            YCbCr_BT_601_625 = 5
            YCbCr_BT_601_525 = 6
            YCbCr_BT_2020_and_2100 = 9  # YCbCr BT.2100 shares the same CP
            ICtCp_BT_2100 = 14

        if transfer == 5:
            # While not part of any standard, it is typically used as a PAL variant of Transfer.BT_601=6.
            # i.e. where Transfer 6 would be for BT.601-NTSC and Transfer 5 would be for BT.601-PAL.
            # The codebase is currently agnostic to either, so a manual conversion to 6 is done.
            transfer = 6

        primaries_cp = Primaries(primaries)
        transfer_cp = Transfer(transfer)
        matrix_cp = Matrix(matrix)

        # primaries and matrix does not strictly correlate to a range

        # Compare against enum members: plain Enum members never equal ints, so
        # comparing the converted values with (0, 0, 0) could never match.
        all_zero = (Primaries.Unspecified, Transfer.Unspecified, Matrix.RGB)
        if (primaries_cp, transfer_cp, matrix_cp) == all_zero:
            return Video.Range.SDR
        if primaries_cp in (Primaries.BT_601_625, Primaries.BT_601_525):
            return Video.Range.SDR
        if transfer_cp is Transfer.BT_2100_PQ:
            return Video.Range.HDR10
        if transfer_cp is Transfer.BT_2100_HLG:
            return Video.Range.HLG
        return Video.Range.SDR

    @staticmethod
    def from_m3u_range_tag(tag: str) -> Optional[Video.Range]:
        """
        Convert an M3U range tag value to a Video Range.

        The tag is upper-cased, stripped, and has double quotes removed first.

        Returns:
            None for an empty/None tag, otherwise SDR, HDR10 (for "PQ") or HLG.

        Raises:
            ValueError: If the tag is non-empty and not one of SDR, PQ or HLG.
        """
        tag = (tag or "").upper().replace('"', "").strip()
        if not tag:
            return None
        if tag == "SDR":
            return Video.Range.SDR
        elif tag == "PQ":
            return Video.Range.HDR10  # technically could be any PQ-transfer range
        elif tag == "HLG":
            return Video.Range.HLG
        # for some reason there's no Dolby Vision info tag
        raise ValueError(f"The M3U Range Tag '{tag}' is not a supported Video Range")
|
|
|
|
def __init__(
    self,
    *args: Any,
    codec: Optional[Video.Codec] = None,
    range_: Optional[Video.Range] = None,
    bitrate: Optional[Union[str, int, float]] = None,
    width: Optional[int] = None,
    height: Optional[int] = None,
    fps: Optional[Union[str, int, float]] = None,
    **kwargs: Any,
) -> None:
    """
    Create a new Video track object.

    Positional arguments and any extra keyword arguments are forwarded
    unchanged to the parent Track initialiser.

    Parameters:
        codec: A Video.Codec enum member, or None when the codec is unknown.
        range_: A Video.Range enum member. A falsy value is stored as SDR.
        bitrate: The average bandwidth as a number or numeric string. It is
            rounded up to the next integer; a falsy value is stored as None.
        width: The horizontal resolution. An int or numeric string; a falsy
            value (including 0) is stored as None.
        height: The vertical resolution, handled the same way as width.
        fps: The frames per second as a number or string. It is converted to
            a string and parsed with FPS.parse; a falsy value or falsy parse
            result is stored as None.

    Raises:
        TypeError: If an argument is of an unsupported type.
        ValueError: If bitrate, width, height or fps cannot be converted.

    Note: If codec, bitrate, width, height, or fps is not specified some checks
    may be skipped or assume a value. Specifying as much information as possible
    is highly recommended.
    """
    super().__init__(*args, **kwargs)

    # Type validation; None is accepted for every argument.
    if codec is not None and not isinstance(codec, Video.Codec):
        raise TypeError(f"Expected codec to be a {Video.Codec}, not {codec!r}")
    if range_ is not None and not isinstance(range_, Video.Range):
        raise TypeError(f"Expected range_ to be a {Video.Range}, not {range_!r}")
    if bitrate is not None and not isinstance(bitrate, (str, int, float)):
        raise TypeError(f"Expected bitrate to be a {str}, {int}, or {float}, not {bitrate!r}")
    if width is not None and not isinstance(width, (int, str)):
        raise TypeError(f"Expected width to be a {int}, not {width!r}")
    if height is not None and not isinstance(height, (int, str)):
        raise TypeError(f"Expected height to be a {int}, not {height!r}")
    if fps is not None and not isinstance(fps, (str, int, float)):
        raise TypeError(f"Expected fps to be a {str}, {int}, or {float}, not {fps!r}")

    self.codec = codec
    self.range = range_ if range_ else Video.Range.SDR

    if bitrate:
        try:
            normalised_bitrate = int(math.ceil(float(bitrate)))
        except (ValueError, TypeError) as e:
            raise ValueError(f"Expected bitrate to be a number or float, {e}")
    else:
        normalised_bitrate = None
    self.bitrate = normalised_bitrate

    try:
        normalised_width = int(width or 0)
    except ValueError as e:
        raise ValueError(f"Expected width to be a number, not {width!r}, {e}")
    self.width = normalised_width or None

    try:
        normalised_height = int(height or 0)
    except ValueError as e:
        raise ValueError(f"Expected height to be a number, not {height!r}, {e}")
    self.height = normalised_height or None

    if fps:
        try:
            parsed_fps = FPS.parse(str(fps)) or None
        except Exception as e:
            raise ValueError("Expected fps to be a number, float, or a string as numerator/denominator form, " + str(e))
    else:
        parsed_fps = None
    self.fps = parsed_fps

    self.needs_duration_fix = False
|
|
|
|
def __str__(self) -> str:
|
|
return " | ".join(
|
|
filter(
|
|
bool,
|
|
[
|
|
"VID",
|
|
"[" + (", ".join(filter(bool, [self.codec.value if self.codec else None, self.range.name]))) + "]",
|
|
str(self.language),
|
|
", ".join(
|
|
filter(
|
|
bool,
|
|
[
|
|
" @ ".join(
|
|
filter(
|
|
bool,
|
|
[
|
|
f"{self.width}x{self.height}" if self.width and self.height else None,
|
|
f"{self.bitrate // 1000} kb/s" if self.bitrate else None,
|
|
],
|
|
)
|
|
),
|
|
f"{self.fps:.3f} FPS" if self.fps else None,
|
|
],
|
|
)
|
|
),
|
|
self.edition,
|
|
],
|
|
)
|
|
)
|
|
|
|
def change_color_range(self, range_: int) -> None:
    """
    Change the Video's Color Range to Limited (0) or Full (1).

    FFmpeg copies the streams into a new file next to the original, named
    "<stem>_limited_range" or "<stem>_full_range", with the metadata
    bitstream filter setting video_full_range_flag. On success self.path
    points at the new file and the original file is deleted.

    Parameters:
        range_: 0 for Limited range, 1 for Full range.

    Raises:
        ValueError: If range_ is not 0 or 1, the file has not been downloaded,
            or the track has no codec.
        NotImplementedError: If the codec is neither AVC nor HEVC.
        EnvironmentError: If the FFmpeg binary is not available.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    # Validate first: a negative index would silently select the wrong label
    # and hand an invalid flag value to FFmpeg.
    if range_ not in (0, 1):
        raise ValueError(f"Expected range_ to be 0 (Limited) or 1 (Full), not {range_!r}")
    # Normalise bools (True/False) so the flag is rendered as 1/0.
    range_ = int(range_)

    if not self.path or not self.path.exists():
        raise ValueError("Cannot change the color range flag on a Video that has not been downloaded.")
    if not self.codec:
        raise ValueError("Cannot change the color range flag on a Video that has no codec specified.")
    if self.codec not in (Video.Codec.AVC, Video.Codec.HEVC):
        raise NotImplementedError(
            "Cannot change the color range flag on this Video as "
            f"it's codec, {self.codec.value}, is not yet supported."
        )

    if not binaries.FFMPEG:
        raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.')

    filter_key = {Video.Codec.AVC: "h264_metadata", Video.Codec.HEVC: "hevc_metadata"}[self.codec]

    original_path = self.path
    output_path = original_path.with_stem(f"{original_path.stem}_{['limited', 'full'][range_]}_range")

    subprocess.run(
        [
            binaries.FFMPEG,
            "-hide_banner",
            "-loglevel",
            "panic",
            "-i",
            original_path,
            "-codec",
            "copy",
            "-bsf:v",
            f"{filter_key}=video_full_range_flag={range_}",
            str(output_path),
        ],
        check=True,
    )

    # Only swap over (and delete the source) once FFmpeg has succeeded.
    self.path = output_path
    original_path.unlink()
|
|
|
|
def ccextractor(
|
|
self, track_id: Any, out_path: Union[Path, str], language: Language, original: bool = False
|
|
) -> Optional[Subtitle]:
|
|
"""Return a TextTrack object representing CC track extracted by CCExtractor."""
|
|
if not self.path:
|
|
raise ValueError("You must download the track first.")
|
|
|
|
if not binaries.CCExtractor:
|
|
raise EnvironmentError("ccextractor executable was not found.")
|
|
|
|
# ccextractor often fails in weird ways unless we repack
|
|
self.repackage()
|
|
|
|
out_path = Path(out_path)
|
|
|
|
try:
|
|
subprocess.run(
|
|
[binaries.CCExtractor, "-trim", "-nobom", "-noru", "-ru1", "-o", out_path, self.path],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
except subprocess.CalledProcessError as e:
|
|
out_path.unlink(missing_ok=True)
|
|
if not e.returncode == 10: # No captions found
|
|
raise
|
|
|
|
if out_path.exists():
|
|
cc_track = Subtitle(
|
|
id_=track_id,
|
|
url="", # doesn't need to be downloaded
|
|
codec=Subtitle.Codec.SubRip,
|
|
language=language,
|
|
is_original_lang=original,
|
|
cc=True,
|
|
)
|
|
cc_track.path = out_path
|
|
return cc_track
|
|
|
|
return None
|
|
|
|
def extract_c608(self) -> list[Subtitle]:
|
|
"""
|
|
Extract Apple-Style c608 box (CEA-608) subtitle using ccextractor.
|
|
|
|
This isn't much more than a wrapper to the track.ccextractor function.
|
|
All this does, is actually check if a c608 box exists and only if so
|
|
does it actually call ccextractor.
|
|
|
|
Even though there is a possibility of more than one c608 box, only one
|
|
can actually be extracted. Not only that but it's very possible this
|
|
needs to be done before any decryption as the decryption may destroy
|
|
some of the metadata.
|
|
|
|
TODO: Need a test file with more than one c608 box to add support for
|
|
more than one CEA-608 extraction.
|
|
"""
|
|
if not self.path:
|
|
raise ValueError("You must download the track first.")
|
|
with self.path.open("rb") as f:
|
|
# assuming 20KB is enough to contain the c608 box.
|
|
# ffprobe will fail, so a c608 box check must be done.
|
|
c608_count = len(list(get_boxes(f.read(20000), b"c608")))
|
|
if c608_count > 0:
|
|
# TODO: Figure out the real language, it might be different
|
|
# CEA-608 boxes doesnt seem to carry language information :(
|
|
# TODO: Figure out if the CC language is original lang or not.
|
|
# Will need to figure out above first to do so.
|
|
track_id = f"ccextractor-{self.id}"
|
|
cc_lang = self.language
|
|
cc_track = self.ccextractor(
|
|
track_id=track_id,
|
|
out_path=config.directories.temp / config.filenames.subtitle.format(id=track_id, language=cc_lang),
|
|
language=cc_lang,
|
|
original=False,
|
|
)
|
|
if not cc_track:
|
|
return []
|
|
return [cc_track]
|
|
return []
|
|
|
|
def remove_eia_cc(self) -> bool:
|
|
"""
|
|
Remove EIA-CC data from Bitstream while keeping SEI data.
|
|
|
|
This works by removing all NAL Unit's with the Type of 6 from the bistream
|
|
and then re-adding SEI data (effectively a new NAL Unit with just the SEI data).
|
|
Only bitstreams with x264 encoding information is currently supported due to the
|
|
obscurity on the MDAT mp4 box structure. Therefore, we need to use hacky regex.
|
|
"""
|
|
if not self.path or not self.path.exists():
|
|
raise ValueError("Cannot clean a Track that has not been downloaded.")
|
|
|
|
if not binaries.FFMPEG:
|
|
raise EnvironmentError('FFmpeg executable "ffmpeg" was not found but is required for this call.')
|
|
|
|
log = logging.getLogger("x264-clean")
|
|
log.info("Removing EIA-CC from Video Track with FFMPEG")
|
|
|
|
with open(self.path, "rb") as f:
|
|
file = f.read(60000)
|
|
|
|
x264 = re.search(rb"(.{16})(x264)", file)
|
|
if not x264:
|
|
log.info(" - No x264 encode settings were found, unsupported...")
|
|
return False
|
|
|
|
uuid = x264.group(1).hex()
|
|
i = file.index(b"x264")
|
|
encoding_settings = file[i : i + file[i:].index(b"\x00")].replace(b":", rb"\\:").replace(b",", rb"\,").decode()
|
|
|
|
original_path = self.path
|
|
cleaned_path = original_path.with_suffix(f".cleaned{original_path.suffix}")
|
|
subprocess.run(
|
|
[
|
|
binaries.FFMPEG,
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"panic",
|
|
"-i",
|
|
original_path,
|
|
"-map_metadata",
|
|
"-1",
|
|
"-fflags",
|
|
"bitexact",
|
|
"-bsf:v",
|
|
f"filter_units=remove_types=6,h264_metadata=sei_user_data={uuid}+{encoding_settings}",
|
|
"-codec",
|
|
"copy",
|
|
str(cleaned_path),
|
|
],
|
|
check=True,
|
|
)
|
|
|
|
log.info(" + Removed")
|
|
|
|
self.path = cleaned_path
|
|
original_path.unlink()
|
|
|
|
return True
|
|
|
|
|
|
# Names exported by `from <this module> import *`: only the Video class.
__all__ = ("Video",)
|