Initial commit

This commit is contained in:
WorldObservationLog
2024-05-04 15:58:59 +08:00
commit 6d36521005
36 changed files with 4039 additions and 0 deletions

0
src/__init__.py Normal file
View File

150
src/adb.py Normal file
View File

@@ -0,0 +1,150 @@
import asyncio
import json
import subprocess
from typing import Optional
import frida
import regex
from loguru import logger
from ppadb.client import Client as AdbClient
from ppadb.device import Device as AdbDevice
from src.exceptions import FridaNotExistException, ADBConnectException, FailedGetAuthParamException
from src.types import AuthParams
class Device:
host: str
client: AdbClient
device: AdbDevice
fridaPath: str
fridaPort: int
fridaDevice: frida.core.Device = None
fridaSession: frida.core.Session = None
pid: int
authParams: AuthParams = None
suMethod: str
decryptLock: asyncio.Lock
def __init__(self, host="127.0.0.1", port=5037,
frida_path="/data/local/tmp/frida-server-16.2.1-android-x86_64", su_method: str = "su -c"):
self.client = AdbClient(host, port)
self.fridaPath = frida_path
self.suMethod = su_method
self.host = host
self.decryptLock = asyncio.Lock()
def connect(self, host: str, port: int):
try:
status = self.client.remote_connect(host, port)
except RuntimeError:
subprocess.run("adb devices", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
status = self.client.remote_connect(host, port)
if not status:
raise ADBConnectException
self.device = self.client.device(f"{host}:{port}")
def _execute_command(self, cmd: str, su: bool = False) -> Optional[str]:
if su:
cmd = cmd.replace("\"", "\\\"")
output = self.device.shell(f"{self.suMethod} \"{cmd}\"")
else:
output = self.device.shell(cmd, timeout=30)
if not output:
return ""
return output
def _if_frida_running(self) -> bool:
logger.debug("checking if frida-server running")
output = self._execute_command("ps -e | grep frida")
if not output or "frida" not in output:
return False
return True
def _start_remote_frida(self):
logger.debug("starting remote frida")
output = f"(ls {self.fridaPath} && echo True) || echo False"
if not output or "True" not in output:
raise FridaNotExistException
permission = self._execute_command(f"ls -l {self.fridaPath}")
if not permission or "x" not in permission[:10]:
self._execute_command(f"chmod +x {self.fridaPath}", True)
self._execute_command(f"{self.fridaPath} &", True)
def _start_forward(self, local_port: int, remote_port: int):
self.device.forward(f"tcp:{local_port}", f"tcp:{remote_port}")
def _inject_frida(self, frida_port):
logger.debug("injecting agent script")
self.fridaPort = frida_port
with open("agent.js", "r") as f:
agent = f.read().replace("2147483647", str(frida_port))
if not self.fridaDevice:
frida.get_device_manager().add_remote_device(self.device.serial)
self.fridaDevice = frida.get_device_manager().get_device(self.device.serial)
self.pid = self.fridaDevice.spawn("com.apple.android.music")
self.fridaSession = self.fridaDevice.attach(self.pid)
script: frida.core.Script = self.fridaSession.create_script(agent)
script.load()
self.fridaDevice.resume(self.pid)
def restart_inject_frida(self):
self.fridaSession.detach()
self._kill_apple_music()
self._inject_frida(self.fridaPort)
def _kill_apple_music(self):
self._execute_command(f"kill -9 {self.pid}", su=True)
def start_inject_frida(self, frida_port):
if not self._if_frida_running():
self._start_remote_frida()
self._start_forward(frida_port, frida_port)
self._inject_frida(frida_port)
def _get_dsid(self) -> str:
logger.debug("getting dsid")
dsid = self._execute_command(
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='X-Dsid';\"", True)
if not dsid:
raise FailedGetAuthParamException
return dsid.strip()
def _get_account_token(self, dsid: str) -> str:
logger.debug("getting account token")
account_token = self._execute_command(
f"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='mz_at_ssl-{dsid}';\"", True)
if not account_token:
raise FailedGetAuthParamException
return account_token.strip()
def _get_access_token(self) -> str:
logger.debug("getting access token")
prefs = self._execute_command("cat /data/data/com.apple.android.music/shared_prefs/preferences.xml", True)
match = regex.search(r"eyJr[^<]*", prefs)
if not match:
raise FailedGetAuthParamException
return match[0]
def _get_storefront(self) -> str | None:
logger.debug("getting storefront")
storefront_id = self._execute_command(
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/accounts.sqlitedb \"select storeFront from account;\"", True)
if not storefront_id:
raise FailedGetAuthParamException
with open("assets/storefront_ids.json") as f:
storefront_ids = json.load(f)
for storefront_mapping in storefront_ids:
if storefront_mapping["storefrontId"] == int(storefront_id.split("-")[0]):
return storefront_mapping["code"]
return None
def get_auth_params(self):
if not self.authParams:
dsid = self._get_dsid()
token = self._get_account_token(dsid)
access_token = self._get_access_token()
storefront = self._get_storefront()
self.authParams = AuthParams(dsid=dsid, accountToken=token,
accountAccessToken=access_token, storefront=storefront)
return self.authParams

103
src/api.py Normal file
View File

@@ -0,0 +1,103 @@
import asyncio
import logging
from ssl import SSLError
import httpcore
import httpx
import regex
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
from loguru import logger
from src.models import *
client = httpx.AsyncClient()
lock = asyncio.Semaphore(1)
user_agent_browser = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
user_agent_itunes = "iTunes/12.11.3 (Windows; Microsoft Windows 10 x64 Professional Edition (Build 19041); x64) AppleWebKit/7611.1022.4001.1 (dt:2)"
user_agent_app = "Music/5.7 Android/10 model/Pixel6GR1YH build/1234 (dt:66)"
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_token():
req = await client.get("https://beta.music.apple.com")
index_js_uri = regex.findall(r"/assets/index-legacy-[^/]+\.js", req.text)[0]
js_req = await client.get("https://beta.music.apple.com" + index_js_uri)
token = regex.search(r'eyJh([^"]*)', js_req.text)[0]
return token
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def download_song(url: str) -> bytes:
async with lock:
return (await client.get(url)).content
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_meta(album_id: str, token: str, storefront: str):
if "pl." in album_id:
mtype = "playlists"
else:
mtype = "albums"
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/{mtype}/{album_id}",
params={"omit[resource]": "autos", "include": "tracks,artists,record-labels",
"include[songs]": "artists", "fields[artists]": "name",
"fields[albums:albums]": "artistName,artwork,name,releaseDate,url",
"fields[record-labels]": "name"},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
if mtype == "albums":
return AlbumMeta.model_validate(req.json())
else:
result = PlaylistMeta.model_validate(req.json())
result.data[0].attributes.artistName = "Apple Music"
if result.data[0].relationships.tracks.next:
page = 0
while True:
page += 100
page_req = await client.get(
f"https://amp-api.music.apple.com/v1/catalog/{storefront}/{mtype}/{album_id}/tracks?offset={page}",
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
"Origin": "https://music.apple.com"})
page_result = TracksMeta.model_validate(page_req.json())
result.data[0].relationships.tracks.data.extend(page_result.data)
if not page_result.next:
break
return result
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_cover(url: str, cover_format: str):
formatted_url = regex.sub('bb.jpg', f'bb.{cover_format}', url)
req = await client.get(formatted_url.replace("{w}x{h}", "10000x10000"),
headers={"User-Agent": user_agent_browser})
return req.content
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_info_from_adam(adam_id: str, token: str, storefront: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{adam_id}",
params={"extend": "extendedAssetUrls", "include": "albums"},
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_itunes,
"Origin": "https://music.apple.com"})
song_data_obj = SongData.model_validate(req.json())
for data in song_data_obj.data:
if data.id == adam_id:
return data
return None
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_song_lyrics(song_id: str, storefront: str, token: str, dsid: str, account_token: str) -> str:
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}/lyrics",
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_app,
"X-Dsid": dsid},
cookies={f"mz_at_ssl-{dsid}": account_token})
result = SongLyrics.model_validate(req.json())
return result.data[0].attributes.ttml

