mirror of
https://github.com/zhaarey/AppleMusicDecrypt.git
synced 2025-10-23 15:11:06 +00:00
feat: force mode
This commit is contained in:
@@ -22,6 +22,8 @@ suMethod = "su -c"
|
||||
[m3u8Api]
|
||||
# Use zhaarey's m3u8 api to get higher song m3u8.
|
||||
enable = false
|
||||
# Only use m3u8 api to download song
|
||||
force = false
|
||||
endpoint = ""
|
||||
|
||||
[download]
|
||||
|
||||
11
src/api.py
11
src/api.py
@@ -30,14 +30,23 @@ def init_client_and_lock(proxy: str, parallel_num: int):
|
||||
request_lock = asyncio.Semaphore(64)
|
||||
|
||||
|
||||
async def get_m3u8_from_api(endpoint: str, song_id: str) -> str:
|
||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
||||
stop=stop_after_attempt(5),
|
||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||
async def get_m3u8_from_api(endpoint: str, song_id: str, wait_and_retry: bool = False, recursion_times: int = 0) -> str:
|
||||
async with request_lock:
|
||||
resp = (await client.get(endpoint, params={"songid": song_id})).text
|
||||
if resp == "no_found":
|
||||
if wait_and_retry and recursion_times <= 5:
|
||||
await asyncio.sleep(5)
|
||||
return await get_m3u8_from_api(endpoint, song_id, recursion_times=recursion_times+1)
|
||||
return ""
|
||||
return resp
|
||||
|
||||
|
||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
||||
stop=stop_after_attempt(5),
|
||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||
async def upload_m3u8_to_api(endpoint: str, m3u8_url: str, song_info: Datum):
|
||||
async with request_lock:
|
||||
await client.post(endpoint, json={
|
||||
|
||||
@@ -17,6 +17,7 @@ class Device(BaseModel):
|
||||
|
||||
class M3U8Api(BaseModel):
|
||||
enable: bool
|
||||
force: bool
|
||||
endpoint: str
|
||||
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
|
||||
|
||||
|
||||
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
|
||||
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str]]:
|
||||
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str], str]:
|
||||
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
|
||||
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
|
||||
if not specifyPlaylist and alternative_codec:
|
||||
@@ -58,7 +58,7 @@ async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
|
||||
for key in skds:
|
||||
if key.endswith(key_suffix) or key.endswith(CodecKeySuffix.KeySuffixDefault):
|
||||
keys.append(key)
|
||||
return stream.segment_map[0].absolute_uri, keys
|
||||
return stream.segment_map[0].absolute_uri, keys, selected_codec
|
||||
|
||||
|
||||
def extract_song(raw_song: bytes, codec: str) -> SongInfo:
|
||||
|
||||
18
src/rip.py
18
src/rip.py
@@ -14,7 +14,7 @@ from src.mp4 import extract_media, extract_song, encapsulate, write_metadata
|
||||
from src.save import save
|
||||
from src.types import GlobalAuthParams, Codec
|
||||
from src.url import Song, Album, URLType, Artist, Playlist
|
||||
from src.utils import check_song_exists, if_raw_atmos, playlist_write_song_index
|
||||
from src.utils import check_song_exists, if_raw_atmos, playlist_write_song_index, get_codec_from_codec_id
|
||||
|
||||
|
||||
@logger.catch
|
||||
@@ -35,18 +35,22 @@ async def rip_song(song: Song, auth_params: GlobalAuthParams, codec: str, config
|
||||
lyrics = await get_song_lyrics(song.id, song.storefront, auth_params.accountAccessToken,
|
||||
auth_params.dsid, auth_params.accountToken, config.region.language)
|
||||
song_metadata.lyrics = lyrics
|
||||
if config.m3u8Api.enable and codec == Codec.ALAC:
|
||||
m3u8_url = await get_m3u8_from_api(config.m3u8Api.endpoint, song.id)
|
||||
if config.m3u8Api.enable and codec == Codec.ALAC and not specified_m3u8:
|
||||
m3u8_url = await get_m3u8_from_api(config.m3u8Api.endpoint, song.id, config.m3u8Api.enable)
|
||||
if m3u8_url:
|
||||
specified_m3u8 = m3u8_url
|
||||
logger.info(f"Use m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}")
|
||||
elif not m3u8_url and config.m3u8Api.force:
|
||||
logger.error(f"Failed to get m3u8 from API for song: {song_metadata.artist} - {song_metadata.title}")
|
||||
return
|
||||
if specified_m3u8:
|
||||
song_uri, keys = await extract_media(specified_m3u8, codec, song_metadata,
|
||||
config.download.codecPriority, config.download.codecAlternative)
|
||||
song_uri, keys, codec_id = await extract_media(specified_m3u8, codec, song_metadata,
|
||||
config.download.codecPriority, config.download.codecAlternative)
|
||||
else:
|
||||
song_uri, keys = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata,
|
||||
config.download.codecPriority, config.download.codecAlternative)
|
||||
song_uri, keys, codec_id = await extract_media(song_data.attributes.extendedAssetUrls.enhancedHls, codec, song_metadata,
|
||||
config.download.codecPriority, config.download.codecAlternative)
|
||||
logger.info(f"Downloading song: {song_metadata.artist} - {song_metadata.title}")
|
||||
codec = get_codec_from_codec_id(codec_id)
|
||||
raw_song = await download_song(song_uri)
|
||||
song_info = extract_song(raw_song, codec)
|
||||
decrypted_song = await decrypt(song_info, keys, song_data, device)
|
||||
|
||||
Reference in New Issue
Block a user