mirror of
https://github.com/JonasunderscoreJones/turbo-octo-potato.git
synced 2025-10-28 03:29:18 +01:00
Refactored code to obey to pep-8
This commit is contained in:
parent
8e9225eb9e
commit
4fbf55d1f9
1 changed files with 111 additions and 45 deletions
|
|
@ -1,6 +1,11 @@
|
||||||
import pylast, spotipy, sys, os, time
|
'''A script to sync your liked songs from Spotify to Last.fm and a Spotify
|
||||||
from spotipy.oauth2 import SpotifyOAuth
|
playlist that can be made public (unlike the built-in liked songs playlist).'''
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import spotipy
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
import top_lib
|
import top_lib
|
||||||
|
|
||||||
# load .env file
|
# load .env file
|
||||||
|
|
@ -10,31 +15,53 @@ load_dotenv()
|
||||||
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
||||||
|
|
||||||
def progress_bar(current, total, last_time_stamp=float, etastr=None):
|
def progress_bar(current, total, last_time_stamp=float, etastr=None):
|
||||||
|
'''A function to print a progress bar to the terminal.
|
||||||
|
current: The current progress
|
||||||
|
total: The total progress
|
||||||
|
last_time_stamp: The time stamp of the last call of this function
|
||||||
|
etastr: The estimated time until completion as a string
|
||||||
|
|
||||||
|
Returns: A string with the progress bar'''
|
||||||
current = total if current > total else current
|
current = total if current > total else current
|
||||||
this_timestamp = time.time()
|
this_timestamp = time.time()
|
||||||
width = os.get_terminal_size().columns
|
width = os.get_terminal_size().columns
|
||||||
progress = round((current/total)*width)
|
#progress = round((current/total)*width)
|
||||||
total_num_len = len(str(total))
|
total_num_len = len(str(total))
|
||||||
if width < 2* total_num_len + 15:
|
if width < 2* total_num_len + 15:
|
||||||
return f"{current}/{total}", this_timestamp
|
return f"{current}/{total}", this_timestamp
|
||||||
else:
|
|
||||||
current_spacer = " "*(total_num_len-len(str(current)))
|
current_spacer = " "*(total_num_len-len(str(current)))
|
||||||
if etastr:
|
if etastr:
|
||||||
eta = etastr
|
eta = etastr
|
||||||
else:
|
else:
|
||||||
eta = str(round((total - current)* (this_timestamp - last_time_stamp)/60)) + "s"
|
eta = str(round((total - current) *
|
||||||
|
(this_timestamp - last_time_stamp)/60)) + "s"
|
||||||
percent = str(round(current/total*100))
|
percent = str(round(current/total*100))
|
||||||
percent = " "*(3-len(percent)) + percent
|
percent = " "*(3-len(percent)) + percent
|
||||||
progress_bar_length = width - 2*total_num_len - 13 - len(str(eta)) - len(percent)
|
progress_bar_length = width - 2 * \
|
||||||
|
total_num_len - 13 - len(str(eta)) - len(percent)
|
||||||
progress_bar_progress = round((current/total)*progress_bar_length)
|
progress_bar_progress = round((current/total)*progress_bar_length)
|
||||||
progress_bar_spacer = " "*(progress_bar_length-progress_bar_progress)
|
progress_bar_spacer = " "*(progress_bar_length-progress_bar_progress)
|
||||||
return f"[{current_spacer}{current}/{total}|{percent}%|ETA: {eta}|{'='*progress_bar_progress}>{progress_bar_spacer}]", this_timestamp
|
return f"[{current_spacer}{current}/{total}|{percent}%|ETA: {eta}|" + \
|
||||||
|
f"{'='*progress_bar_progress}>{progress_bar_spacer}]", \
|
||||||
|
this_timestamp
|
||||||
|
|
||||||
def verboseprint(message, end="\n"):
|
def verboseprint(message, end="\n"):
|
||||||
|
'''A function to print verbose output.
|
||||||
|
|
||||||
|
message: The message to print
|
||||||
|
end: The end character to use for the print function'''
|
||||||
if VERBOSE_LOGGING:
|
if VERBOSE_LOGGING:
|
||||||
print(message, end=end)
|
print(message, end=end)
|
||||||
|
|
||||||
def handle_playlist_part_return(playlist_part:str, all_songs:list):
|
def handle_playlist_part_return(playlist_part:str, all_songs:list):
|
||||||
|
'''A function to handle parsing the playlist part and adding them to the
|
||||||
|
list of all songs.
|
||||||
|
|
||||||
|
playlist_part: The playlist part to handle
|
||||||
|
all_songs: The list of all songs to append the new songs to
|
||||||
|
|
||||||
|
Returns: The updated list of all songs'''
|
||||||
for item in playlist_part["items"]:
|
for item in playlist_part["items"]:
|
||||||
track_uri = item["track"]["uri"]
|
track_uri = item["track"]["uri"]
|
||||||
track_name = item["track"]["name"]
|
track_name = item["track"]["name"]
|
||||||
|
|
@ -44,22 +71,31 @@ def handle_playlist_part_return(playlist_part:str, all_songs:list):
|
||||||
return all_songs
|
return all_songs
|
||||||
|
|
||||||
def get_all_songs_from_playlist(playlist_id):
|
def get_all_songs_from_playlist(playlist_id):
|
||||||
|
'''A function to get all songs from a playlist.
|
||||||
|
|
||||||
|
playlist_id: The ID of the playlist to get the songs from
|
||||||
|
|
||||||
|
Returns: A list of all songs in the playlist'''
|
||||||
verboseprint("Fetching songs from the liked songs playlist...")
|
verboseprint("Fetching songs from the liked songs playlist...")
|
||||||
progressbar = top_lib.Progressbar()
|
progressbar = top_lib.Progressbar()
|
||||||
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
progress_bar_eta_manager = top_lib.ProgressBarEtaManager()
|
||||||
all_songs = []
|
all_songs = []
|
||||||
limit = 100
|
limit = 100
|
||||||
offset = 0
|
offset = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
playlist_part = sp.playlist_items(playlist_id, limit=limit, offset=offset)
|
playlist_part = sp.playlist_items(playlist_id,
|
||||||
|
limit=limit,
|
||||||
|
offset=offset)
|
||||||
if not playlist_part["items"]:
|
if not playlist_part["items"]:
|
||||||
break
|
break
|
||||||
all_songs = handle_playlist_part_return(playlist_part, all_songs)
|
all_songs = handle_playlist_part_return(playlist_part, all_songs)
|
||||||
|
|
||||||
progressBarEtaManager.now()
|
progress_bar_eta_manager.now()
|
||||||
progressbar.setTotal(playlist_part["total"])
|
progressbar.setTotal(playlist_part["total"])
|
||||||
progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit)
|
progress_print = progressbar.buildSnapshot(offset+limit,
|
||||||
|
progress_bar_eta_manager
|
||||||
|
.getAvgEta()/limit)
|
||||||
verboseprint(progress_print, end="\r")
|
verboseprint(progress_print, end="\r")
|
||||||
|
|
||||||
offset += limit
|
offset += limit
|
||||||
|
|
@ -67,23 +103,31 @@ def get_all_songs_from_playlist(playlist_id):
|
||||||
return all_songs
|
return all_songs
|
||||||
|
|
||||||
def get_all_liked_songs():
|
def get_all_liked_songs():
|
||||||
|
'''A function to get all liked songs from Spotify.
|
||||||
|
|
||||||
|
Returns: A list of all liked songs'''
|
||||||
verboseprint("Fetching liked songs...")
|
verboseprint("Fetching liked songs...")
|
||||||
verboseprint("This may take a while... (the API only allows for 50 songs per request for the liked songs)")
|
verboseprint("This may take a while...(the API only allows for " + \
|
||||||
|
"50 songs per request for the liked songs)")
|
||||||
progressbar = top_lib.Progressbar()
|
progressbar = top_lib.Progressbar()
|
||||||
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
progress_bar_eta_manager = top_lib.ProgressBarEtaManager()
|
||||||
all_liked_songs = []
|
all_liked_songs = []
|
||||||
limit = 50
|
limit = 50
|
||||||
offset = 0
|
offset = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
liked_songs_chunk = sp.current_user_saved_tracks(limit=limit, offset=offset)
|
liked_songs_chunk = sp.current_user_saved_tracks(limit=limit,
|
||||||
|
offset=offset)
|
||||||
if not liked_songs_chunk["items"]:
|
if not liked_songs_chunk["items"]:
|
||||||
break
|
break
|
||||||
all_liked_songs = handle_playlist_part_return(liked_songs_chunk, all_liked_songs)
|
all_liked_songs = handle_playlist_part_return(liked_songs_chunk,
|
||||||
|
all_liked_songs)
|
||||||
|
|
||||||
progressBarEtaManager.now(),
|
progress_bar_eta_manager.now()
|
||||||
progressbar.setTotal(liked_songs_chunk["total"])
|
progressbar.setTotal(liked_songs_chunk["total"])
|
||||||
progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit)
|
progress_print = progressbar.buildSnapshot(offset+limit,
|
||||||
|
progress_bar_eta_manager
|
||||||
|
.getAvgEta()/limit)
|
||||||
verboseprint(progress_print, end="\r")
|
verboseprint(progress_print, end="\r")
|
||||||
|
|
||||||
offset += limit
|
offset += limit
|
||||||
|
|
@ -91,6 +135,12 @@ def get_all_liked_songs():
|
||||||
return all_liked_songs
|
return all_liked_songs
|
||||||
|
|
||||||
def is_track_in_playlist(playlist_song_list, track_uri):
|
def is_track_in_playlist(playlist_song_list, track_uri):
|
||||||
|
'''A function to check if a track is in a playlist.
|
||||||
|
|
||||||
|
playlist_song_list: The list of songs in the playlist
|
||||||
|
track_uri: The URI of the track to check
|
||||||
|
|
||||||
|
Returns: True if the track is in the playlist, False otherwise'''
|
||||||
playlist_tracks = playlist_song_list
|
playlist_tracks = playlist_song_list
|
||||||
for item in playlist_tracks:
|
for item in playlist_tracks:
|
||||||
if item[0] == track_uri:
|
if item[0] == track_uri:
|
||||||
|
|
@ -98,8 +148,13 @@ def is_track_in_playlist(playlist_song_list, track_uri):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def add_track_to_playlist(playlist_id, track_uri):
|
def add_track_to_playlist(playlist_id, track_uri):
|
||||||
|
'''A function to add a track to a playlist.
|
||||||
|
|
||||||
|
playlist_id: The ID of the playlist to add the track to
|
||||||
|
track_uri: The URI of the track to add to the playlist'''
|
||||||
sp.playlist_add_items(playlist_id, [track_uri])
|
sp.playlist_add_items(playlist_id, [track_uri])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
# Parse command-line arguments
|
# Parse command-line arguments
|
||||||
|
|
@ -107,24 +162,25 @@ if __name__ == "__main__":
|
||||||
FORCE_RESYNC_ALL = "-f" in sys.argv or "--force-all" in sys.argv
|
FORCE_RESYNC_ALL = "-f" in sys.argv or "--force-all" in sys.argv
|
||||||
|
|
||||||
try:
|
try:
|
||||||
SKIPSONGS = int(sys.argv[sys.argv.index("--skip") + 1]) if "--skip" in sys.argv else int(sys.argv[sys.argv.index("-s") + 1]) if "-s" in sys.argv else 0
|
SKIPSONGS = int(sys.argv[sys.argv.index("--skip") + 1]) if "--skip" \
|
||||||
except:
|
in sys.argv else int(sys.argv[sys.argv.index("-s") + 1]) \
|
||||||
|
if "-s" in sys.argv else 0
|
||||||
|
except ValueError:
|
||||||
print("[--skip/-s] Require a number to be set.")
|
print("[--skip/-s] Require a number to be set.")
|
||||||
print("E.g.: --skip 88")
|
print("E.g.: --skip 88")
|
||||||
exit()
|
sys.exit(-1)
|
||||||
|
|
||||||
verboseprint("Authenticating Spotify...")
|
verboseprint("Authenticating Spotify...")
|
||||||
|
|
||||||
authenticator = top_lib.Auth(verbose=VERBOSE_LOGGING)
|
authenticator = top_lib.Auth(verbose=VERBOSE_LOGGING)
|
||||||
|
|
||||||
# Create a Spotipy instance with authentication
|
# Create a Spotipy instance with authentication
|
||||||
#sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET, redirect_uri=SPOTIPY_REDIRECT_URI, scope="user-library-read playlist-modify-public playlist-modify-private"))
|
sp = authenticator.newSpotifyauth(
|
||||||
sp = authenticator.newSpotifyauth(scope="user-library-read playlist-modify-public playlist-modify-private")
|
scope="user-library-read playlist-modify-public playlist-modify-private")
|
||||||
|
|
||||||
verboseprint("Authenticating Last.fm...")
|
verboseprint("Authenticating Last.fm...")
|
||||||
|
|
||||||
# Set up Last.fm network
|
# Set up Last.fm network
|
||||||
#network = pylast.LastFMNetwork(api_key=LASTFM_API_KEY, api_secret=LASTFM_API_SECRET, username=LASTFM_USERNAME, password_hash=LASTFM_PASSWORD_HASH)
|
|
||||||
network = authenticator.newLastfmauth()
|
network = authenticator.newLastfmauth()
|
||||||
|
|
||||||
# Main loop for syncing liked songs
|
# Main loop for syncing liked songs
|
||||||
|
|
@ -137,50 +193,60 @@ if __name__ == "__main__":
|
||||||
liked_songs = [x for x in liked_songs if x not in liked_songs_playlist_songs]
|
liked_songs = [x for x in liked_songs if x not in liked_songs_playlist_songs]
|
||||||
if len(liked_songs) == 0:
|
if len(liked_songs) == 0:
|
||||||
print("Nothing to do.")
|
print("Nothing to do.")
|
||||||
exit()
|
sys.exit(0)
|
||||||
verboseprint(f"Number of playlist songs: {len(liked_songs_playlist_songs)}")
|
verboseprint(f"Number of playlist songs: {len(liked_songs_playlist_songs)}")
|
||||||
verboseprint(f"Skipping the first {SKIPSONGS} songs...")
|
verboseprint(f"Skipping the first {SKIPSONGS} songs...")
|
||||||
tracknr = 0
|
TRACK_NR = 0
|
||||||
last_time_stamp = time.time()
|
last_time_stamp = time.time()
|
||||||
progressbar = top_lib.Progressbar()
|
progressbar = top_lib.Progressbar()
|
||||||
progressbar.setTotal(len(liked_songs))
|
progressbar.setTotal(len(liked_songs))
|
||||||
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
||||||
for track_uri, track_name, artist_name in liked_songs[::-1]:
|
for track_uri, track_name, artist_name in liked_songs[::-1]:
|
||||||
tracknr += 1
|
TRACK_NR += 1
|
||||||
|
|
||||||
def loop_do(last_time_stamp):
|
def loop_do(last_time_stamp):
|
||||||
|
'''A function to loop until the API call succeeds.
|
||||||
|
|
||||||
|
last_time_stamp: The time stamp of the last call of this function
|
||||||
|
|
||||||
|
Returns: The time stamp of the last call of this function'''
|
||||||
track = sp.track(track_uri)
|
track = sp.track(track_uri)
|
||||||
fm_track = network.get_track(artist_name, track_name)
|
fm_track = network.get_track(artist_name, track_name)
|
||||||
fm_track.love()
|
fm_track.love()
|
||||||
fm_track.add_tags(("awesome", "favorite"))
|
fm_track.add_tags(("awesome", "favorite"))
|
||||||
verboseprint(' '* os.get_terminal_size().columns, end="\r")
|
verboseprint(' '* os.get_terminal_size().columns, end="\r")
|
||||||
if not is_track_in_playlist(liked_songs_playlist_songs, track_uri):
|
if not is_track_in_playlist(liked_songs_playlist_songs, track_uri):
|
||||||
verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}|+]") + "%30.32s %s" % (track['artists'][0]['name'], track['name']))
|
verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" %
|
||||||
#progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
(f"{TRACK_NR}/{len(liked_songs)}|+]") +
|
||||||
|
"%30.32s %s" % (track['artists'][0]['name'],
|
||||||
|
track['name']))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}]") + "%32.32s %s" % (track['artists'][0]['name'], track['name']))
|
verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" %
|
||||||
#progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
(f"{TRACK_NR}/{len(liked_songs)}]") +
|
||||||
#progressBarEtaManager.now()
|
"%32.32s %s" % (track['artists'][0]['name'],
|
||||||
#progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta())
|
track['name']))
|
||||||
#verboseprint(progress_print, end="\r")
|
|
||||||
|
|
||||||
progressBarEtaManager.now()
|
progressBarEtaManager.now()
|
||||||
#print(progressBarEtaManager.getDurations())
|
#print(progressBarEtaManager.getDurations())
|
||||||
#print(tracknr)
|
#print(tracknr)
|
||||||
progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta())
|
progress_print = progressbar.buildSnapshot(TRACK_NR,
|
||||||
|
progressBarEtaManager
|
||||||
|
.getAvgEta())
|
||||||
#print(progressBarEtaManager.getDurations())
|
#print(progressBarEtaManager.getDurations())
|
||||||
verboseprint(progress_print, end="\r")
|
verboseprint(progress_print, end="\r")
|
||||||
|
if not is_track_in_playlist(liked_songs_playlist_songs, track_uri):
|
||||||
add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri)
|
add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri)
|
||||||
|
|
||||||
return last_time_stamp
|
return last_time_stamp
|
||||||
# Loop until the API call succeeds
|
# Loop until the API call succeeds
|
||||||
while tracknr > SKIPSONGS:
|
while TRACK_NR > SKIPSONGS:
|
||||||
try:
|
try:
|
||||||
last_time_stamp = loop_do(last_time_stamp)
|
last_time_stamp = loop_do(last_time_stamp)
|
||||||
break
|
break
|
||||||
except KeyboardInterrupt: # Allow the user to interrupt the script
|
except KeyboardInterrupt: # Allow the user to interrupt the script
|
||||||
exit()
|
print("Interrupted by user.")
|
||||||
|
sys.exit(0)
|
||||||
except spotipy.SpotifyException as e:
|
except spotipy.SpotifyException as e:
|
||||||
if e.http_status == 429:
|
if e.http_status == 429:
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
|
|
@ -188,6 +254,6 @@ if __name__ == "__main__":
|
||||||
verboseprint("WARN:RATELIMIT EXCEEDED] Waiting 30 seconds to proceed...")
|
verboseprint("WARN:RATELIMIT EXCEEDED] Waiting 30 seconds to proceed...")
|
||||||
else:
|
else:
|
||||||
verboseprint(e.http_status)
|
verboseprint(e.http_status)
|
||||||
except:
|
except Exception:
|
||||||
#except e:
|
#except e:
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue