Compare commits

...

10 commits

8 changed files with 454 additions and 115 deletions

github_forgejo_syncer.py Normal file

@@ -0,0 +1,92 @@
import requests
import os
import dotenv

# Load the environment variables
dotenv.load_dotenv()

# Configuration: Set your GitHub and Forgejo credentials and URLs
GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
FORGEJO_USERNAME = os.getenv('FORGEJO_USERNAME')
FORGEJO_API_URL = os.getenv('FORGEJO_API_URL')
FORGEJO_TOKEN = os.getenv('FORGEJO_TOKEN')

REPO_BLACKLIST = ["ZtereoMUSIC", "nicer-skies", "epr_grader"]

# Fetch repositories from GitHub
def get_github_repositories():
    github_url = f'https://api.github.com/users/{GITHUB_USERNAME}/repos'
    headers = {'Authorization': f'token {GITHUB_TOKEN}'}
    repos = []
    page = 1
    while True:
        response = requests.get(github_url, headers=headers, params={'page': page, 'per_page': 100})
        if response.status_code != 200:
            print(f"Error fetching GitHub repositories: {response.text}")
            break
        data = response.json()
        if not data:  # No more repositories
            break
        repos.extend(data)
        page += 1
    return repos

# Check if a repository exists on Forgejo
def check_forgejo_repo_exists(repo_name):
    forgejo_url = f'{FORGEJO_API_URL}/repos/{FORGEJO_USERNAME}/{repo_name}'
    headers = {'Authorization': f'token {FORGEJO_TOKEN}'}
    response = requests.get(forgejo_url, headers=headers)
    if response.status_code == 200:
        return True  # Repo exists
    elif response.status_code == 404:
        return False  # Repo does not exist
    else:
        print(f"Error checking repository on Forgejo: {response.text}")
        return False

# Create a mirror repository on Forgejo
def create_forgejo_repo_mirror(github_repo):
    forgejo_url = f'{FORGEJO_API_URL}/repos/migrate'
    headers = {'Authorization': f'token {FORGEJO_TOKEN}', 'Content-Type': 'application/json'}
    # Prepare the payload
    payload = {
        'clone_addr': github_repo['clone_url'],
        'repo_name': github_repo['name'],
        'private': github_repo['private'],
        'mirror': True,
        'description': github_repo.get('description', ''),
    }
    response = requests.post(forgejo_url, json=payload, headers=headers)
    if response.status_code == 201:
        print(f"Created mirror for {github_repo['name']}")
    else:
        print(f"Error creating mirror for {github_repo['name']}: {response.text}")

# Main script
def main():
    print("Fetching GitHub repositories...")
    github_repos = get_github_repositories()
    for github_repo in github_repos:
        repo_name = github_repo['name']
        print(f"Checking if {repo_name} exists on Forgejo...")
        if repo_name in REPO_BLACKLIST:
            print(f"Repository {repo_name} is blacklisted. Skipping.")
        elif not check_forgejo_repo_exists(repo_name):
            print(f"Repository {repo_name} does not exist on Forgejo. Creating mirror...")
            create_forgejo_repo_mirror(github_repo)
        else:
            print(f"Repository {repo_name} already exists on Forgejo. Skipping.")

if __name__ == '__main__':
    main()
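For reference, the script reads all five credentials from a .env file next to it. A minimal sketch, assuming FORGEJO_API_URL points at the instance's /api/v1 base (all values below are placeholders, not taken from this repo):

GITHUB_USERNAME=your-github-user
GITHUB_TOKEN=your-github-personal-access-token
FORGEJO_USERNAME=your-forgejo-user
FORGEJO_API_URL=https://forgejo.example.com/api/v1
FORGEJO_TOKEN=your-forgejo-access-token

Since the /repos/migrate payload sets mirror to True, Forgejo keeps pulling updates on its own mirror schedule; the script only has to run again when new repositories appear on GitHub.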

@@ -0,0 +1,30 @@
import pyautogui
import time

# Constant delay in seconds
DELAY = 0.125
DELAY = 0  # overrides the value above; clicks currently run with no delay

# Define the main function to perform the clicks
def perform_clicks():
    while True:
        # Click on pixel (300, 150)
        pyautogui.click(300, 150)
        time.sleep(DELAY)

        # Click on pixel (960, 530)
        pyautogui.click(960, 530)
        time.sleep(DELAY)

        # Click on pixel (960, 530) again
        pyautogui.click(960, 530)
        time.sleep(DELAY)

        # Click on pixel (960, 555)
        pyautogui.click(960, 555)
        time.sleep(DELAY)

# Start the clicking loop
if __name__ == "__main__":
    time.sleep(5)
    perform_clicks()
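The click targets are hard-coded screen coordinates. To re-derive them for a different window layout, a throwaway helper built on pyautogui.position() works; this is a hypothetical snippet, not part of the commit:

import pyautogui
import time

# Hover the mouse over each target element and read off its coordinates,
# then plug them into the pyautogui.click() calls above.
while True:
    x, y = pyautogui.position()
    print(f"current position: ({x}, {y})   ", end="\r")
    time.sleep(0.5)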

rpopfetch.py

@@ -247,87 +247,95 @@ def fetch_monthly_page(wiki_link, subreddit_name):
         print(f"Error fetching Reddit wiki page: {e}")
         return None
 
-UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
-SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
+def main():
+    UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
+    SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
 
-# reddit infos
-subreddit_name = "kpop"
-wiki_page_name = "upcoming-releases/archive"
+    # because im lazy
+    global reddit
+    global progress
 
-# reddit instance
-dotenv.load_dotenv()
+    # reddit infos
+    subreddit_name = "kpop"
+    wiki_page_name = "upcoming-releases/archive"
 
-reddit = praw.Reddit(
-    client_id=os.getenv('REDDIT_CLIENT_ID'),
-    client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
-    user_agent=os.getenv('REDDIT_USER_AGENT')
-)
+    # reddit instance
+    dotenv.load_dotenv()
 
-# fetch subreddit
-print("Fetching Months...")
+    reddit = praw.Reddit(
+        client_id=os.getenv('REDDIT_CLIENT_ID'),
+        client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
+        user_agent=os.getenv('REDDIT_USER_AGENT')
+    )
 
-try:
-    subreddit = reddit.subreddit(subreddit_name)
-except praw.exceptions.PRAWException as e:
-    print(f"Error fetching subreddit: {e}")
+    # fetch subreddit
+    print("Fetching Months...")
 
-# fetch wiki page
-content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name)
+    try:
+        subreddit = reddit.subreddit(subreddit_name)
+    except praw.exceptions.PRAWException as e:
+        print(f"Error fetching subreddit: {e}")
 
-print("Done!")
+    # fetch wiki page
+    content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name)
 
-if content:
-    json_data = []
+    print("Done!")
 
-    for wiki_link in content[::-1]:
-        progress = int(content[::-1].index(wiki_link)+1/len(content)*100)
+    if content:
+        json_data = []
 
-        if progress < 10:
-            progress = "  " + str(progress)
-        elif progress < 100:
-            progress = " " + str(progress)
+        for wiki_link in content[::-1]:
+            progress = int(content[::-1].index(wiki_link)+1/len(content)*100)
 
-        #print(" ==>", end="\n")
-        print(f"[{progress}%] Fetching monthly page: " + wiki_link, end="\r")
+            if progress < 10:
+                progress = "  " + str(progress)
+            elif progress < 100:
+                progress = " " + str(progress)
 
-        # sleep for 2 seconds to avoid getting rate limited
-        # reddit api is awful
-        time.sleep(2)
+            #print(" ==>", end="\n")
+            print(f"[{progress}%] Fetching monthly page: " + wiki_link, end="\r")
 
-        try:
-            # fetch the monthly page and parse it
-            json_data += fetch_monthly_page(wiki_link, subreddit_name)
-        except Exception as e:
-            # write json_data to file
-            with open(f"{subreddit_name}_upcoming_releases-CANCELED.json", "w") as f:
-                f.write(json.dumps(json_data, indent=4))
-            print("Error fetching monthly page: " + wiki_link)
-            print(e)
-            exit(1)
+            # sleep for 2 seconds to avoid getting rate limited
+            # reddit api is awful
+            # time.sleep(2)
 
-        #print(f"[{progress}%] Parsed monthly page: " + wiki_link + " ", end="\r")
+            try:
+                # fetch the monthly page and parse it
+                json_data += fetch_monthly_page(wiki_link, subreddit_name)
+            except Exception as e:
+                # write json_data to file
+                with open(f"{subreddit_name}_upcoming_releases-CANCELED.json", "w") as f:
+                    f.write(json.dumps(json_data, indent=4))
+                print("Error fetching monthly page: " + wiki_link)
+                print(e)
+                exit(1)
 
-    # add a first element to the list that holds the date of the last update
-    json_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
+            #print(f"[{progress}%] Parsed monthly page: " + wiki_link + " ", end="\r")
 
-    # save json_data to file
-    with open(f"rkpop_data.json", "w") as f:
-        f.write(json.dumps(json_data, indent=4))
+        # add a first element to the list that holds the date of the last update
+        json_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
 
-    print("Fetched", len(json_data) - 1, "entries.")
+        # save json_data to file
+        with open(f"rkpop_data.json", "w") as f:
+            f.write(json.dumps(json_data, indent=4))
 
-    cdn_upload_cmd = "rclone copy rkpop_data.json cdn:cdn/api/kcomebacks/"
+        print("Fetched", len(json_data) - 1, "entries.")
 
-    if UPLOAD_TO_CDN:
-        print("Uploading...")
-        os.system(cdn_upload_cmd)
-    elif input("Upload to cdn? [Y/n]") in ["Y", "y", ""]:
-        print("Uploading...")
-        os.system(cdn_upload_cmd)
+        cdn_upload_cmd = "rclone copy rkpop_data.json cdn:cdn/api/kcomebacks/"
 
-    if SEND_WEBHOOK:
-        rpop_webhook.send_webhook()
+        if UPLOAD_TO_CDN:
+            print("Uploading...")
+            os.system(cdn_upload_cmd)
+        elif input("Upload to cdn? [Y/n]") in ["Y", "y", ""]:
+            print("Uploading...")
+            os.system(cdn_upload_cmd)
+
+        if SEND_WEBHOOK:
+            rpop_webhook.send_webhook()
+
+if __name__ == "__main__":
+    main()
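Taken together, the two flags parsed at the top of main() give invocations like these (hypothetical shell lines; the filename rpopfetch.py is taken from scripts_config.json below):

python rpopfetch.py --cdn          # upload rkpop_data.json via rclone without prompting
python rpopfetch.py --no-webhook   # skip the Discord webhook (short form: -nwh)

(One nit: progress = int(content[::-1].index(wiki_link)+1/len(content)*100) binds as index + (1/len)*100 due to operator precedence; (content[::-1].index(wiki_link)+1)/len(content)*100 looks like the intended percentage.)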

run_likedsongsync2.py Normal file

@@ -0,0 +1,5 @@
# import and run spotify_scripts/likedsongsync2.py
from spotify_scripts.likedsongsync2 import main

def run():
    main()
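This shim exists because script_interval_runner.py (below) imports each configured script as a top-level module from the working directory, so a script living inside the spotify_scripts package needs a root-level entry point. The runner resolves it roughly like this (an illustrative sketch of the calls it makes, not new behavior):

import importlib

module = importlib.import_module("run_likedsongsync2")  # filename minus ".py"
getattr(module, "run")()  # "method" as named in scripts_config.json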

script_interval_runner.py Normal file

@@ -0,0 +1,165 @@
import json
import time
import importlib
import os
import logging
import threading
import sys
import requests

# Set up logging
logging.basicConfig(level=logging.INFO)

class ScriptConfig:
    def __init__(self, name, filename, method, interval, retry_on_error, launch_flags):
        self.name = name
        self.filename = filename
        self.method = method
        self.interval = interval
        self.retry_on_error = retry_on_error
        self.launch_flags = launch_flags  # This will store the flags
        self.failure_count = 0  # Track the failure count

    def __repr__(self):
        return f"ScriptConfig(name={self.name}, filename={self.filename}, method={self.method}, interval={self.interval}, retry_on_error={self.retry_on_error}, launch_flags={self.launch_flags}, failure_count={self.failure_count})"

class ScriptRunner:
    def __init__(self, config_file):
        self.scripts = []
        self.load_config(config_file)

        # Retrieve the Discord webhook URLs from the environment variables
        self.logs_webhook_url = os.getenv('SCRIPT_LOGS_DISCORD_WEBHOOK_URL')
        self.errors_webhook_url = os.getenv('SCRIPT_ERROR_DISCORD_WEBHOOK_URL')

    def load_config(self, config_file):
        """Load script configurations from a JSON file."""
        if not os.path.exists(config_file):
            logging.error(f"Config file '{config_file}' not found.")
            return

        with open(config_file, 'r') as f:
            data = json.load(f)
            for item in data:
                # Read launch_flags as a list (it could be an empty list if no flags are present)
                launch_flags = item.get('launch_flags', [])
                script_config = ScriptConfig(
                    name=item['name'],
                    filename=item['filename'],
                    method=item['method'],
                    interval=item['interval'],
                    retry_on_error=item['retry_on_error'].lower() == 'true',
                    launch_flags=launch_flags  # Store launch flags
                )
                self.scripts.append(script_config)

    def send_to_discord(self, webhook_url, message):
        """Send a message to a Discord webhook."""
        if webhook_url:
            try:
                payload = {'content': message}
                requests.post(webhook_url, json=payload)
            except Exception as e:
                logging.error(f"Failed to send message to Discord: {e}")

    def run_script(self, script_config):
        """Run the script as per configuration."""
        script_path = os.path.join(os.getcwd(), script_config.filename)
        if not os.path.exists(script_path):
            logging.error(f"Script file '{script_config.filename}' not found in the current directory.")
            return

        try:
            # Import the script as a module
            script_name = script_config.filename[:-3]  # Strip ".py" from the filename
            module = importlib.import_module(script_name)

            # Temporarily modify sys.argv to simulate command-line arguments
            original_argv = sys.argv
            sys.argv = [script_name] + script_config.launch_flags  # Simulate command-line args

            # Check if method exists
            if hasattr(module, script_config.method):
                method = getattr(module, script_config.method)
                retry_count = 0
                while True:
                    try:
                        logging.info(f"Running {script_config.name} with flags: {script_config.launch_flags}...")
                        # Call the method without needing to pass arguments (sys.argv is used)
                        method()

                        # Log successful run to Discord
                        if self.logs_webhook_url:
                            self.send_to_discord(self.logs_webhook_url, f"Successfully ran {script_config.name}")

                        logging.info(f"Completed {script_config.name}.")
                        time.sleep(script_config.interval)  # Sleep after execution before next run
                        script_config.failure_count = 0  # Reset failure count after a successful run
                    except Exception as e:
                        logging.error(f"Error running {script_config.name}: {e}")
                        script_config.failure_count += 1

                        if script_config.failure_count >= 3:  # Notify on third failure
                            error_message = f"{script_config.name} has failed 3 times. Last error: {e}"
                            if self.errors_webhook_url:
                                self.send_to_discord(self.errors_webhook_url, error_message)
                            logging.error(f"Max retry attempts reached for {script_config.name}. Error sent to Discord.")
                            return 500  # return error code

                        if script_config.retry_on_error:
                            retry_count += 1
                            if retry_count >= 3:  # Retry 3 times
                                logging.error(f"Max retry attempts reached for {script_config.name}.")
                                return 500
                            logging.info(f"Retrying {script_config.name}... ({retry_count}/3)")
                        else:
                            break
            else:
                logging.error(f"Method '{script_config.method}' not found in {script_config.filename}")

            # Restore the original sys.argv
            sys.argv = original_argv
        except Exception as e:
            logging.error(f"Failed to run script {script_config.filename}: {e}")
            return 404  # return not found if script not found

        return 200  # return success

    def run_script_in_thread(self, script_config):
        """Run the script in a separate thread with an endless loop."""
        def target():
            error_count = 0
            while error_count < 3:
                error_code = self.run_script(script_config)
                if error_code == 404:
                    error_count += 1
                    time.sleep(5)
                elif error_code == 500:
                    break
                else:
                    error_count = 0  # Reset error count on success
            logging.error(f"Script {script_config.name} has failed 3 times. Stopping execution.")

        thread = threading.Thread(target=target)
        thread.daemon = True  # Allow the thread to exit when the main program exits
        thread.start()
        return thread

    def run_all_scripts(self):
        """Start all scripts concurrently in separate threads with an endless loop."""
        threads = []
        for script_config in self.scripts:
            thread = self.run_script_in_thread(script_config)
            threads.append(thread)

        # The main thread only needs to start the loops, it doesn't need to join since the threads are infinite
        logging.info("All scripts have been started and are running in their respective threads.")

# Main execution
if __name__ == "__main__":
    config_file = 'scripts_config.json'
    runner = ScriptRunner(config_file)
    runner.run_all_scripts()

    # Keep the main program running (this is important for the daemon threads to keep running)
    while True:
        time.sleep(1)
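A minimal way to start the runner, assuming the two webhook URLs are exported in the environment (placeholders below; both are optional, since send_to_discord is a no-op when the URL is unset):

export SCRIPT_LOGS_DISCORD_WEBHOOK_URL="https://discord.com/api/webhooks/<id>/<token>"
export SCRIPT_ERROR_DISCORD_WEBHOOK_URL="https://discord.com/api/webhooks/<id>/<token>"
python script_interval_runner.py

(One quirk: the early return when the script file is missing returns None rather than 404, so target() falls into its else branch, resets error_count, and retries in a tight loop.)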

scripts_config.json Normal file

@@ -0,0 +1,26 @@
[
    {
        "name": "Kcomebacks Sync",
        "filename": "rpopfetch.py",
        "method": "main",
        "interval": 86400,
        "retry_on_error": "true",
        "launch_flags": ["--cdn"]
    },
    {
        "name": "Project Sync",
        "filename": "update_projects.py",
        "method": "main",
        "interval": 360,
        "retry_on_error": "false",
        "launch_flags": ["--cdn"]
    },
    {
        "name": "Likedsongsync 2",
        "filename": "run_likedsongsync2.py",
        "method": "run",
        "interval": 10800,
        "retry_on_error": "true",
        "launch_flags": [""]
    }
]
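Each entry maps onto a ScriptConfig: filename is imported as a module from the working directory, method is looked up on that module and called with sys.argv set to launch_flags, interval is the sleep in seconds between runs, and retry_on_error is compared as the string "true"/"false". A new job would be registered with an entry of the same shape (hypothetical example):

{
    "name": "My New Job",
    "filename": "my_new_job.py",
    "method": "main",
    "interval": 3600,
    "retry_on_error": "false",
    "launch_flags": []
}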

spotify_scripts/likedsongsync2.py

@@ -9,11 +9,7 @@ import os
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 import top_lib
 
-# load .env file
-load_dotenv()
-
-# Define your playlist IDs
-LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
 
 def progress_bar(current, total, last_time_stamp=float, etastr=None):
     '''A function to print a progress bar to the terminal.
 
@@ -155,8 +151,16 @@ def add_track_to_playlist(playlist_id, track_uri):
         track_uri: The URI of the track to add to the playlist'''
     sp.playlist_add_items(playlist_id, [track_uri])
 
-if __name__ == "__main__":
+def main():
+    # load .env file
+    load_dotenv()
+
     # because im lazy
     global VERBOSE_LOGGING
     global sp
 
+    # Define your playlist IDs
+    LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
+
     # Parse command-line arguments
     VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv
 
@@ -258,3 +262,7 @@ if __name__ == "__main__":
         except Exception:
         #except e:
             continue
+
+if __name__ == "__main__":
+    main()

update_projects.py

@@ -101,65 +101,70 @@ def get_languagages(repo, access_token):
         print(f"[{repo}] Error fetching languages ", end="\n")
         return None
 
-# Path to the projects.json file
-projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
-# create the directory if it doesn't exist
-os.makedirs(os.path.dirname(projects_json_path), exist_ok=True)
+def main():
+    # Path to the projects.json file
+    projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
+    # create the directory if it doesn't exist
+    os.makedirs(os.path.dirname(projects_json_path), exist_ok=True)
 
-if not DONTDOWNLOAD:
-    # fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json
-    projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json"
-    projects_json = requests.get(projects_json_url)
-    with open(projects_json_path, "wb") as file:
-        file.write(projects_json.content)
+    if not DONTDOWNLOAD:
+        # fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json
+        projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json"
+        projects_json = requests.get(projects_json_url)
+        with open(projects_json_path, "wb") as file:
+            file.write(projects_json.content)
 
-    verboseprint(f"Fetched projects.json file")
-elif not os.path.exists("~/.cache/gh-projects/projects.json"):
-    FileNotFoundError("File 'projects.json' not found. Cannot proceed without \
-downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.")
-else:
-    print("Skipping download of 'projects.json'")
+        verboseprint(f"Fetched projects.json file")
+    elif not os.path.exists("~/.cache/gh-projects/projects.json"):
+        FileNotFoundError("File 'projects.json' not found. Cannot proceed without \
+downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.")
+    else:
+        print("Skipping download of 'projects.json'")
 
-# Load the existing projects.json file
-with open(projects_json_path, "r") as file:
-    projects_data = json.load(file)
+    # Load the existing projects.json file
+    with open(projects_json_path, "r") as file:
+        projects_data = json.load(file)
 
-if not DONTUPDATEGH:
-    print("Fetching Repo data...")
+    if not DONTUPDATEGH:
+        print("Fetching Repo data...")
 
-    # Update the last_update (Unix timestamp) for each project
-    for project in projects_data:
-        gh_api = project.get("gh_api")
-        if gh_api:
-            last_commit_timestamp = get_last_commit_timestamp(gh_api, GITHUB_API_TOKEN)
-            last_release_version = get_last_release_version(gh_api, GITHUB_API_TOKEN)
-            if last_commit_timestamp:
-                project["last_update"] = last_commit_timestamp
-            else:
-                project["last_update"] = 0
-            if last_release_version:
-                project["version"] = last_release_version.replace("v", "")
-            languages = get_languagages(gh_api, GITHUB_API_TOKEN)
-            if languages:
-                project["languages"] = languages
-else:
-    print("Skipping Github updates...")
+        # Update the last_update (Unix timestamp) for each project
+        for project in projects_data:
+            gh_api = project.get("gh_api")
+            if gh_api:
+                last_commit_timestamp = get_last_commit_timestamp(gh_api, GITHUB_API_TOKEN)
+                last_release_version = get_last_release_version(gh_api, GITHUB_API_TOKEN)
+                if last_commit_timestamp:
+                    project["last_update"] = last_commit_timestamp
+                else:
+                    project["last_update"] = 0
+                if last_release_version:
+                    project["version"] = last_release_version.replace("v", "")
+                languages = get_languagages(gh_api, GITHUB_API_TOKEN)
+                if languages:
+                    project["languages"] = languages
+    else:
+        print("Skipping Github updates...")
 
-# remove first element
-projects_data.pop(0)
+    # remove first element
+    projects_data.pop(0)
 
-# sort projects alphabetically
-projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True)
+    # sort projects alphabetically
+    projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True)
 
-# add a first element to the list that holds the date of the last update
-projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
+    # add a first element to the list that holds the date of the last update
+    projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
 
-# Save the updated data back to the projects.json file
-with open(projects_json_path, "w") as file:
-    json.dump(projects_data, file, indent=2)
+    # Save the updated data back to the projects.json file
+    with open(projects_json_path, "w") as file:
+        json.dump(projects_data, file, indent=2)
 
-print("Updated projects.json\nUploading to cdn...")
+    print("Updated projects.json\nUploading to cdn...")
 
-os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")
+    os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")
 
-print("Uploaded projects.json to cdn")
+    print("Uploaded projects.json to cdn")
+
+if __name__ == "__main__":
+    main()
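DONTDOWNLOAD and DONTUPDATEGH are defined outside this hunk; going by the error message above, skipping the download looks roughly like this (hypothetical invocations, only the '-dd'/'--dontdownload' spelling is named in the code):

python update_projects.py        # download projects.json, refresh GitHub data, upload via rclone
python update_projects.py -dd    # reuse the cached projects.json instead of downloading it

(Note that the elif branch checks an unexpanded "~/..." path and constructs a FileNotFoundError without raising it, so with -dd a missing cache currently falls through to the open() below.)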