mirror of
https://github.com/JonasunderscoreJones/turbo-octo-potato.git
synced 2025-10-25 19:19:19 +02:00
Added adaptiation for top_lib
Adapted script to use this repo's library
This commit is contained in:
parent
f63dfb9504
commit
5db80dfdb6
1 changed files with 39 additions and 27 deletions
|
|
@ -1,25 +1,15 @@
|
||||||
import pylast, spotipy, sys, os, time
|
import pylast, spotipy, sys, os, time
|
||||||
from spotipy.oauth2 import SpotifyOAuth
|
from spotipy.oauth2 import SpotifyOAuth
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
import top_lib
|
||||||
|
|
||||||
# load .env file
|
# load .env file
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
# Define your Last.fm API credentials
|
|
||||||
LASTFM_API_KEY = os.getenv(' LASTFM_API_KEY')
|
|
||||||
LASTFM_API_SECRET = os.getenv('LASTFM_API_SECRET')
|
|
||||||
LASTFM_USERNAME = os.getenv('LASTFM_USERNAME')
|
|
||||||
LASTFM_PASSWORD_HASH = os.getenv('LASTFM_PASSWORD_HASH')
|
|
||||||
|
|
||||||
# Define your Spotify API credentials
|
|
||||||
SPOTIPY_CLIENT_ID = os.getenv('SPOTIPY_CLIENT_ID')
|
|
||||||
SPOTIPY_CLIENT_SECRET = os.getenv('SPOTIPY_CLIENT_SECRET')
|
|
||||||
SPOTIPY_REDIRECT_URI = os.getenv('SPOTIPY_REDIRECT_URI')
|
|
||||||
|
|
||||||
# Define your playlist IDs
|
# Define your playlist IDs
|
||||||
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
||||||
|
|
||||||
def progress_bar(current, total, last_time_stamp=time.time(), etastr=None):
|
def progress_bar(current, total, last_time_stamp=float, etastr=None):
|
||||||
current = total if current > total else current
|
current = total if current > total else current
|
||||||
this_timestamp = time.time()
|
this_timestamp = time.time()
|
||||||
width = os.get_terminal_size().columns
|
width = os.get_terminal_size().columns
|
||||||
|
|
@ -44,7 +34,7 @@ def verboseprint(message, end="\n"):
|
||||||
if VERBOSE_LOGGING:
|
if VERBOSE_LOGGING:
|
||||||
print(message, end=end)
|
print(message, end=end)
|
||||||
|
|
||||||
def handle_playlist_part_return(playlist_part, all_songs):
|
def handle_playlist_part_return(playlist_part:str, all_songs:list):
|
||||||
for item in playlist_part["items"]:
|
for item in playlist_part["items"]:
|
||||||
track_uri = item["track"]["uri"]
|
track_uri = item["track"]["uri"]
|
||||||
track_name = item["track"]["name"]
|
track_name = item["track"]["name"]
|
||||||
|
|
@ -55,10 +45,11 @@ def handle_playlist_part_return(playlist_part, all_songs):
|
||||||
|
|
||||||
def get_all_songs_from_playlist(playlist_id):
|
def get_all_songs_from_playlist(playlist_id):
|
||||||
verboseprint("Fetching songs from the liked songs playlist...")
|
verboseprint("Fetching songs from the liked songs playlist...")
|
||||||
|
progressbar = top_lib.Progressbar()
|
||||||
|
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
||||||
all_songs = []
|
all_songs = []
|
||||||
limit = 100 # Adjust the limit based on your needs
|
limit = 100
|
||||||
offset = 0
|
offset = 0
|
||||||
last_time_stamp = time.time()
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
playlist_part = sp.playlist_items(playlist_id, limit=limit, offset=offset)
|
playlist_part = sp.playlist_items(playlist_id, limit=limit, offset=offset)
|
||||||
|
|
@ -66,7 +57,9 @@ def get_all_songs_from_playlist(playlist_id):
|
||||||
break
|
break
|
||||||
all_songs = handle_playlist_part_return(playlist_part, all_songs)
|
all_songs = handle_playlist_part_return(playlist_part, all_songs)
|
||||||
|
|
||||||
progress_print, last_time_stamp = progress_bar(offset+limit, playlist_part["total"], last_time_stamp)
|
progressBarEtaManager.now()
|
||||||
|
progressbar.setTotal(playlist_part["total"])
|
||||||
|
progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit)
|
||||||
verboseprint(progress_print, end="\r")
|
verboseprint(progress_print, end="\r")
|
||||||
|
|
||||||
offset += limit
|
offset += limit
|
||||||
|
|
@ -76,10 +69,11 @@ def get_all_songs_from_playlist(playlist_id):
|
||||||
def get_all_liked_songs():
|
def get_all_liked_songs():
|
||||||
verboseprint("Fetching liked songs...")
|
verboseprint("Fetching liked songs...")
|
||||||
verboseprint("This may take a while... (the API only allows for 50 songs per request for the liked songs)")
|
verboseprint("This may take a while... (the API only allows for 50 songs per request for the liked songs)")
|
||||||
|
progressbar = top_lib.Progressbar()
|
||||||
|
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
||||||
all_liked_songs = []
|
all_liked_songs = []
|
||||||
limit = 50 # Adjust the limit based on your needs
|
limit = 50
|
||||||
offset = 0
|
offset = 0
|
||||||
last_time_stamp = time.time()
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
liked_songs_chunk = sp.current_user_saved_tracks(limit=limit, offset=offset)
|
liked_songs_chunk = sp.current_user_saved_tracks(limit=limit, offset=offset)
|
||||||
|
|
@ -87,7 +81,9 @@ def get_all_liked_songs():
|
||||||
break
|
break
|
||||||
all_liked_songs = handle_playlist_part_return(liked_songs_chunk, all_liked_songs)
|
all_liked_songs = handle_playlist_part_return(liked_songs_chunk, all_liked_songs)
|
||||||
|
|
||||||
progress_print, last_time_stamp = progress_bar(offset+limit, liked_songs_chunk["total"], last_time_stamp)
|
progressBarEtaManager.now(),
|
||||||
|
progressbar.setTotal(liked_songs_chunk["total"])
|
||||||
|
progress_print = progressbar.buildSnapshot(offset+limit, progressBarEtaManager.getAvgEta()/limit)
|
||||||
verboseprint(progress_print, end="\r")
|
verboseprint(progress_print, end="\r")
|
||||||
|
|
||||||
offset += limit
|
offset += limit
|
||||||
|
|
@ -119,13 +115,17 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
verboseprint("Authenticating Spotify...")
|
verboseprint("Authenticating Spotify...")
|
||||||
|
|
||||||
|
authenticator = top_lib.Auth(verbose=VERBOSE_LOGGING)
|
||||||
|
|
||||||
# Create a Spotipy instance with authentication
|
# Create a Spotipy instance with authentication
|
||||||
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET, redirect_uri=SPOTIPY_REDIRECT_URI, scope="user-library-read playlist-modify-public playlist-modify-private"))
|
#sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET, redirect_uri=SPOTIPY_REDIRECT_URI, scope="user-library-read playlist-modify-public playlist-modify-private"))
|
||||||
|
sp = authenticator.newSpotifyauth(scope="user-library-read playlist-modify-public playlist-modify-private")
|
||||||
|
|
||||||
verboseprint("Authenticating Last.fm...")
|
verboseprint("Authenticating Last.fm...")
|
||||||
|
|
||||||
# Set up Last.fm network
|
# Set up Last.fm network
|
||||||
network = pylast.LastFMNetwork(api_key=LASTFM_API_KEY, api_secret=LASTFM_API_SECRET, username=LASTFM_USERNAME, password_hash=LASTFM_PASSWORD_HASH)
|
#network = pylast.LastFMNetwork(api_key=LASTFM_API_KEY, api_secret=LASTFM_API_SECRET, username=LASTFM_USERNAME, password_hash=LASTFM_PASSWORD_HASH)
|
||||||
|
network = authenticator.newLastfmauth()
|
||||||
|
|
||||||
# Main loop for syncing liked songs
|
# Main loop for syncing liked songs
|
||||||
liked_songs = get_all_liked_songs()
|
liked_songs = get_all_liked_songs()
|
||||||
|
|
@ -142,6 +142,9 @@ if __name__ == "__main__":
|
||||||
verboseprint(f"Skipping the first {SKIPSONGS} songs...")
|
verboseprint(f"Skipping the first {SKIPSONGS} songs...")
|
||||||
tracknr = 0
|
tracknr = 0
|
||||||
last_time_stamp = time.time()
|
last_time_stamp = time.time()
|
||||||
|
progressbar = top_lib.Progressbar()
|
||||||
|
progressbar.setTotal(len(liked_songs))
|
||||||
|
progressBarEtaManager = top_lib.ProgressBarEtaManager()
|
||||||
for track_uri, track_name, artist_name in liked_songs:
|
for track_uri, track_name, artist_name in liked_songs:
|
||||||
tracknr += 1
|
tracknr += 1
|
||||||
|
|
||||||
|
|
@ -153,13 +156,22 @@ if __name__ == "__main__":
|
||||||
verboseprint(' '* os.get_terminal_size().columns, end="\r")
|
verboseprint(' '* os.get_terminal_size().columns, end="\r")
|
||||||
if not is_track_in_playlist(liked_songs_playlist_songs, track_uri):
|
if not is_track_in_playlist(liked_songs_playlist_songs, track_uri):
|
||||||
verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}|+]") + "%30.32s %s" % (track['artists'][0]['name'], track['name']))
|
verboseprint("[" + f"%{4 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}|+]") + "%30.32s %s" % (track['artists'][0]['name'], track['name']))
|
||||||
progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
#progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
||||||
verboseprint(progress_print, end="\r")
|
|
||||||
add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri)
|
|
||||||
else:
|
else:
|
||||||
verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}]") + "%32.32s %s" % (track['artists'][0]['name'], track['name']))
|
verboseprint("[" + f"%{2 + len(str(len(liked_songs)))*2}s" % (f"{tracknr}/{len(liked_songs)}]") + "%32.32s %s" % (track['artists'][0]['name'], track['name']))
|
||||||
progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
#progress_print, last_time_stamp = progress_bar(tracknr, len(liked_songs), etastr=str(round((((int(len(liked_songs))-tracknr)*0.75)/60)))+"min")
|
||||||
verboseprint(progress_print, end="\r")
|
#progressBarEtaManager.now()
|
||||||
|
#progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta())
|
||||||
|
#verboseprint(progress_print, end="\r")
|
||||||
|
|
||||||
|
progressBarEtaManager.now()
|
||||||
|
#print(progressBarEtaManager.getDurations())
|
||||||
|
#print(tracknr)
|
||||||
|
progress_print = progressbar.buildSnapshot(tracknr, progressBarEtaManager.getAvgEta())
|
||||||
|
#print(progressBarEtaManager.getDurations())
|
||||||
|
verboseprint(progress_print, end="\r")
|
||||||
|
add_track_to_playlist(LIKEDSONGPLAYLIST_ID, track_uri)
|
||||||
|
|
||||||
return last_time_stamp
|
return last_time_stamp
|
||||||
# Loop until the API call succeeds
|
# Loop until the API call succeeds
|
||||||
|
|
@ -177,5 +189,5 @@ if __name__ == "__main__":
|
||||||
else:
|
else:
|
||||||
verboseprint(e.http_status)
|
verboseprint(e.http_status)
|
||||||
except:
|
except:
|
||||||
# except e:
|
#except e:
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue