diff --git a/likedsongsync2.py b/likedsongsync2.py index 0ba7809..b816bdc 100644 --- a/likedsongsync2.py +++ b/likedsongsync2.py @@ -1,6 +1,11 @@ -import pylast, spotipy, sys, os, time -from spotipy.oauth2 import SpotifyOAuth +'''A script to sync your liked songs from Spotify to Last.fm and a Spotify +playlist that can be made public (unlike the built-in liked songs playlist).''' +import sys +import os +import time +import spotipy from dotenv import load_dotenv + import top_lib # load .env file @@ -10,31 +15,53 @@ load_dotenv() LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID') def progress_bar(current, total, last_time_stamp=float, etastr=None): + '''A function to print a progress bar to the terminal. + current: The current progress + total: The total progress + last_time_stamp: The time stamp of the last call of this function + etastr: The estimated time until completion as a string + + Returns: A string with the progress bar''' current = total if current > total else current this_timestamp = time.time() width = os.get_terminal_size().columns - progress = round((current/total)*width) + #progress = round((current/total)*width) total_num_len = len(str(total)) if width < 2* total_num_len + 15: return f"{current}/{total}", this_timestamp + + current_spacer = " "*(total_num_len-len(str(current))) + if etastr: + eta = etastr else: - current_spacer = " "*(total_num_len-len(str(current))) - if etastr: - eta = etastr - else: - eta = str(round((total - current)* (this_timestamp - last_time_stamp)/60)) + "s" - percent = str(round(current/total*100)) - percent = " "*(3-len(percent)) + percent - progress_bar_length = width - 2*total_num_len - 13 - len(str(eta)) - len(percent) - progress_bar_progress = round((current/total)*progress_bar_length) - progress_bar_spacer = " "*(progress_bar_length-progress_bar_progress) - return f"[{current_spacer}{current}/{total}|{percent}%|ETA: {eta}|{'='*progress_bar_progress}>{progress_bar_spacer}]", this_timestamp + eta = str(round((total - current) * + (this_timestamp - last_time_stamp)/60)) + "s" + percent = str(round(current/total*100)) + percent = " "*(3-len(percent)) + percent + progress_bar_length = width - 2 * \ + total_num_len - 13 - len(str(eta)) - len(percent) + progress_bar_progress = round((current/total)*progress_bar_length) + progress_bar_spacer = " "*(progress_bar_length-progress_bar_progress) + return f"[{current_spacer}{current}/{total}|{percent}%|ETA: {eta}|" + \ + f"{'='*progress_bar_progress}>{progress_bar_spacer}]", \ + this_timestamp def verboseprint(message, end="\n"): + '''A function to print verbose output. + + message: The message to print + end: The end character to use for the print function''' if VERBOSE_LOGGING: print(message, end=end) def handle_playlist_part_return(playlist_part:str, all_songs:list): + '''A function to handle parsing the playlist part and adding them to the + list of all songs. + + playlist_part: The playlist part to handle + all_songs: The list of all songs to append the new songs to + + Returns: The updated list of all songs''' for item in playlist_part["items"]: track_uri = item["track"]["uri"] track_name = item["track"]["name"] @@ -44,22 +71,31 @@ def handle_playlist_part_return(playlist_part:str, all_songs:list): return all_songs def get_all_songs_from_playlist(playlist_id): + '''A function to get all songs from a playlist. + + playlist_id: The ID of the playlist to get the songs from + + Returns: A list of all songs in the playlist''' verboseprint("Fetching songs from the liked songs playlist...") progressbar = top_lib.Progressbar() - progressBarEtaManager = top_lib.ProgressBarEtaManager() + progress_bar_eta_manager = top_lib.ProgressBarEtaManager() all_songs = [] limit = 100 offset = 0 while True: - playlist_part = sp.playlist_items(playlist_id, limit=limit, offset=offset) + playlist_part = sp.playlist_items(playlist_id, + limit=limit, + offset=offset) if not playlist_part["items"]: break all_songs = handle_playlist_part_return(playlist_part, all_songs) - progressBarEtaManager.now() + progress_bar_eta_manager.now() progressbar.setTotal(playlist_part["total"]) - progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit) + progress_print = progressbar.buildSnapshot(offset+limit, + progress_bar_eta_manager + .getAvgEta()/limit) verboseprint(progress_print, end="\r") offset += limit @@ -67,23 +103,31 @@ def get_all_songs_from_playlist(playlist_id): return all_songs def get_all_liked_songs(): + '''A function to get all liked songs from Spotify. + + Returns: A list of all liked songs''' verboseprint("Fetching liked songs...") - verboseprint("This may take a while... (the API only allows for 50 songs per request for the liked songs)") + verboseprint("This may take a while...(the API only allows for " + \ + "50 songs per request for the liked songs)") progressbar = top_lib.Progressbar() - progressBarEtaManager = top_lib.ProgressBarEtaManager() + progress_bar_eta_manager = top_lib.ProgressBarEtaManager() all_liked_songs = [] limit = 50 offset = 0 while True: - liked_songs_chunk = sp.current_user_saved_tracks(limit=limit, offset=offset) + liked_songs_chunk = sp.current_user_saved_tracks(limit=limit, + offset=offset) if not liked_songs_chunk["items"]: break - all_liked_songs = handle_playlist_part_return(liked_songs_chunk, all_liked_songs) + all_liked_songs = handle_playlist_part_return(liked_songs_chunk, + all_liked_songs) - progressBarEtaManager.now(), + progress_bar_eta_manager.now() progressbar.setTotal(liked_songs_chunk["total"]) - progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit) + progress_print = progressbar.buildSnapshot(offset+limit, + progress_bar_eta_manager + .getAvgEta()/limit) verboseprint(progress_print, end="\r") offset += limit @@ -91,6 +135,12 @@ def get_all_liked_songs(): return all_liked_songs def is_track_in_playlist(playlist_song_list, track_uri): + '''A function to check if a track is in a playlist. + + playlist_song_list: The list of songs in the playlist + track_uri: The URI of the track to check + + Returns: True if the track is in the playlist, False otherwise''' playlist_tracks = playlist_song_list for item in playlist_tracks: if item[0] == track_uri: @@ -98,8 +148,13 @@ def is_track_in_playlist(playlist_song_list, track_uri): return False def add_track_to_playlist(playlist_id, track_uri): + '''A function to add a track to a playlist. + + playlist_id: The ID of the playlist to add the track to + track_uri: The URI of the track to add to the playlist''' sp.playlist_add_items(playlist_id, [track_uri]) + if __name__ == "__main__": # Parse command-line arguments @@ -107,24 +162,25 @@ if __name__ == "__main__": FORCE_RESYNC_ALL = "-f" in sys.argv or "--force-all" in sys.argv try: - SKIPSONGS = int(sys.argv[sys.argv.index("--skip") + 1]) if "--skip" in sys.argv else int(sys.argv[sys.argv.index("-s") + 1]) if "-s" in sys.argv else 0 - except: + SKIPSONGS = int(sys.argv[sys.argv.index("--skip") + 1]) if "--skip" \ + in sys.argv else int(sys.argv[sys.argv.index("-s") + 1]) \ + if "-s" in sys.argv else 0 + except ValueError: print("[--skip/-s] Require a number to be set.") print("E.g.: --skip 88") - exit() + sys.exit(-1) verboseprint("Authenticating Spotify...") authenticator = top_lib.Auth(verbose=VERBOSE_LOGGING) # Create a Spotipy instance with authentication - #sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET, redirect_uri=SPOTIPY_REDIRECT_URI, scope="user-library-read playlist-modify-public playlist-modify-private")) - sp = authenticator.newSpotifyauth(scope="user-library-read playlist-modify-public playlist-modify-private") + sp = authenticator.newSpotifyauth( + scope="user-library-read playlist-modify-public playlist-modify-private") verboseprint("Authenticating Last.fm...") # Set up Last.fm network - #network = pylast.LastFMNetwork(api_key=LASTFM_API_KEY, api_secret=LASTFM_API_SECRET, username=LASTFM_USERNAME, password_hash=LASTFM_PASSWORD_HASH) network = authenticator.newLastfmauth() # Main loop for syncing liked songs @@ -137,50 +193,60 @@ if __name__ == "__main__": liked_songs = [x for x in liked_songs if x not in liked_songs_playlist_songs] if len(liked_songs) == 0: print("Nothing to do.") - exit() + sys.exit(0) verboseprint(f"Number of playlist songs: {len(liked_songs_playlist_songs)}") verboseprint(f"Skipping the first {SKIPSONGS} songs...") - tracknr = 0 + TRACK_NR = 0 last_time_stamp = time.time() progressbar = top_lib.Progressbar() progressbar.setTotal(len(liked_songs)) progressBarEtaManager = top_lib.ProgressBarEtaManager() for track_uri, track_name, artist_name in liked_songs[::-1]: - tracknr += 1 + TRACK_NR += 1 def loop_do(last_time_stamp): + '''A function to loop until the API call succeeds. + + last_time_stamp: The time stamp of the last call of this function + + Returns: The time stamp of the last call of this function''' track = sp.track(track_uri) fm_track = network.get_track(artist_name, track_name) fm_track.love() fm_track.add_tags(("awesome", "favorite")) verboseprint(' '* os.get_terminal_size().columns, end="\r") if not is_track_in_playlist(liked_songs_playlist_songs, track_uri): - verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}|+]") + "%30.32s %s" % (track['artists'][0]['name'], track['name'])) - #progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min") + verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" % + (f"{TRACK_NR}/{len(liked_songs)}|+]") + + "%30.32s %s" % (track['artists'][0]['name'], + track['name'])) else: - verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}]") + "%32.32s %s" % (track['artists'][0]['name'], track['name'])) - #progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min") - #progressBarEtaManager.now() - #progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta()) - #verboseprint(progress_print, end="\r") + verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" % + (f"{TRACK_NR}/{len(liked_songs)}]") + + "%32.32s %s" % (track['artists'][0]['name'], + track['name'])) progressBarEtaManager.now() #print(progressBarEtaManager.getDurations()) #print(tracknr) - progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta()) + progress_print = progressbar.buildSnapshot(TRACK_NR, + progressBarEtaManager + .getAvgEta()) #print(progressBarEtaManager.getDurations()) verboseprint(progress_print, end="\r") - add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri) + if not is_track_in_playlist(liked_songs_playlist_songs, track_uri): + add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri) return last_time_stamp # Loop until the API call succeeds - while tracknr > SKIPSONGS: + while TRACK_NR > SKIPSONGS: try: last_time_stamp = loop_do(last_time_stamp) break except KeyboardInterrupt: # Allow the user to interrupt the script - exit() + print("Interrupted by user.") + sys.exit(0) except spotipy.SpotifyException as e: if e.http_status == 429: time.sleep(30) @@ -188,6 +254,6 @@ if __name__ == "__main__": verboseprint("WARN:RATELIMIT EXCEEDED] Waiting 30 seconds to proceed...") else: verboseprint(e.http_status) - except: + except Exception: #except e: continue