104
src/cmd.py Normal file
View File

@@ -0,0 +1,104 @@
import argparse
import asyncio
import random
import sys
from asyncio import Task
from loguru import logger
from prompt_toolkit import PromptSession, print_formatted_text, ANSI
from prompt_toolkit.patch_stdout import patch_stdout
from src.adb import Device
from src.api import get_token
from src.config import Config
from src.rip import rip_song, rip_album
from src.types import GlobalAuthParams
from src.url import AppleMusicURL, URLType
class NewInteractiveShell:
loop: asyncio.AbstractEventLoop
config: Config
tasks: list[Task] = []
devices: list[Device] = []
storefront_device_mapping: dict[str, list[Device]] = {}
anonymous_access_token: str
parser: argparse.ArgumentParser
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
self.config = Config.load_from_config()
self.anonymous_access_token = loop.run_until_complete(get_token())
self.parser = argparse.ArgumentParser(exit_on_error=False)
subparser = self.parser.add_subparsers()
download_parser = subparser.add_parser("download")
download_parser.add_argument("url", type=str)
download_parser.add_argument("-c", "--codec",
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix"], default="alac")
download_parser.add_argument("-f", "--force", type=bool, default=False)
subparser.add_parser("exit")
logger.remove()
logger.add(lambda msg: print_formatted_text(ANSI(msg), end=""), colorize=True, level="INFO")
for device_info in self.config.devices:
device = Device(frida_path=device_info.fridaPath)
device.connect(device_info.host, device_info.port)
logger.info(f"Device {device_info.host}:{device_info.port} has connected")
self.devices.append(device)
auth_params = device.get_auth_params()
if not self.storefront_device_mapping.get(auth_params.storefront.lower()):
self.storefront_device_mapping.update({auth_params.storefront.lower(): []})
self.storefront_device_mapping[auth_params.storefront.lower()].append(device)
device.start_inject_frida(device_info.agentPort)
async def command_parser(self, cmd: str):
if not cmd.strip():
return
cmds = cmd.split(" ")
try:
args = self.parser.parse_args(cmds)
except argparse.ArgumentError:
logger.warning(f"Unknown command: {cmd}")
return
match cmds[0]:
case "download":
await self.do_download(args.url, args.codec, args.force)
case "exit":
self.loop.stop()
sys.exit()
async def do_download(self, raw_url: str, codec: str, force_download: bool):
url = AppleMusicURL.parse_url(raw_url)
devices = self.storefront_device_mapping.get(url.storefront)
if not devices:
logger.error(f"No device is available to decrypt the specified region: {url.storefront}")
available_devices = [device for device in devices if not device.decryptLock.locked()]
if not available_devices:
available_device: Device = random.choice(devices)
else:
available_device: Device = random.choice(available_devices)
global_auth_param = GlobalAuthParams.from_auth_params_and_token(available_device.get_auth_params(), self.anonymous_access_token)
match url.type:
case URLType.Song:
self.loop.create_task(rip_song(url, global_auth_param, codec, self.config, available_device, force_download))
case URLType.Album:
self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device))
async def handle_command(self):
session = PromptSession("> ")
while True:
try:
command = await session.prompt_async()
await self.command_parser(command)
except (EOFError, KeyboardInterrupt):
return
async def start(self):
with patch_stdout():
try:
await self.handle_command()
finally:
logger.info("Existing shell")

