fix: format code

This commit is contained in:
WorldObservationLog
2024-05-04 23:59:52 +08:00
parent efe1f4efd5
commit 3b00e3175a
18 changed files with 1123 additions and 1080 deletions

View File

@@ -100,7 +100,8 @@ class Device:
def _get_dsid(self) -> str:
logger.debug("getting dsid")
dsid = self._execute_command(
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='X-Dsid';\"", True)
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='X-Dsid';\"",
True)
if not dsid:
raise FailedGetAuthParamException
return dsid.strip()
@@ -108,7 +109,8 @@ class Device:
def _get_account_token(self, dsid: str) -> str:
logger.debug("getting account token")
account_token = self._execute_command(
f"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='mz_at_ssl-{dsid}';\"", True)
f"sqlite3 /data/data/com.apple.android.music/files/mpl_db/cookies.sqlitedb \"select value from cookies where name='mz_at_ssl-{dsid}';\"",
True)
if not account_token:
raise FailedGetAuthParamException
return account_token.strip()
@@ -124,7 +126,8 @@ class Device:
def _get_storefront(self) -> str | None:
logger.debug("getting storefront")
storefront_id = self._execute_command(
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/accounts.sqlitedb \"select storeFront from account;\"", True)
"sqlite3 /data/data/com.apple.android.music/files/mpl_db/accounts.sqlitedb \"select storeFront from account;\"",
True)
if not storefront_id:
raise FailedGetAuthParamException
with open("assets/storefront_ids.json") as f:

View File

@@ -5,9 +5,8 @@ from ssl import SSLError
import httpcore
import httpx
import regex
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
from loguru import logger
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
from src.models import *
@@ -18,7 +17,8 @@ user_agent_itunes = "iTunes/12.11.3 (Windows; Microsoft Windows 10 x64 Professio
user_agent_app = "Music/5.7 Android/10 model/Pixel6GR1YH build/1234 (dt:66)"
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_token():
req = await client.get("https://beta.music.apple.com")
@@ -28,14 +28,16 @@ async def get_token():
return token
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def download_song(url: str) -> bytes:
async with lock:
return (await client.get(url)).content
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_meta(album_id: str, token: str, storefront: str):
if "pl." in album_id:
@@ -69,7 +71,8 @@ async def get_meta(album_id: str, token: str, storefront: str):
return result
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_cover(url: str, cover_format: str):
formatted_url = regex.sub('bb.jpg', f'bb.{cover_format}', url)
@@ -78,7 +81,8 @@ async def get_cover(url: str, cover_format: str):
return req.content
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_info_from_adam(adam_id: str, token: str, storefront: str):
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{adam_id}",
@@ -92,7 +96,8 @@ async def get_info_from_adam(adam_id: str, token: str, storefront: str):
return None
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)), stop=stop_after_attempt(5),
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARN))
async def get_song_lyrics(song_id: str, storefront: str, token: str, dsid: str, account_token: str) -> str:
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}/lyrics",

View File

@@ -35,7 +35,8 @@ class NewInteractiveShell:
download_parser = subparser.add_parser("download")
download_parser.add_argument("url", type=str)
download_parser.add_argument("-c", "--codec",
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"], default="alac")
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"],
default="alac")
download_parser.add_argument("-f", "--force", type=bool, default=False)
subparser.add_parser("exit")
@@ -79,12 +80,14 @@ class NewInteractiveShell:
available_device: Device = random.choice(devices)
else:
available_device: Device = random.choice(available_devices)
global_auth_param = GlobalAuthParams.from_auth_params_and_token(available_device.get_auth_params(), self.anonymous_access_token)
global_auth_param = GlobalAuthParams.from_auth_params_and_token(available_device.get_auth_params(),
self.anonymous_access_token)
match url.type:
case URLType.Song:
self.loop.create_task(rip_song(url, global_auth_param, codec, self.config, available_device, force_download))
task = self.loop.create_task(
rip_song(url, global_auth_param, codec, self.config, available_device, force_download))
case URLType.Album:
self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device))
task = self.loop.create_task(rip_album(url, global_auth_param, codec, self.config, available_device))
async def handle_command(self):
session = PromptSession("> ")

View File

@@ -1,10 +1,6 @@
import asyncio
import logging
import sys
from prompt_toolkit.shortcuts import ProgressBar
from loguru import logger
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
from src.adb import Device
from src.exceptions import DecryptException

View File

