mirror of
https://github.com/JonasunderscoreJones/turbo-octo-potato.git
synced 2025-10-24 10:39:19 +02:00
261 lines
10 KiB
Python
261 lines
10 KiB
Python
import os, spotipy, pylast, time
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
from typing import Union
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
class SpotifyTooManyAlbumsError(Exception):
|
|
"""Raised when an artist has more than 50 albums since the spotify API currently has a bug that prevents fetching more than the last 50 albums"""
|
|
pass
|
|
|
|
class Auth:
|
|
"""Authentication for Spotify and Last.fm"""
|
|
def __init__(self, verbose:bool=True, lastfm_network:pylast.LastFMNetwork=None, spotify:spotipy.Spotify=None) -> None:
|
|
load_dotenv()
|
|
self.LASTFM_API_KEY = os.getenv('LASTFM_API_KEY')
|
|
self.LASTFM_API_SECRET = os.getenv('LASTFM_API_SECRET')
|
|
self.LASTFM_USERNAME = os.getenv('LASTFM_USERNAME')
|
|
self.LASTFM_PASSWORD_HASH = os.getenv('LASTFM_PASSWORD_HASH')
|
|
self.SPOTIPY_CLIENT_ID = os.getenv('SPOTIFY_CLIENT_ID')
|
|
self.SPOTIPY_CLIENT_SECRET = os.getenv('SPOTIFY_CLIENT_SECRET')
|
|
self.SPOTIPY_REDIRECT_URI = os.getenv('SPOTIFY_REDIRECT_URI')
|
|
self.SPOTIFY_USER_ID = os.getenv('SPOTIFY_USER_ID')
|
|
self.VERBOSE = verbose
|
|
self.lastfm_network = lastfm_network
|
|
self.spotify = spotify
|
|
|
|
def verbose(self, verbose:bool=False) -> None:
|
|
"""Set verbose mode"""
|
|
self.VERBOSE = verbose
|
|
|
|
def newLastfmauth(self) -> pylast.LastFMNetwork:
|
|
"""
|
|
Authenticate with Last.fm
|
|
|
|
Returns:
|
|
pylast.LastFMNetwork: Last.fm network object
|
|
"""
|
|
|
|
if not self.LASTFM_API_KEY or not self.LASTFM_API_SECRET or not self.LASTFM_USERNAME or not self.LASTFM_PASSWORD_HASH:
|
|
raise ValueError("Please provide LASTFM_API_KEY, LASTFM_API_SECRET, LASTFM_USERNAME, and LASTFM_PASSWORD_HASH in the .env file.")
|
|
|
|
SESSION_KEY_FILE = os.path.join(os.path.expanduser("~"), ".lfm_session_key")
|
|
self.lastfm_network = pylast.LastFMNetwork(self.LASTFM_API_KEY, self.LASTFM_API_SECRET)
|
|
if not os.path.exists(SESSION_KEY_FILE):
|
|
skg = pylast.SessionKeyGenerator(self.lastfm_network)
|
|
url = skg.get_web_auth_url()
|
|
|
|
print(f"Please authorize this script to access your account: {url}\n")
|
|
import time
|
|
import webbrowser
|
|
|
|
webbrowser.open(url)
|
|
|
|
while True:
|
|
try:
|
|
session_key = skg.get_web_auth_session_key(url)
|
|
with open(SESSION_KEY_FILE, "w") as f:
|
|
f.write(session_key)
|
|
break
|
|
except pylast.WSError:
|
|
time.sleep(1)
|
|
else:
|
|
session_key = open(SESSION_KEY_FILE).read()
|
|
|
|
self.lastfm_network.session_key = session_key
|
|
return self.lastfm_network
|
|
|
|
def newSpotifyauth(self, scope: str) -> spotipy.Spotify:
|
|
"""
|
|
Authenticate with Spotify
|
|
|
|
Returns:
|
|
spotipy.Spotify: Spotify object
|
|
"""
|
|
|
|
if not self.SPOTIPY_CLIENT_ID or not self.SPOTIPY_CLIENT_SECRET or not self.SPOTIPY_REDIRECT_URI:
|
|
raise ValueError("Please provide SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET, and SPOTIPY_REDIRECT_URI in the .env file.")
|
|
|
|
# Create a SpotifyOAuth instance with headless=True
|
|
sp_oauth = SpotifyOAuth(
|
|
self.SPOTIPY_CLIENT_ID,
|
|
self.SPOTIPY_CLIENT_SECRET,
|
|
self.SPOTIPY_REDIRECT_URI,
|
|
scope=scope,
|
|
cache_path=".cache",
|
|
show_dialog=False, # Set this to False to avoid opening the browser
|
|
open_browser=False # Set this to False to avoid opening the browser
|
|
)
|
|
|
|
# Get the access token and refresh token
|
|
token_info = sp_oauth.get_access_token()
|
|
|
|
# Create a Spotipy instance using the obtained token
|
|
self.spotify = spotipy.Spotify(auth=token_info['access_token'])
|
|
return self.spotify
|
|
|
|
def getSpotify(self) -> spotipy.Spotify:
|
|
"""Get Spotify object"""
|
|
return self.spotify
|
|
|
|
def getLastfm(self) -> pylast.LastFMNetwork:
|
|
"""Get Last.fm network object"""
|
|
return self.lastfm_network
|
|
|
|
def getCreds(self) -> dict:
|
|
"""
|
|
Get credentials
|
|
|
|
Returns:
|
|
dict: Dictionary with credentials
|
|
"""
|
|
|
|
return {
|
|
"LASTFM_API_KEY": self.LASTFM_API_KEY,
|
|
"LASTFM_API_SECRET": self.LASTFM_API_SECRET,
|
|
"LASTFM_USERNAME": self.LASTFM_USERNAME,
|
|
"LASTFM_PASSWORD_HASH": self.LASTFM_PASSWORD_HASH,
|
|
"SPOTIPY_CLIENT_ID": self.SPOTIPY_CLIENT_ID,
|
|
"SPOTIPY_CLIENT_SECRET": self.SPOTIPY_CLIENT_SECRET,
|
|
"SPOTIPY_REDIRECT_URI": self.SPOTIPY_REDIRECT_URI,
|
|
"SPOTIFY_USER_ID": self.SPOTIFY_USER_ID,
|
|
}
|
|
|
|
class SpotifyManager:
|
|
def __init__(self, spotify:spotipy.Spotify) -> None:
|
|
self.spotify = spotify
|
|
|
|
def _artistAlbumGarbageHandler(self, apiresponse: str):
|
|
this_artist_album_part = []
|
|
for i in apiresponse['items']:
|
|
this_artist_album_part.append((i['name'], i['id'], i['release_date'], i['artists'][0]['name']))
|
|
#verboseprint("Found " + str(len(this_artist_album_part)) + " Albums for " + apiresponse['items'][0]['artists'][0]['name'])
|
|
return this_artist_album_part
|
|
|
|
def fetchArtistAlbums(self, artist: str, raise_error = True) -> list[(str, str, str, str)]:
|
|
albums = []
|
|
try:
|
|
this_artist_albums = []
|
|
this_artist_album_part_garbage = self.spotify.artist_albums(artist_id=artist, album_type="album,single,compilation", limit=50)
|
|
this_artist_albums = self._artistAlbumGarbageHandler(this_artist_album_part_garbage)
|
|
if this_artist_album_part_garbage['total'] > 50 and raise_error:
|
|
raise SpotifyTooManyAlbumsError("There currently is a bug in the Spotify API that prevents fetching anything after the first 50 Albums. Given that your artist " + this_artist_album_part_garbage['items'][0]['artists'][0]['name'] + " has more than 50 Albums, you'll need to manually add the missing ones.")
|
|
|
|
albums.append(this_artist_albums)
|
|
except TimeoutError:
|
|
print("\nNetwork unreachable. THIS IS NOT RECOVERABLE. Please restart the process")
|
|
exit()
|
|
return albums
|
|
|
|
def getTrackUrisFromAlbum(self, album_id: str) -> list[str]:
|
|
album_tracks = self.spotify.album_tracks(album_id)
|
|
song_uris = [track['uri'] for track in album_tracks['items']]
|
|
return song_uris
|
|
|
|
def fetchUserFollowedArtists(self):
|
|
followed_artists = []
|
|
followed_artists_part = self.spotify.current_user_followed_artists(limit=50)
|
|
for artist in followed_artists_part['artists']['items']:
|
|
followed_artists.append((artist['id'], artist['name']))
|
|
while followed_artists_part['artists']['next']:
|
|
followed_artists_part = self.spotify.next(followed_artists_part['artists'])
|
|
for artist in followed_artists_part['artists']['items']:
|
|
followed_artists.append((artist['id'], artist['name']))
|
|
return followed_artists
|
|
|
|
class Progressbar:
|
|
"""Progressbar for CLI"""
|
|
def __init__(self, total: int=None, etaCalc=None) -> None:
|
|
self.total = total
|
|
self.etaCalc = etaCalc
|
|
|
|
def _defaultEtaCalc(self, current: int, total: int) -> int:
|
|
return round((total - current)* 1)
|
|
|
|
def setTotal(self, total: int) -> None:
|
|
"""Set total"""
|
|
self.total = total
|
|
|
|
def setEta(self, eta) -> None:
|
|
"""Set eta"""
|
|
self.etaCalc = eta
|
|
|
|
def _getWidth(self) -> int:
|
|
try:
|
|
columns = os.get_terminal_size(0).columns
|
|
except OSError:
|
|
columns = os.get_terminal_size(1).columns
|
|
return columns
|
|
|
|
def buildSnapshot(self, current: int, eta: int=None) -> str:
|
|
if not eta:
|
|
if self.etaCalc == None:
|
|
eta = "N/A"
|
|
else:
|
|
eta = self.etaCalc(current, self.total)
|
|
eta = eta if eta >= 0 else 0
|
|
if eta < 60:
|
|
eta = str(eta) + "s"
|
|
elif eta < 3600:
|
|
eta = str(round(eta/60)) + "m"
|
|
else:
|
|
eta = str(round(eta/3600)) + "h"
|
|
else:
|
|
eta = round((eta*(self.total - current)))
|
|
if eta < 60:
|
|
eta = str(eta) + "s"
|
|
elif eta < 3600:
|
|
eta = str(round(eta/60)) + "min"
|
|
else:
|
|
eta = str(round(eta/3600)) + "h"
|
|
width = self._getWidth()
|
|
total_num_len = len(str(self.total))
|
|
if width < 2* total_num_len + 15:
|
|
return f"{current}/{self.total}", eta
|
|
else:
|
|
if current > self.total:
|
|
current = self.total
|
|
current_spacer = " "*(total_num_len-len(str(current)))
|
|
percent = str(round(current/self.total*100))
|
|
percent = " "*(3-len(percent)) + percent
|
|
progress_bar_length = width - 2*total_num_len - 13 - len(str(eta)) - len(percent)
|
|
progress_bar_progress = round((current/self.total)*progress_bar_length)
|
|
progress_bar_spacer = " "*(progress_bar_length-progress_bar_progress)
|
|
return f"[{current_spacer}{current}/{self.total}|{percent}%|ETA: {eta}|{'~'*progress_bar_progress if percent == ' 69' else '='*progress_bar_progress}>{progress_bar_spacer}]"
|
|
|
|
def print(self, current: int, eta: int=None) -> None:
|
|
"""Print progressbar"""
|
|
print(self.buildSnapshot(current, eta), end="\r")
|
|
|
|
|
|
class ProgressBarEtaManager:
|
|
def __init__(self):
|
|
self.last_timestamp = time.time()
|
|
self.durations = []
|
|
|
|
def now(self):
|
|
self.durations.append(time.time() - self.last_timestamp)
|
|
self.last_timestamp = time.time()
|
|
|
|
def getDurations(self):
|
|
return self.durations
|
|
|
|
def getAvgEta(self):
|
|
eta = sum(self.durations)/len(self.durations)
|
|
return eta if eta >= 0 else 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
def eta(current, total):
|
|
return round((total - current)* 1)
|
|
|
|
progressbar = Progressbar(10, eta)
|
|
for i in range(10):
|
|
print(progressbar.buildSnapshot(i), end="\r")
|
|
time.sleep(1)
|
|
|
|
progressbar = Progressbar(10)
|
|
for i in range(10):
|
|
print(progressbar.buildSnapshot(i), end="\r")
|
|
time.sleep(1)
|