42
src/config.py Normal file
View File

@@ -0,0 +1,42 @@
import tomllib
from pydantic import BaseModel
class Language(BaseModel):
language: str
languageForGenre: str
class Device(BaseModel):
host: str
port: int
agentPort: int
fridaPath: str
class Download(BaseModel):
atmosConventToM4a: bool
songNameFormat: str
dirPathFormat: str
saveLyrics: bool
saveCover: bool
coverFormat: str
afterDownloaded: str
class Metadata(BaseModel):
embedMetadata: list[str]
class Config(BaseModel):
language: Language
devices: list[Device]
download: Download
metadata: Metadata
@classmethod
def load_from_config(cls, config_file: str = "config.toml"):
with open(config_file, "r") as f:
config = tomllib.loads(f.read())
return cls.parse_obj(config)

48
src/decrypt.py Normal file
View File

@@ -0,0 +1,48 @@
import asyncio
import logging
import sys
from prompt_toolkit.shortcuts import ProgressBar
from loguru import logger
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
from src.adb import Device
from src.exceptions import DecryptException
from src.models.song_data import Datum
from src.mp4 import SongInfo, SampleInfo
from src.types import defaultId, prefetchKey
async def decrypt(info: SongInfo, keys: list[str], manifest: Datum, device: Device) -> bytes:
async with device.decryptLock:
logger.info(f"Decrypting song: {manifest.attributes.artistName} - {manifest.attributes.name}")
reader, writer = await asyncio.open_connection(device.host, device.fridaPort)
decrypted = bytes()
last_index = 255
for sample in info.samples:
if last_index != sample.descIndex:
if len(decrypted) != 0:
writer.write(bytes([0, 0, 0, 0]))
key_uri = keys[sample.descIndex]
track_id = manifest.id
if key_uri == prefetchKey:
track_id = defaultId
writer.write(bytes([len(track_id)]))
writer.write(track_id.encode("utf-8"))
writer.write(bytes([len(key_uri)]))
writer.write(key_uri.encode("utf-8"))
last_index = sample.descIndex
result = await decrypt_sample(writer, reader, sample)
decrypted += result
writer.write(bytes([0, 0, 0, 0]))
writer.close()
return decrypted
async def decrypt_sample(writer: asyncio.StreamWriter, reader: asyncio.StreamReader, sample: SampleInfo) -> bytes:
writer.write(len(sample.data).to_bytes(4, byteorder="little", signed=False))
writer.write(sample.data)
result = await reader.read(len(sample.data))
if not result:
raise DecryptException
return result

18
src/exceptions.py Normal file
View File

@@ -0,0 +1,18 @@
class FridaNotExistException(Exception):
...
class ADBConnectException(Exception):
...
class FailedGetAuthParamException(Exception):
...
class DecryptException(Exception):
...
class NotTimeSyncedLyricsException(Exception):
...

58
src/metadata.py Normal file
View File

@@ -0,0 +1,58 @@
from pydantic import BaseModel
from src.api import get_cover
from src.models.song_data import Datum
from src.utils import ttml_convent_to_lrc
class SongMetadata(BaseModel):
title: str
artist: str
album_artist: str
album: str
composer: str
genre: str
created: str
track: str
tracknum: int
disk: int
lyrics: str
cover: bytes = None
cover_url: str
copyright: str
record_company: str
upc: str
isrc: str
def to_itags_params(self, embed_metadata: list[str], cover_format: str):
tags = []
for key, value in self.model_dump().items():
if key in embed_metadata and value:
if key == "cover":
continue
if key == "lyrics":
lrc = ttml_convent_to_lrc(value)
tags.append(f"{key}={lrc}")
continue
tags.append(f"{key}={value}")
return ":".join(tags)
@classmethod
def parse_from_song_data(cls, song_data: Datum):
return cls(title=song_data.attributes.name, artist=song_data.attributes.artistName,
album_artist=song_data.relationships.albums.data[0].attributes.artistName,
album=song_data.attributes.albumName, composer=song_data.attributes.composerName,
genre=song_data.attributes.genreNames[0], created=song_data.attributes.releaseDate,
track=song_data.attributes.name, tracknum=song_data.attributes.trackNumber,
disk=song_data.attributes.discNumber, lyrics="", cover_url=song_data.attributes.artwork.url,
copyright=song_data.relationships.albums.data[0].attributes.copyright,
record_company=song_data.relationships.albums.data[0].attributes.recordLabel,
upc=song_data.relationships.albums.data[0].attributes.upc,
isrc=song_data.attributes.isrc
)
def set_lyrics(self, lyrics: str):
self.lyrics = lyrics
async def get_cover(self, cover_format: str):
self.cover = await get_cover(self.cover_url, cover_format)