@@ -59,4 +59,4 @@ class SongMetadata(BaseModel):
self.lyrics = lyrics
async def get_cover(self, cover_format: str):
self.cover = await get_cover(self.cover_url, cover_format)
self.cover = await get_cover(self.cover_url, cover_format)

View File

@@ -1,5 +1,5 @@
from src.models.album_meta import AlbumMeta
from src.models.playlist_meta import PlaylistMeta
from src.models.tracks_meta import TracksMeta
from src.models.song_data import SongData
from src.models.song_lyrics import SongLyrics
from src.models.tracks_meta import TracksMeta

View File

@@ -12,7 +12,14 @@ from bs4 import BeautifulSoup
from src.exceptions import CodecNotFoundException
from src.metadata import SongMetadata
from src.types import *
from src.utils import find_best_codec
from src.utils import find_best_codec, get_codec_from_codec_id
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
parsed_m3u8 = m3u8.load(m3u8_url)
codec_ids = [playlist.stream_info.audio for playlist in parsed_m3u8.playlists]
codecs = [get_codec_from_codec_id(codec_id) for codec_id in codec_ids]
return codecs, codec_ids
async def extract_media(m3u8_url: str, codec: str) -> Tuple[str, list[str], str]:
@@ -161,7 +168,8 @@ def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str
with open(cover_path.absolute(), "wb") as f:
f.write(metadata.cover)
subprocess.run(["mp4box", "-time", "0", "-mtime", "0", "-keep-utc", "-name", f"1={metadata.title}", "-itags",
":".join(["tool=\"\"", f"cover={absolute_cover_path}", metadata.to_itags_params(embed_metadata, cover_format)]),
":".join(["tool=\"\"", f"cover={absolute_cover_path}",
metadata.to_itags_params(embed_metadata, cover_format)]),
song_name.absolute()], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(song_name.absolute(), "rb") as f:
embed_song = f.read()

View File

@@ -10,10 +10,8 @@ from src.mp4 import extract_media, extract_song, encapsulate, write_metadata
from src.save import save
from src.types import GlobalAuthParams, Codec
from src.url import Song, Album, URLType
from src.utils import check_song_exists
@logger.catch
async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config: Config, device: Device,
force_save: bool = False):
logger.debug(f"Task of song id {song.id} was created")
@@ -52,7 +50,8 @@ async def rip_album(album: Album, auth_params: GlobalAuthParams, codec: str, con
for track in album_info.data[0].relationships.tracks.data:
song = Song(id=track.id, storefront=album.storefront, url="", type=URLType.Song)
tg.create_task(rip_song(song, auth_params, codec, config, device, force_save))
logger.info(f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping")
logger.info(
f"Album: {album_info.data[0].attributes.artistName} - {album_info.data[0].attributes.name} finished ripping")
async def rip_playlist():

View File

@@ -28,4 +28,4 @@ def save(song: bytes, codec: str, metadata: SongMetadata, config: Download):
lrc_path = dir_path / Path(song_name).with_suffix(".lrc")
with open(lrc_path.absolute(), "w", encoding="utf-8") as f:
f.write(ttml_convent_to_lrc(metadata.lyrics))
return song_path.absolute()
return song_path.absolute()

View File

@@ -67,4 +67,4 @@ class GlobalAuthParams(AuthParams):
@classmethod
def from_auth_params_and_token(cls, auth_params: AuthParams, token: str):
return cls(dsid=auth_params.dsid, accountToken=auth_params.accountToken, anonymousAccessToken=token,
accountAccessToken=auth_params.accountAccessToken, storefront=auth_params.storefront)
accountAccessToken=auth_params.accountAccessToken, storefront=auth_params.storefront)

View File

@@ -9,7 +9,6 @@ from bs4 import BeautifulSoup
from src.config import Download
from src.exceptions import NotTimeSyncedLyricsException
from src.types import *
@@ -117,3 +116,11 @@ def check_song_exists(metadata, config: Download, codec: str):
def get_valid_filename(filename: str):
return "".join(i for i in filename if i not in r"\/:*?<>|")
def get_codec_from_codec_id(codec_id: str) -> str:
codecs = [Codec.AC3, Codec.EC3, Codec.AAC, Codec.ALAC, Codec.AAC_BINAURAL, Codec.AAC_DOWNMIX]
for codec in codecs:
if regex.match(CodecRegex.get_pattern_by_codec(codec), codec_id):
return codec
return ""