mirror of
https://github.com/zhaarey/AppleMusicDecrypt.git
synced 2025-10-23 15:11:06 +00:00
feat: request cache and request lock
This commit is contained in:
13
poetry.lock
generated
13
poetry.lock
generated
@@ -87,6 +87,17 @@ files = [
|
|||||||
[package.extras]
|
[package.extras]
|
||||||
tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"]
|
tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-lru"
|
||||||
|
version = "2.0.4"
|
||||||
|
description = "Simple LRU cache for asyncio"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.8"
|
||||||
|
files = [
|
||||||
|
{file = "async-lru-2.0.4.tar.gz", hash = "sha256:b8a59a5df60805ff63220b2a0c5b5393da5521b113cd5465a44eb037d81a5627"},
|
||||||
|
{file = "async_lru-2.0.4-py3-none-any.whl", hash = "sha256:ff02944ce3c288c5be660c42dbcca0742b32c3b279d6dceda655190240b99224"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "attrs"
|
name = "attrs"
|
||||||
version = "23.2.0"
|
version = "23.2.0"
|
||||||
@@ -1758,4 +1769,4 @@ cffi = ["cffi (>=1.11)"]
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = "^3.11"
|
python-versions = "^3.11"
|
||||||
content-hash = "cde2194f4da1bd7402b4ffaed566937745f18e0758784876a62b1914faec9c00"
|
content-hash = "6eb629b08089983b830419b9b143f4ebcbb7e824ed4110cc3a4aba25e13e5df7"
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ frida = "^16.2.1"
|
|||||||
tenacity = "^8.2.3"
|
tenacity = "^8.2.3"
|
||||||
prompt-toolkit = "^3.0.43"
|
prompt-toolkit = "^3.0.43"
|
||||||
mitmproxy = "^10.3.0"
|
mitmproxy = "^10.3.0"
|
||||||
|
async-lru = "^2.0.4"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["poetry-core"]
|
requires = ["poetry-core"]
|
||||||
|
|||||||
292
src/api.py
292
src/api.py
@@ -5,6 +5,7 @@ from ssl import SSLError
|
|||||||
import httpcore
|
import httpcore
|
||||||
import httpx
|
import httpx
|
||||||
import regex
|
import regex
|
||||||
|
from async_lru import alru_cache
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
|
from tenacity import retry, retry_if_exception_type, stop_after_attempt, before_sleep_log
|
||||||
|
|
||||||
@@ -12,181 +13,226 @@ from src.models import *
|
|||||||
from src.models.song_data import Datum
|
from src.models.song_data import Datum
|
||||||
|
|
||||||
client: httpx.AsyncClient
|
client: httpx.AsyncClient
|
||||||
lock: asyncio.Semaphore
|
download_lock: asyncio.Semaphore
|
||||||
|
request_lock: asyncio.Semaphore
|
||||||
user_agent_browser = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
user_agent_browser = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||||||
user_agent_itunes = "iTunes/12.11.3 (Windows; Microsoft Windows 10 x64 Professional Edition (Build 19041); x64) AppleWebKit/7611.1022.4001.1 (dt:2)"
|
user_agent_itunes = "iTunes/12.11.3 (Windows; Microsoft Windows 10 x64 Professional Edition (Build 19041); x64) AppleWebKit/7611.1022.4001.1 (dt:2)"
|
||||||
user_agent_app = "Music/5.7 Android/10 model/Pixel6GR1YH build/1234 (dt:66)"
|
user_agent_app = "Music/5.7 Android/10 model/Pixel6GR1YH build/1234 (dt:66)"
|
||||||
|
|
||||||
|
|
||||||
def init_client_and_lock(proxy: str, parallel_num: int):
|
def init_client_and_lock(proxy: str, parallel_num: int):
|
||||||
global client, lock
|
global client, download_lock, request_lock
|
||||||
if proxy:
|
if proxy:
|
||||||
client = httpx.AsyncClient(proxy=proxy)
|
client = httpx.AsyncClient(proxy=proxy)
|
||||||
else:
|
else:
|
||||||
client = httpx.AsyncClient()
|
client = httpx.AsyncClient()
|
||||||
lock = asyncio.Semaphore(parallel_num)
|
download_lock = asyncio.Semaphore(parallel_num)
|
||||||
|
request_lock = asyncio.Semaphore(64)
|
||||||
|
|
||||||
|
|
||||||
async def get_m3u8_from_api(endpoint: str, song_id: str) -> str:
|
async def get_m3u8_from_api(endpoint: str, song_id: str) -> str:
|
||||||
resp = (await client.get(endpoint, params={"songid": song_id})).text
|
async with request_lock:
|
||||||
if resp == "no_found":
|
resp = (await client.get(endpoint, params={"songid": song_id})).text
|
||||||
return ""
|
if resp == "no_found":
|
||||||
return resp
|
return ""
|
||||||
|
return resp
|
||||||
|
|
||||||
|
|
||||||
async def upload_m3u8_to_api(endpoint: str, m3u8_url: str, song_info: Datum):
|
async def upload_m3u8_to_api(endpoint: str, m3u8_url: str, song_info: Datum):
|
||||||
await client.post(endpoint, json={
|
async with request_lock:
|
||||||
"method": "add_m3u8",
|
await client.post(endpoint, json={
|
||||||
"params": {
|
"method": "add_m3u8",
|
||||||
"songid": song_info.id,
|
"params": {
|
||||||
"song_title": f"Disk {song_info.attributes.discNumber} Track {song_info.attributes.trackNumber} - {song_info.attributes.name}",
|
"songid": song_info.id,
|
||||||
"albumid": song_info.relationships.albums.data[0].id,
|
"song_title": f"Disk {song_info.attributes.discNumber} Track {song_info.attributes.trackNumber} - {song_info.attributes.name}",
|
||||||
"album_title": song_info.attributes.albumName,
|
"albumid": song_info.relationships.albums.data[0].id,
|
||||||
"m3u8": m3u8_url,
|
"album_title": song_info.attributes.albumName,
|
||||||
}
|
"m3u8": m3u8_url,
|
||||||
})
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
||||||
stop=stop_after_attempt(5),
|
stop=stop_after_attempt(5),
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_token():
|
async def get_token():
|
||||||
req = await client.get("https://beta.music.apple.com")
|
async with request_lock:
|
||||||
index_js_uri = regex.findall(r"/assets/index-legacy-[^/]+\.js", req.text)[0]
|
req = await client.get("https://beta.music.apple.com")
|
||||||
js_req = await client.get("https://beta.music.apple.com" + index_js_uri)
|
index_js_uri = regex.findall(r"/assets/index-legacy-[^/]+\.js", req.text)[0]
|
||||||
token = regex.search(r'eyJh([^"]*)', js_req.text)[0]
|
js_req = await client.get("https://beta.music.apple.com" + index_js_uri)
|
||||||
return token
|
token = regex.search(r'eyJh([^"]*)', js_req.text)[0]
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
|
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError,
|
||||||
|
httpcore.RemoteProtocolError)),
|
||||||
stop=stop_after_attempt(5),
|
stop=stop_after_attempt(5),
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def download_song(url: str) -> bytes:
|
async def download_song(url: str) -> bytes:
|
||||||
async with lock:
|
async with download_lock:
|
||||||
return (await client.get(url)).content
|
return (await client.get(url)).content
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_album_info(album_id: str, token: str, storefront: str, lang: str):
|
async def get_album_info(album_id: str, token: str, storefront: str, lang: str):
|
||||||
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/albums/{album_id}",
|
async with request_lock:
|
||||||
params={"omit[resource]": "autos", "include": "tracks,artists,record-labels",
|
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/albums/{album_id}",
|
||||||
"include[songs]": "artists", "fields[artists]": "name",
|
params={"omit[resource]": "autos", "include": "tracks,artists,record-labels",
|
||||||
"fields[albums:albums]": "artistName,artwork,name,releaseDate,url",
|
"include[songs]": "artists", "fields[artists]": "name",
|
||||||
"fields[record-labels]": "name", "l": lang},
|
"fields[albums:albums]": "artistName,artwork,name,releaseDate,url",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
"fields[record-labels]": "name", "l": lang},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
return AlbumMeta.model_validate(req.json())
|
"Origin": "https://music.apple.com"})
|
||||||
|
return AlbumMeta.model_validate(req.json())
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_playlist_info_and_tracks(playlist_id: str, token: str, storefront: str, lang: str):
|
async def get_playlist_info_and_tracks(playlist_id: str, token: str, storefront: str, lang: str):
|
||||||
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}",
|
async with request_lock:
|
||||||
params={"l": lang},
|
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
params={"l": lang},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
playlist_info_obj = PlaylistInfo.parse_obj(resp.json())
|
"Origin": "https://music.apple.com"})
|
||||||
if playlist_info_obj.data[0].relationships.tracks.next:
|
playlist_info_obj = PlaylistInfo.parse_obj(resp.json())
|
||||||
all_tracks = await get_playlist_tracks(playlist_id, token, storefront, lang)
|
if playlist_info_obj.data[0].relationships.tracks.next:
|
||||||
playlist_info_obj.data[0].relationships.tracks.data = all_tracks
|
all_tracks = await get_playlist_tracks(playlist_id, token, storefront, lang)
|
||||||
return playlist_info_obj
|
playlist_info_obj.data[0].relationships.tracks.data = all_tracks
|
||||||
|
return playlist_info_obj
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_playlist_tracks(playlist_id: str, token: str, storefront: str, lang: str, offset: int = 0):
|
async def get_playlist_tracks(playlist_id: str, token: str, storefront: str, lang: str, offset: int = 0):
|
||||||
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}/tracks",
|
async with request_lock:
|
||||||
params={"l": lang, "offset": offset},
|
resp = await client.get(
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
f"https://amp-api.music.apple.com/v1/catalog/{storefront}/playlists/{playlist_id}/tracks",
|
||||||
"Origin": "https://music.apple.com"})
|
params={"l": lang, "offset": offset},
|
||||||
playlist_tracks = PlaylistTracks.parse_obj(resp.json())
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
tracks = playlist_tracks.data
|
"Origin": "https://music.apple.com"})
|
||||||
if playlist_tracks.next:
|
playlist_tracks = PlaylistTracks.parse_obj(resp.json())
|
||||||
next_tracks = await get_playlist_info_and_tracks(playlist_id, token, storefront, lang, offset + 100)
|
tracks = playlist_tracks.data
|
||||||
tracks.extend(next_tracks)
|
if playlist_tracks.next:
|
||||||
return tracks
|
next_tracks = await get_playlist_info_and_tracks(playlist_id, token, storefront, lang, offset + 100)
|
||||||
|
tracks.extend(next_tracks)
|
||||||
|
return tracks
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_cover(url: str, cover_format: str, cover_size: str):
|
async def get_cover(url: str, cover_format: str, cover_size: str):
|
||||||
formatted_url = regex.sub('bb.jpg', f'bb.{cover_format}', url)
|
async with request_lock:
|
||||||
req = await client.get(formatted_url.replace("{w}x{h}", cover_size),
|
formatted_url = regex.sub('bb.jpg', f'bb.{cover_format}', url)
|
||||||
headers={"User-Agent": user_agent_browser})
|
req = await client.get(formatted_url.replace("{w}x{h}", cover_size),
|
||||||
return req.content
|
headers={"User-Agent": user_agent_browser})
|
||||||
|
return req.content
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_song_info(song_id: str, token: str, storefront: str, lang: str):
|
async def get_song_info(song_id: str, token: str, storefront: str, lang: str):
|
||||||
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}",
|
async with request_lock:
|
||||||
params={"extend": "extendedAssetUrls", "include": "albums", "l": lang},
|
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_itunes,
|
params={"extend": "extendedAssetUrls", "include": "albums", "l": lang},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_itunes,
|
||||||
song_data_obj = SongData.model_validate(req.json())
|
"Origin": "https://music.apple.com"})
|
||||||
for data in song_data_obj.data:
|
song_data_obj = SongData.model_validate(req.json())
|
||||||
if data.id == song_id:
|
for data in song_data_obj.data:
|
||||||
return data
|
if data.id == song_id:
|
||||||
return None
|
return data
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_song_lyrics(song_id: str, storefront: str, token: str, dsid: str, account_token: str, lang: str) -> str:
|
async def get_song_lyrics(song_id: str, storefront: str, token: str, dsid: str, account_token: str, lang: str) -> str:
|
||||||
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}/lyrics",
|
async with request_lock:
|
||||||
params={"l": lang},
|
req = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/songs/{song_id}/lyrics",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_app,
|
params={"l": lang},
|
||||||
"X-Dsid": dsid},
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_app,
|
||||||
cookies={f"mz_at_ssl-{dsid}": account_token})
|
"X-Dsid": dsid},
|
||||||
result = SongLyrics.model_validate(req.json())
|
cookies={f"mz_at_ssl-{dsid}": account_token})
|
||||||
return result.data[0].attributes.ttml
|
result = SongLyrics.model_validate(req.json())
|
||||||
|
return result.data[0].attributes.ttml
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_albums_from_artist(artist_id: str, storefront: str, token: str, lang: str, offset: int = 0):
|
async def get_albums_from_artist(artist_id: str, storefront: str, token: str, lang: str, offset: int = 0):
|
||||||
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}/albums",
|
async with request_lock:
|
||||||
params={"l": lang, "offset": offset},
|
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}/albums",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
params={"l": lang, "offset": offset},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
artist_album = ArtistAlbums.parse_obj(resp.json())
|
"Origin": "https://music.apple.com"})
|
||||||
albums = [album.attributes.url for album in artist_album.data]
|
artist_album = ArtistAlbums.parse_obj(resp.json())
|
||||||
if artist_album.next:
|
albums = [album.attributes.url for album in artist_album.data]
|
||||||
next_albums = await get_albums_from_artist(artist_id, storefront, token, lang, offset + 25)
|
if artist_album.next:
|
||||||
albums.extend(next_albums)
|
next_albums = await get_albums_from_artist(artist_id, storefront, token, lang, offset + 25)
|
||||||
return list(set(albums))
|
albums.extend(next_albums)
|
||||||
|
return list(set(albums))
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_songs_from_artist(artist_id: str, storefront: str, token: str, lang: str, offset: int = 0):
|
async def get_songs_from_artist(artist_id: str, storefront: str, token: str, lang: str, offset: int = 0):
|
||||||
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}/songs",
|
async with request_lock:
|
||||||
params={"l": lang, "offset": offset},
|
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}/songs",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
params={"l": lang, "offset": offset},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
artist_song = ArtistSongs.parse_obj(resp.json())
|
"Origin": "https://music.apple.com"})
|
||||||
songs = [song.attributes.url for song in artist_song.data]
|
artist_song = ArtistSongs.parse_obj(resp.json())
|
||||||
if artist_song.next:
|
songs = [song.attributes.url for song in artist_song.data]
|
||||||
next_songs = await get_songs_from_artist(artist_id, storefront, token, lang, offset + 20)
|
if artist_song.next:
|
||||||
songs.extend(next_songs)
|
next_songs = await get_songs_from_artist(artist_id, storefront, token, lang, offset + 20)
|
||||||
return list[set(songs)]
|
songs.extend(next_songs)
|
||||||
|
return list[set(songs)]
|
||||||
|
|
||||||
|
|
||||||
@retry(retry=retry_if_exception_type((httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError)),
|
@alru_cache
|
||||||
stop=stop_after_attempt(5),
|
@retry(retry=retry_if_exception_type(
|
||||||
before_sleep=before_sleep_log(logger, logging.WARN))
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
async def get_artist_info(artist_id: str, storefront: str, token: str, lang: str):
|
async def get_artist_info(artist_id: str, storefront: str, token: str, lang: str):
|
||||||
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}",
|
async with request_lock:
|
||||||
params={"l": lang},
|
resp = await client.get(f"https://amp-api.music.apple.com/v1/catalog/{storefront}/artists/{artist_id}",
|
||||||
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
params={"l": lang},
|
||||||
"Origin": "https://music.apple.com"})
|
headers={"Authorization": f"Bearer {token}", "User-Agent": user_agent_browser,
|
||||||
return ArtistInfo.parse_obj(resp.json())
|
"Origin": "https://music.apple.com"})
|
||||||
|
return ArtistInfo.parse_obj(resp.json())
|
||||||
|
|
||||||
|
|
||||||
|
@alru_cache
|
||||||
|
@retry(retry=retry_if_exception_type(
|
||||||
|
(httpx.TimeoutException, httpcore.ConnectError, SSLError, FileNotFoundError, httpcore.RemoteProtocolError)),
|
||||||
|
stop=stop_after_attempt(5),
|
||||||
|
before_sleep=before_sleep_log(logger, logging.WARN))
|
||||||
|
async def download_m3u8(m3u8_url: str) -> str:
|
||||||
|
async with request_lock:
|
||||||
|
return (await client.get(m3u8_url)).text
|
||||||
|
|||||||
@@ -1,16 +1,17 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import uuid
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
import m3u8
|
import m3u8
|
||||||
import regex
|
import regex
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
from src.api import download_m3u8
|
||||||
from src.exceptions import CodecNotFoundException
|
from src.exceptions import CodecNotFoundException
|
||||||
from src.metadata import SongMetadata
|
from src.metadata import SongMetadata
|
||||||
from src.types import *
|
from src.types import *
|
||||||
@@ -18,7 +19,7 @@ from src.utils import find_best_codec, get_codec_from_codec_id, get_suffix
|
|||||||
|
|
||||||
|
|
||||||
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
|
async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
|
||||||
parsed_m3u8 = m3u8.load(m3u8_url)
|
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
|
||||||
codec_ids = [playlist.stream_info.audio for playlist in parsed_m3u8.playlists]
|
codec_ids = [playlist.stream_info.audio for playlist in parsed_m3u8.playlists]
|
||||||
codecs = [get_codec_from_codec_id(codec_id) for codec_id in codec_ids]
|
codecs = [get_codec_from_codec_id(codec_id) for codec_id in codec_ids]
|
||||||
return codecs, codec_ids
|
return codecs, codec_ids
|
||||||
@@ -26,7 +27,7 @@ async def get_available_codecs(m3u8_url: str) -> Tuple[list[str], list[str]]:
|
|||||||
|
|
||||||
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
|
async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
|
||||||
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str]]:
|
codec_priority: list[str], alternative_codec: bool = False) -> Tuple[str, list[str]]:
|
||||||
parsed_m3u8 = m3u8.load(m3u8_url)
|
parsed_m3u8 = m3u8.loads(await download_m3u8(m3u8_url), uri=m3u8_url)
|
||||||
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
|
specifyPlaylist = find_best_codec(parsed_m3u8, codec)
|
||||||
if not specifyPlaylist and alternative_codec:
|
if not specifyPlaylist and alternative_codec:
|
||||||
logger.warning(f"Codec {codec} of song: {song_metadata.artist} - {song_metadata.title} did not found")
|
logger.warning(f"Codec {codec} of song: {song_metadata.artist} - {song_metadata.title} did not found")
|
||||||
@@ -39,7 +40,7 @@ async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata,
|
|||||||
raise CodecNotFoundException
|
raise CodecNotFoundException
|
||||||
selected_codec = specifyPlaylist.media[0].group_id
|
selected_codec = specifyPlaylist.media[0].group_id
|
||||||
logger.info(f"Selected codec: {selected_codec} for song: {song_metadata.artist} - {song_metadata.title}")
|
logger.info(f"Selected codec: {selected_codec} for song: {song_metadata.artist} - {song_metadata.title}")
|
||||||
stream = m3u8.load(specifyPlaylist.absolute_uri)
|
stream = m3u8.loads(await download_m3u8(specifyPlaylist.absolute_uri), uri=specifyPlaylist.absolute_uri)
|
||||||
skds = [key.uri for key in stream.keys if regex.match('(skd?://[^"]*)', key.uri)]
|
skds = [key.uri for key in stream.keys if regex.match('(skd?://[^"]*)', key.uri)]
|
||||||
keys = [prefetchKey]
|
keys = [prefetchKey]
|
||||||
key_suffix = CodecKeySuffix.KeySuffixDefault
|
key_suffix = CodecKeySuffix.KeySuffixDefault
|
||||||
|
|||||||
Reference in New Issue
Block a user