5
src/models/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from src.models.album_meta import AlbumMeta
from src.models.playlist_meta import PlaylistMeta
from src.models.tracks_meta import TracksMeta
from src.models.song_data import SongData
from src.models.song_lyrics import SongLyrics

160
src/models/album_meta.py Normal file
View File

@@ -0,0 +1,160 @@
from __future__ import annotations
from typing import List
from pydantic import BaseModel, Field
class Artwork(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams(BaseModel):
id: str
kind: str
class Attributes(BaseModel):
copyright: str
genreNames: List[str]
releaseDate: str
upc: str
isMasteredForItunes: bool
artwork: Artwork
url: str
playParams: PlayParams
recordLabel: str
isCompilation: bool
trackCount: int
isPrerelease: bool
audioTraits: List[str]
isSingle: bool
name: str
artistName: str
isComplete: bool
class Artwork1(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams1(BaseModel):
id: str
kind: str
class Preview(BaseModel):
url: str
class Attributes1(BaseModel):
hasTimeSyncedLyrics: bool
albumName: str
genreNames: List[str]
trackNumber: int
durationInMillis: int
releaseDate: str
isVocalAttenuationAllowed: bool
isMasteredForItunes: bool
isrc: str
artwork: Artwork1
composerName: str
audioLocale: str
playParams: PlayParams1
url: str
discNumber: int
hasCredits: bool
isAppleDigitalMaster: bool
hasLyrics: bool
audioTraits: List[str]
name: str
previews: List[Preview]
artistName: str
class Attributes2(BaseModel):
name: str
class Datum2(BaseModel):
id: str
type: str
href: str
attributes: Attributes2
class Artists(BaseModel):
href: str
data: List[Datum2]
class Relationships1(BaseModel):
artists: Artists
class Datum1(BaseModel):
id: str
type: str
href: str
attributes: Attributes1
relationships: Relationships1
class Tracks(BaseModel):
href: str
data: List[Datum1]
class Attributes3(BaseModel):
name: str
class Datum3(BaseModel):
id: str
type: str
href: str
attributes: Attributes3
class Artists1(BaseModel):
href: str
data: List[Datum3]
class RecordLabels(BaseModel):
href: str
data: List
class Relationships(BaseModel):
tracks: Tracks
artists: Artists1
record_labels: RecordLabels = Field(..., alias='record-labels')
class Datum(BaseModel):
id: str
type: str
href: str
attributes: Attributes
relationships: Relationships
class AlbumMeta(BaseModel):
data: List[Datum]

147
src/models/playlist_meta.py Normal file
View File

@@ -0,0 +1,147 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel
class Description(BaseModel):
standard: str
short: str
class Artwork(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams(BaseModel):
id: str
kind: str
versionHash: str
class EditorialNotes(BaseModel):
name: str
standard: str
short: str
class Attributes(BaseModel):
lastModifiedDate: str
supportsSing: bool
description: Description
artwork: Artwork
playParams: PlayParams
url: str
hasCollaboration: bool
curatorName: str
audioTraits: List
name: str
isChart: bool
playlistType: str
editorialNotes: EditorialNotes
artistName: Optional[str] = None
class Artwork1(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams1(BaseModel):
id: str
kind: str
class Preview(BaseModel):
url: str
class Attributes1(BaseModel):
albumName: str
hasTimeSyncedLyrics: bool
genreNames: List[str]
trackNumber: int
releaseDate: str
durationInMillis: int
isVocalAttenuationAllowed: bool
isMasteredForItunes: bool
isrc: str
artwork: Artwork1
composerName: str
audioLocale: str
url: str
playParams: PlayParams1
discNumber: int
hasCredits: bool
hasLyrics: bool
isAppleDigitalMaster: bool
audioTraits: List[str]
name: str
previews: List[Preview]
artistName: str
class Attributes2(BaseModel):
name: str
class Datum2(BaseModel):
id: str
type: str
href: str
attributes: Attributes2
class Artists(BaseModel):
href: str
data: List[Datum2]
class Relationships1(BaseModel):
artists: Artists
class Datum1(BaseModel):
id: str
type: str
href: str
attributes: Attributes1
relationships: Relationships1
class Tracks(BaseModel):
href: str
next: Optional[str] = None
data: List[Datum1]
class Relationships(BaseModel):
tracks: Tracks
class Datum(BaseModel):
id: str
type: str
href: str
attributes: Attributes
relationships: Relationships
class PlaylistMeta(BaseModel):
data: List[Datum]

137
src/models/song_data.py Normal file
View File

@@ -0,0 +1,137 @@
from __future__ import annotations
from typing import List
from pydantic import BaseModel
class Artwork(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams(BaseModel):
id: str
kind: str
class Preview(BaseModel):
url: str
class ExtendedAssetUrls(BaseModel):
plus: str
lightweight: str
superLightweight: str
lightweightPlus: str
enhancedHls: str
class Attributes(BaseModel):
hasTimeSyncedLyrics: bool
albumName: str
genreNames: List[str]
trackNumber: int
durationInMillis: int
releaseDate: str
isVocalAttenuationAllowed: bool
isMasteredForItunes: bool
isrc: str
artwork: Artwork
composerName: str
audioLocale: str
url: str
playParams: PlayParams
discNumber: int
hasCredits: bool
isAppleDigitalMaster: bool
hasLyrics: bool
audioTraits: List[str]
name: str
previews: List[Preview]
artistName: str
extendedAssetUrls: ExtendedAssetUrls
class Artwork1(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams1(BaseModel):
id: str
kind: str
class Attributes1(BaseModel):
copyright: str
genreNames: List[str]
releaseDate: str
isMasteredForItunes: bool
upc: str
artwork: Artwork1
url: str
playParams: PlayParams1
recordLabel: str
isCompilation: bool
trackCount: int
isPrerelease: bool
audioTraits: List[str]
isSingle: bool
name: str
artistName: str
isComplete: bool
class Datum1(BaseModel):
id: str
type: str
href: str
attributes: Attributes1
class Albums(BaseModel):
href: str
data: List[Datum1]
class Datum2(BaseModel):
id: str
type: str
href: str
class Artists(BaseModel):
href: str
data: List[Datum2]
class Relationships(BaseModel):
albums: Albums
artists: Artists
class Datum(BaseModel):
id: str
type: str
href: str
attributes: Attributes
relationships: Relationships
class SongData(BaseModel):
data: List[Datum]

25
src/models/song_lyrics.py Normal file
View File

@@ -0,0 +1,25 @@
from typing import List
from pydantic import BaseModel
class PlayParams(BaseModel):
id: str
kind: str
catalogId: str
displayType: int
class Attributes(BaseModel):
ttml: str
playParams: PlayParams
class Datum(BaseModel):
id: str
type: str
attributes: Attributes
class SongLyrics(BaseModel):
data: List[Datum]

63
src/models/tracks_meta.py Normal file
View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel
class Artwork(BaseModel):
width: int
url: str
height: int
textColor3: str
textColor2: str
textColor4: str
textColor1: str
bgColor: str
hasP3: bool
class PlayParams(BaseModel):
id: str
kind: str
class Preview(BaseModel):
url: str
class Attributes(BaseModel):
hasTimeSyncedLyrics: bool
albumName: str
genreNames: List[str]
trackNumber: int
releaseDate: str
durationInMillis: int
isVocalAttenuationAllowed: bool
isMasteredForItunes: bool
isrc: str
artwork: Artwork
composerName: Optional[str] = None
audioLocale: str
url: str
playParams: PlayParams
discNumber: int
hasCredits: bool
isAppleDigitalMaster: bool
hasLyrics: bool
audioTraits: List[str]
name: str
previews: List[Preview]
artistName: str
class Datum(BaseModel):
id: str
type: str
href: str
attributes: Attributes
class TracksMeta(BaseModel):
next: Optional[str] = None
data: List[Datum]

165
src/mp4.py Normal file
View File

@@ -0,0 +1,165 @@
import subprocess
import uuid
from io import BytesIO
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Tuple
import m3u8
import regex
from bs4 import BeautifulSoup
from src.metadata import SongMetadata
from src.types import *
from src.utils import find_best_codec
async def extract_media(m3u8_url: str, codec: str) -> Tuple[str, list[str], str]:
parsed_m3u8 = m3u8.load(m3u8_url)
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
selected_codec = specifyPlaylist.media[0].group_id
if not specifyPlaylist:
raise
stream = m3u8.load(specifyPlaylist.absolute_uri)
skds = [key.uri for key in stream.keys if regex.match('(skd?://[^"]*)', key.uri)]
keys = [prefetchKey]
key_suffix = CodecKeySuffix.KeySuffixDefault
match codec:
case Codec.ALAC:
key_suffix = CodecKeySuffix.KeySuffixAlac
case Codec.EC3:
key_suffix = CodecKeySuffix.KeySuffixAtmos
case Codec.AAC:
key_suffix = CodecKeySuffix.KeySuffixAAC
case Codec.AAC_BINAURAL:
key_suffix = CodecKeySuffix.KeySuffixAACBinaural
case Codec.AAC_DOWNMIX:
key_suffix = CodecKeySuffix.KeySuffixAACDownmix
for key in skds:
if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault):
keys.append(key)
return stream.segment_map[0].absolute_uri, keys, selected_codec
def extract_song(raw_song: bytes, codec: str) -> SongInfo:
tmp_dir = TemporaryDirectory()
mp4_name = uuid.uuid4().hex
raw_mp4 = Path(tmp_dir.name) / Path(f"{mp4_name}.mp4")
with open(raw_mp4.absolute(), "wb") as f:
f.write(raw_song)
nhml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.nhml')).absolute()
media_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.media')).absolute()
subprocess.run(f"gpac -i {raw_mp4.absolute()} nhmlw:pckp=true -o {nhml_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
xml_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.xml')).absolute()
subprocess.run(f"mp4box -diso {raw_mp4.absolute()} -out {xml_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
decoder_params = None
with open(xml_name, "r") as f:
info_xml = BeautifulSoup(f.read(), "xml")
with open(nhml_name, "r") as f:
raw_nhml = f.read()
nhml = BeautifulSoup(raw_nhml, "xml")
with open(media_name, "rb") as f:
media = BytesIO(f.read())
if codec == Codec.ALAC:
alac_atom_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.atom')).absolute()
subprocess.run(f"mp4extract moov/trak/mdia/minf/stbl/stsd/enca[0]/alac {raw_mp4.absolute()} {alac_atom_name}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(alac_atom_name, "rb") as f:
decoder_params = f.read()
samples = []
moofs = info_xml.find_all("MovieFragmentBox")
nhnt_sample_number = 0
nhnt_samples = {}
for sample in nhml.find_all("NHNTSample"):
nhnt_samples.update({int(sample.get("number")): sample})
for i, moof in enumerate(moofs):
tfhd = moof.TrackFragmentBox.TrackFragmentHeaderBox
index = 0 if not tfhd.get("SampleDescriptionIndex") else int(tfhd.get("SampleDescriptionIndex")) - 1
truns = moof.TrackFragmentBox.find_all("TrackRunBox")
for trun in truns:
for sample_number in range(int(trun.get("SampleCount"))):
nhnt_sample_number += 1
nhnt_sample = nhnt_samples[nhnt_sample_number]
sample_data = media.read(int(nhnt_sample.get("dataLength")))
duration = int(nhnt_sample.get("duration"))
samples.append(SampleInfo(descIndex=index, data=sample_data, duration=int(duration)))
tmp_dir.cleanup()
return SongInfo(codec=codec, raw=raw_song, samples=samples, nhml=raw_nhml, decoderParams=decoder_params)
def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent: bool) -> bytes:
tmp_dir = TemporaryDirectory()
name = uuid.uuid4().hex
media = Path(tmp_dir.name) / Path(name).with_suffix(".media")
with open(media.absolute(), "wb") as f:
f.write(decrypted_media)
if song_info.codec == Codec.EC3 and not atmos_convent:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".ec3")
else:
song_name = Path(tmp_dir.name) / Path(name).with_suffix(".m4a")
match song_info.codec:
case Codec.ALAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
with open(nhml_name.absolute(), "w", encoding="utf-8") as f:
nhml_xml = BeautifulSoup(song_info.nhml, features="xml")
nhml_xml.NHNTStream["baseMediaFile"] = media.name
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
alac_params_atom_name = Path(tmp_dir.name) / Path(f"{name}.atom")
with open(alac_params_atom_name.absolute(), "wb") as f:
f.write(song_info.decoderParams)
final_m4a_name = Path(tmp_dir.name) / Path(f"{name}_final.m4a")
subprocess.run(
f"mp4edit --insert moov/trak/mdia/minf/stbl/stsd/alac:{alac_params_atom_name.absolute()} {song_name.absolute()} {final_m4a_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
song_name = final_m4a_name
case Codec.EC3:
if not atmos_convent:
with open(song_name.absolute(), "wb") as f:
f.write(decrypted_media)
subprocess.run(f"gpac -i {media.absolute()} -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
case Codec.AAC_BINAURAL | Codec.AAC_DOWNMIX | Codec.AAC:
nhml_name = Path(tmp_dir.name) / Path(f"{name}.nhml")
with open(nhml_name.absolute(), "w", encoding="utf-8") as f:
nhml_xml = BeautifulSoup(song_info.nhml, features="xml")
nhml_xml.NHNTStream["baseMediaFile"] = media.name
del nhml_xml.NHNTStream["streamType"]
del nhml_xml.NHNTStream["objectTypeIndication"]
del nhml_xml.NHNTStream["specificInfoFile"]
nhml_xml.NHNTStream["mediaType"] = "soun"
nhml_xml.NHNTStream["mediaSubType"] = "mp4a"
f.write(str(nhml_xml))
subprocess.run(f"gpac -i {nhml_name.absolute()} nhmlr -o {song_name.absolute()}",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(song_name.absolute(), "rb") as f:
final_song = f.read()
tmp_dir.cleanup()
return final_song
def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str], cover_format: str) -> bytes:
tmp_dir = TemporaryDirectory()
name = uuid.uuid4().hex
song_name = Path(tmp_dir.name) / Path(f"{name}.m4a")
with open(song_name.absolute(), "wb") as f:
f.write(song)
absolute_cover_path = ""
if "cover" in embed_metadata:
cover_path = Path(tmp_dir.name) / Path(f"cover.{cover_format}")
absolute_cover_path = cover_path.absolute()
with open(cover_path.absolute(), "wb") as f:
f.write(metadata.cover)
subprocess.run(["mp4box", "-time", "0", "-mtime", "0", "-keep-utc", "-name", f"1={metadata.title}", "-itags",
":".join(["tool=\"\"", f"cover={absolute_cover_path}", metadata.to_itags_params(embed_metadata, cover_format)]),
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(song_name.absolute(), "rb") as f:
embed_song = f.read()
tmp_dir.cleanup()
return embed_song

61
src/rip.py Normal file
View File

@@ -0,0 +1,61 @@
import asyncio
from loguru import logger
from src.api import get_info_from_adam, get_song_lyrics, get_meta, download_song
from src.config import Config, Device
from src.decrypt import decrypt
from src.metadata import SongMetadata
from src.mp4 import extract_media, extract_song, encapsulate, write_metadata
from src.save import save
from src.types import GlobalAuthParams, Codec
from src.url import Song, Album, URLType
from src.utils import check_song_exists
@logger.catch
async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False):
logger.debug(f"Task of song id {song.id} was created")
token = auth_params.anonymousAccessToken
song_data = await get_info_from_adam(song.id, token, song.storefront)
song_metadata = SongMetadata.parse_from_song_data(song_data)
logger.info(f"Ripping song: {song_metadata.artist} - {song_metadata.title}")
if not force_save and check_song_exists(song_metadata, config.download, codec):
logger.info(f"Song: {song_metadata.artist} - {song_metadata.title} already exists")
return
await song_metadata.get_cover(config.download.coverFormat)
if song_data.attributes.hasTimeSyncedLyrics:
lyrics = await get_song_lyrics(song.id, song.storefront, auth_params.accountAccessToken,
auth_params.dsid, auth_params.accountToken)
song_metadata.lyrics = lyrics
song_uri, keys, selected_codec = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec)
logger.info(f"Selected codec: {selected_codec} for song: {song_metadata.artist} - {song_metadata.title}")
logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}")
raw_song = await download_song(song_uri)
song_info = extract_song(raw_song, codec)
decrypted_song = await decrypt(song_info, keys, song_data, device)
song = encapsulate(song_info, decrypted_song, config.download.atmosConventToM4a)
if codec != Codec.EC3 or (codec == Codec.EC3 and config.download.atmosConventToM4a):
song = write_metadata(song, song_metadata, config.metadata.embedMetadata, config.download.coverFormat)
save(song, codec, song_metadata, config.download)
logger.info(f"Song {song_metadata.artist} - {song_metadata.title} saved!")
async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False):
album_info = await get_meta(album.id, auth_params.anonymousAccessToken, album.storefront)
logger.info(f"Ripping Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name}")
async with asyncio.TaskGroup() as tg:
for track in album_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song)
tg.create_task(rip_song(song, auth_params, codec, config, device, force_save))
logger.info(f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping")
async def rip_playlist():
pass
async def rip_artist():
pass

29
src/save.py Normal file
View File

@@ -0,0 +1,29 @@
import os
from pathlib import Path
from src.config import Download
from src.metadata import SongMetadata
from src.types import Codec
from src.utils import ttml_convent_to_lrc, get_valid_filename
def save(song: bytes, codec: str, metadata: SongMetadata, config: Download):
song_name = get_valid_filename(config.songNameFormat.format(**metadata.model_dump()))
dir_path = Path(config.dirPathFormat.format(**metadata.model_dump()))
if not dir_path.exists() or not dir_path.is_dir():
os.makedirs(dir_path.absolute())
if codec == Codec.EC3 and not config.atmosConventToM4a:
song_path = dir_path / Path(song_name).with_suffix(".ec3")
else:
song_path = dir_path / Path(song_name).with_suffix(".m4a")
with open(song_path.absolute(), "wb") as f:
f.write(song)
if config.saveCover:
cover_path = dir_path / Path(f"cover.{config.coverFormat}")
with open(cover_path.absolute(), "wb") as f:
f.write(metadata.cover)
if config.saveLyrics and metadata.lyrics:
lrc_path = dir_path / Path(song_name).with_suffix(".lrc")
with open(lrc_path.absolute(), "w", encoding="utf-8") as f:
f.write(ttml_convent_to_lrc(metadata.lyrics))
return song_path.absolute()

68
src/types.py Normal file
View File

@@ -0,0 +1,68 @@
from typing import Optional
from pydantic import BaseModel
defaultId = "0"
prefetchKey = "skd://itunes.apple.com/P000000000/s1/e1"
class SampleInfo(BaseModel):
data: bytes
duration: int
descIndex: int
class SongInfo(BaseModel):
codec: str
raw: bytes
samples: list[SampleInfo]
nhml: str
decoderParams: Optional[bytes] = None
class Codec:
ALAC = "alac"
EC3 = "ec3"
AAC_BINAURAL = "aac-binaural"
AAC_DOWNMIX = "aac-downmix"
AAC = "aac"
class CodecKeySuffix:
KeySuffixAtmos = "c24"
KeySuffixAlac = "c23"
KeySuffixAAC = "c22"
KeySuffixAACDownmix = "c24"
KeySuffixAACBinaural = "c24"
KeySuffixDefault = "c6"
class CodecRegex:
RegexCodecAtmos = "audio-atmos-\\d{4}$"
RegexCodecAlac = "audio-alac-stereo-\\d{5}-\\d{2}$"
RegexCodecBinaural = "audio-stereo-\\d{3}-binaural$"
RegexCodecDownmix = "audio-stereo-\\d{3}-downmix$"
RegexCodecAAC = "audio-stereo-\\d{3}$"
@classmethod
def get_pattern_by_codec(cls, codec: str):
codec_pattern_mapping = {Codec.ALAC: cls.RegexCodecAlac, Codec.EC3: cls.RegexCodecAtmos,
Codec.AAC_DOWNMIX: cls.RegexCodecDownmix, Codec.AAC_BINAURAL: cls.RegexCodecBinaural,
Codec.AAC: cls.RegexCodecAAC}
return codec_pattern_mapping.get(codec)
class AuthParams(BaseModel):
dsid: str
accountToken: str
accountAccessToken: str
storefront: str
class GlobalAuthParams(AuthParams):
anonymousAccessToken: str
@classmethod
def from_auth_params_and_token(cls, auth_params: AuthParams, token: str):
return cls(dsid=auth_params.dsid, accountToken=auth_params.accountToken, anonymousAccessToken=token,
accountAccessToken=auth_params.accountAccessToken, storefront=auth_params.storefront)

62
src/url.py Normal file
View File

@@ -0,0 +1,62 @@
from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel
class URLType:
Song = "song"
Album = "album"
Playlist = "playlist"
Artist = "artist"
class AppleMusicURL(BaseModel):
url: str
storefront: str
type: str
id: str
@classmethod
def parse_url(cls, url: str):
parsed_url = urlparse(url)
paths = parsed_url.path.split("/")
storefront = paths[1]
url_type = paths[2]
match url_type:
case URLType.Song:
url_id = paths[4]
return Song(url=url, storefront=storefront, id=url_id, type=URLType.Song)
case URLType.Album:
if not parsed_url.query:
url_id = paths[4]
return Album(url=url, storefront=storefront, id=url_id, type=URLType.Album)
else:
url_query = parse_qs(parsed_url.query)
if url_query.get("i"):
url_id = url_query.get("i")[0]
return Song(url=url, storefront=storefront, id=url_id, type=URLType.Song)
else:
url_id = paths[4]
return Album(url=url, storefront=storefront, id=url_id, type=URLType.Album)
case URLType.Artist:
url_id = paths[4]
return Artist(url=url, storefront=storefront, id=url_id, type=URLType.Artist)
case URLType.Playlist:
url_id = paths[4]
return Playlist(url=url, storefront=storefront, id=url_id, type=URLType.Playlist)
class Song(AppleMusicURL):
...
class Album(AppleMusicURL):
...
class Playlist(AppleMusicURL):
...
class Artist(AppleMusicURL):
...

117
src/utils.py Normal file
View File

@@ -0,0 +1,117 @@
import asyncio
import time
from itertools import islice
from pathlib import Path
import m3u8
import regex
from bs4 import BeautifulSoup
from src.config import Download
from src.exceptions import NotTimeSyncedLyricsException
from src.types import *
def check_url(url):
pattern = regex.compile(
r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/album|\/album\/.+))\/(?:id)?(\d[^\D]+)(?:$|\?)')
result = regex.findall(pattern, url)
return result[0][0], result[0][1]
def check_playlist_url(url):
pattern = regex.compile(
r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/playlist|\/playlist\/.+))\/(?:id)?(pl\.[\w-]+)(?:$|\?)')
result = regex.findall(pattern, url)
return result[0][0], result[0][1]
def byte_length(i):
return (i.bit_length() + 7) // 8
def find_best_codec(parsed_m3u8: m3u8.M3U8, codec: str) -> Optional[m3u8.Playlist]:
available_medias = [playlist for playlist in parsed_m3u8.playlists
if regex.match(CodecRegex.get_pattern_by_codec(codec), playlist.stream_info.audio)]
if not available_medias:
return None
available_medias.sort(key=lambda x: x.stream_info.average_bandwidth, reverse=True)
return available_medias[0]
def chunk(it, size):
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
def timeit(func):
async def process(func, *args, **params):
if asyncio.iscoroutinefunction(func):
print('this function is a coroutine: {}'.format(func.__name__))
return await func(*args, **params)
else:
print('this is not a coroutine')
return func(*args, **params)
async def helper(*args, **params):
print('{}.time'.format(func.__name__))
start = time.time()
result = await process(func, *args, **params)
# Test normal function route...
# result = await process(lambda *a, **p: print(*a, **p), *args, **params)
print('>>>', time.time() - start)
return result
return helper
def get_digit_from_string(text: str) -> int:
return int(''.join(filter(str.isdigit, text)))
def ttml_convent_to_lrc(ttml: str) -> str:
b = BeautifulSoup(ttml, features="xml")
lrc_lines = []
for item in b.tt.body.children:
for lyric in item.children:
h, m, s, ms = 0, 0, 0, 0
lyric_time: str = lyric.get("begin")
if not lyric_time:
raise NotTimeSyncedLyricsException
match lyric_time.count(":"):
case 0:
split_time = lyric_time.split(".")
s, ms = get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1])
case 1:
split_time = lyric_time.split(":")
s_ms = split_time[-1]
del split_time[-1]
split_time.extend(s_ms.split("."))
m, s, ms = (get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1]),
get_digit_from_string(split_time[2]))
case 2:
split_time = lyric_time.split(":")
s_ms = split_time[-1]
del split_time[-1]
split_time.extend(s_ms.split("."))
h, m, s, ms = (get_digit_from_string(split_time[0]), get_digit_from_string(split_time[1]),
get_digit_from_string(split_time[2]), get_digit_from_string(split_time[3]))
lrc_lines.append(
f"[{str(m + h * 60).rjust(2, '0')}:{str(s).rjust(2, '0')}.{str(int(ms / 10)).rjust(2, '0')}]{lyric.text}")
return "\n".join(lrc_lines)
def check_song_exists(metadata, config: Download, codec: str):
song_name = get_valid_filename(config.songNameFormat.format(**metadata.model_dump()))
dir_path = Path(config.dirPathFormat.format(**metadata.model_dump()))
if not config.atmosConventToM4a and codec == Codec.EC3:
return (Path(dir_path) / Path(song_name).with_suffix(".ec3")).exists()
else:
return (Path(dir_path) / Path(song_name).with_suffix(".m4a")).exists()
def get_valid_filename(filename: str):
return "".join(i for i in filename if i not in "\/:*?<>|")