Compare commits

...

10 commits

8 changed files with 454 additions and 115 deletions

92
github_forgejo_syncer.py Normal file
View file

@ -0,0 +1,92 @@
import requests
import os
import dotenv
# Load the environment variables
dotenv.load_dotenv()
# Configuration: Set your GitHub and Forgejo credentials and URLs
GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
FORGEJO_USERNAME = os.getenv('FORGEJO_USERNAME')
FORGEJO_API_URL = os.getenv('FORGEJO_API_URL')
FORGEJO_TOKEN = os.getenv('FORGEJO_TOKEN')
REPO_BLACKLIST = ["ZtereoMUSIC", "nicer-skies", "epr_grader"]
# Fetch repositories from GitHub
def get_github_repositories():
github_url = f'https://api.github.com/users/{GITHUB_USERNAME}/repos'
headers = {'Authorization': f'token {GITHUB_TOKEN}'}
repos = []
page = 1
while True:
response = requests.get(github_url, headers=headers, params={'page': page, 'per_page': 100})
if response.status_code != 200:
print(f"Error fetching GitHub repositories: {response.text}")
break
data = response.json()
if not data: # No more repositories
break
repos.extend(data)
page += 1
return repos
# Check if a repository exists on Forgejo
def check_forgejo_repo_exists(repo_name):
forgejo_url = f'{FORGEJO_API_URL}/repos/{FORGEJO_USERNAME}/{repo_name}'
headers = {'Authorization': f'token {FORGEJO_TOKEN}'}
response = requests.get(forgejo_url, headers=headers)
if response.status_code == 200:
return True # Repo exists
elif response.status_code == 404:
return False # Repo does not exist
else:
print(f"Error checking repository on Forgejo: {response.text}")
return False
# Create a mirror repository on Forgejo
def create_forgejo_repo_mirror(github_repo):
forgejo_url = f'{FORGEJO_API_URL}/repos/migrate'
headers = {'Authorization': f'token {FORGEJO_TOKEN}', 'Content-Type': 'application/json'}
# Prepare the payload
payload = {
'clone_addr': github_repo['clone_url'],
'repo_name': github_repo['name'],
'private': github_repo['private'],
'mirror': True,
'description': github_repo.get('description', ''),
}
response = requests.post(forgejo_url, json=payload, headers=headers)
if response.status_code == 201:
print(f"Created mirror for {github_repo['name']}")
else:
print(f"Error creating mirror for {github_repo['name']}: {response.text}")
# Main script
def main():
print("Fetching GitHub repositories...")
github_repos = get_github_repositories()
for github_repo in github_repos:
repo_name = github_repo['name']
print(f"Checking if {repo_name} exists on Forgejo...")
if repo_name in REPO_BLACKLIST:
print(f"Repository {repo_name} is blacklisted. Skipping.")
elif not check_forgejo_repo_exists(repo_name):
print(f"Repository {repo_name} does not exist on Forgejo. Creating mirror...")
create_forgejo_repo_mirror(github_repo)
else:
print(f"Repository {repo_name} already exists on Forgejo. Skipping.")
if __name__ == '__main__':
main()

View file

@ -0,0 +1,30 @@
import pyautogui
import time
# Constant delay in seconds
DELAY = 0.125
DELAY = 0
# Define the main function to perform the clicks
def perform_clicks():
while True:
# Click on pixel (300, 150)
pyautogui.click(300, 150)
time.sleep(DELAY)
# Click on pixel (960, 530)
pyautogui.click(960, 530)
time.sleep(DELAY)
# Click on pixel (960, 530) again
pyautogui.click(960, 530)
time.sleep(DELAY)
# Click on pixel (960, 555)
pyautogui.click(960, 555)
time.sleep(DELAY)
# Start the clicking loop
if __name__ == "__main__":
time.sleep(5)
perform_clicks()

View file

@ -247,36 +247,41 @@ def fetch_monthly_page(wiki_link, subreddit_name):
print(f"Error fetching Reddit wiki page: {e}") print(f"Error fetching Reddit wiki page: {e}")
return None return None
UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False def main():
SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
# reddit infos # because im lazy
subreddit_name = "kpop" global reddit
wiki_page_name = "upcoming-releases/archive" global progress
# reddit instance # reddit infos
dotenv.load_dotenv() subreddit_name = "kpop"
wiki_page_name = "upcoming-releases/archive"
reddit = praw.Reddit( # reddit instance
dotenv.load_dotenv()
reddit = praw.Reddit(
client_id=os.getenv('REDDIT_CLIENT_ID'), client_id=os.getenv('REDDIT_CLIENT_ID'),
client_secret=os.getenv('REDDIT_CLIENT_SECRET'), client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
user_agent=os.getenv('REDDIT_USER_AGENT') user_agent=os.getenv('REDDIT_USER_AGENT')
) )
# fetch subreddit # fetch subreddit
print("Fetching Months...") print("Fetching Months...")
try: try:
subreddit = reddit.subreddit(subreddit_name) subreddit = reddit.subreddit(subreddit_name)
except praw.exceptions.PRAWException as e: except praw.exceptions.PRAWException as e:
print(f"Error fetching subreddit: {e}") print(f"Error fetching subreddit: {e}")
# fetch wiki page # fetch wiki page
content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name) content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name)
print("Done!") print("Done!")
if content: if content:
json_data = [] json_data = []
@ -294,7 +299,7 @@ if content:
# sleep for 2 seconds to avoid getting rate limited # sleep for 2 seconds to avoid getting rate limited
# reddit api is awful # reddit api is awful
time.sleep(2) # time.sleep(2)
try: try:
# fetch the monthly page and parse it # fetch the monthly page and parse it
@ -329,5 +334,8 @@ if content:
os.system(cdn_upload_cmd) os.system(cdn_upload_cmd)
if SEND_WEBHOOK: if SEND_WEBHOOK:
rpop_webhook.send_webhook() rpop_webhook.send_webhook()
if __name__ == "__main__":
main()

5
run_likedsongsync2.py Normal file
View file

@ -0,0 +1,5 @@
# import and run spotify_scripts/likedsongsync2.py
from spotify_scripts.likedsongsync2 import main
def run():
main()

165
script_interval_runner.py Normal file
View file

@ -0,0 +1,165 @@
import json
import time
import importlib
import os
import logging
import threading
import sys
import requests
# Set up logging
logging.basicConfig(level=logging.INFO)
class ScriptConfig:
def __init__(self, name, filename, method, interval, retry_on_error, launch_flags):
self.name = name
self.filename = filename
self.method = method
self.interval = interval
self.retry_on_error = retry_on_error
self.launch_flags = launch_flags # This will store the flags
self.failure_count = 0 # Track the failure count
def __repr__(self):
return f"ScriptConfig(name={self.name}, filename={self.filename}, method={self.method}, interval={self.interval}, retry_on_error={self.retry_on_error}, launch_flags={self.launch_flags}, failure_count={self.failure_count})"
class ScriptRunner:
def __init__(self, config_file):
self.scripts = []
self.load_config(config_file)
# Retrieve the Discord webhook URLs from the environment variables
self.logs_webhook_url = os.getenv('SCRIPT_LOGS_DISCORD_WEBHOOK_URL')
self.errors_webhook_url = os.getenv('SCRIPT_ERROR_DISCORD_WEBHOOK_URL')
def load_config(self, config_file):
"""Load script configurations from a JSON file."""
if not os.path.exists(config_file):
logging.error(f"Config file '{config_file}' not found.")
return
with open(config_file, 'r') as f:
data = json.load(f)
for item in data:
# Read launch_flags as a list (it could be an empty list if no flags are present)
launch_flags = item.get('launch_flags', [])
script_config = ScriptConfig(
name=item['name'],
filename=item['filename'],
method=item['method'],
interval=item['interval'],
retry_on_error=item['retry_on_error'].lower() == 'true',
launch_flags=launch_flags # Store launch flags
)
self.scripts.append(script_config)
def send_to_discord(self, webhook_url, message):
"""Send a message to a Discord webhook."""
if webhook_url:
try:
payload = {'content': message}
requests.post(webhook_url, json=payload)
except Exception as e:
logging.error(f"Failed to send message to Discord: {e}")
def run_script(self, script_config):
"""Run the script as per configuration."""
script_path = os.path.join(os.getcwd(), script_config.filename)
if not os.path.exists(script_path):
logging.error(f"Script file '{script_config.filename}' not found in the current directory.")
return
try:
# Import the script as a module
script_name = script_config.filename[:-3] # Strip ".py" from the filename
module = importlib.import_module(script_name)
# Temporarily modify sys.argv to simulate command-line arguments
original_argv = sys.argv
sys.argv = [script_name] + script_config.launch_flags # Simulate command-line args
# Check if method exists
if hasattr(module, script_config.method):
method = getattr(module, script_config.method)
retry_count = 0
while True:
try:
logging.info(f"Running {script_config.name} with flags: {script_config.launch_flags}...")
# Call the method without needing to pass arguments (sys.argv is used)
method()
# Log successful run to Discord
if self.logs_webhook_url:
self.send_to_discord(self.logs_webhook_url, f"Successfully ran {script_config.name}")
logging.info(f"Completed {script_config.name}.")
time.sleep(script_config.interval) # Sleep after execution before next run
script_config.failure_count = 0 # Reset failure count after a successful run
except Exception as e:
logging.error(f"Error running {script_config.name}: {e}")
script_config.failure_count += 1
if script_config.failure_count >= 3: # Notify on third failure
error_message = f"{script_config.name} has failed 3 times. Last error: {e}"
if self.errors_webhook_url:
self.send_to_discord(self.errors_webhook_url, error_message)
logging.error(f"Max retry attempts reached for {script_config.name}. Error sent to Discord.")
return 500 # return error code
if script_config.retry_on_error:
retry_count += 1
if retry_count >= 3: # Retry 3 times
logging.error(f"Max retry attempts reached for {script_config.name}.")
return 500
logging.info(f"Retrying {script_config.name}... ({retry_count}/3)")
else:
break
else:
logging.error(f"Method '{script_config.method}' not found in {script_config.filename}")
# Restore the original sys.argv
sys.argv = original_argv
except Exception as e:
logging.error(f"Failed to run script {script_config.filename}: {e}")
return 404 # return not found if script not found
return 200 # return success
def run_script_in_thread(self, script_config):
"""Run the script in a separate thread with an endless loop."""
def target():
error_count = 0
while error_count < 3:
error_code = self.run_script(script_config)
if error_code == 404:
error_count += 1
time.sleep(5)
elif error_code == 500:
break
else:
error_count = 0 # Reset error count on success
logging.error(f"Script {script_config.name} has failed 3 times. Stopping execution.")
thread = threading.Thread(target=target)
thread.daemon = True # Allow the thread to exit when the main program exits
thread.start()
return thread
def run_all_scripts(self):
"""Start all scripts concurrently in separate threads with an endless loop."""
threads = []
for script_config in self.scripts:
thread = self.run_script_in_thread(script_config)
threads.append(thread)
# The main thread only needs to start the loops, it doesn't need to join since the threads are infinite
logging.info("All scripts have been started and are running in their respective threads.")
# Main execution
if __name__ == "__main__":
config_file = 'scripts_config.json'
runner = ScriptRunner(config_file)
runner.run_all_scripts()
# Keep the main program running (this is important for the daemon threads to keep running)
while True:
time.sleep(1)

26
scripts_config.json Normal file
View file

@ -0,0 +1,26 @@
[
{
"name": "Kcomebacks Sync",
"filename": "rpopfetch.py",
"method": "main",
"interval": 86400,
"retry_on_error": "true",
"launch_flags": ["--cdn"]
},
{
"name": "Project Sync",
"filename": "update_projects.py",
"method": "main",
"interval": 360,
"retry_on_error": "false",
"launch_flags": ["--cdn"]
},
{
"name": "Likedsongsync 2",
"filename": "run_likedsongsync2.py",
"method": "run",
"interval": 10800,
"retry_on_error": "true",
"launch_flags": [""]
}
]

View file

@ -9,11 +9,7 @@ import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import top_lib import top_lib
# load .env file
load_dotenv()
# Define your playlist IDs
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
def progress_bar(current, total, last_time_stamp=float, etastr=None): def progress_bar(current, total, last_time_stamp=float, etastr=None):
'''A function to print a progress bar to the terminal. '''A function to print a progress bar to the terminal.
@ -155,8 +151,16 @@ def add_track_to_playlist(playlist_id, track_uri):
track_uri: The URI of the track to add to the playlist''' track_uri: The URI of the track to add to the playlist'''
sp.playlist_add_items(playlist_id, [track_uri]) sp.playlist_add_items(playlist_id, [track_uri])
def main():
# load .env file
load_dotenv()
if __name__ == "__main__": # because im lazy
global VERBOSE_LOGGING
global sp
# Define your playlist IDs
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
# Parse command-line arguments # Parse command-line arguments
VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv
@ -258,3 +262,7 @@ if __name__ == "__main__":
except Exception: except Exception:
#except e: #except e:
continue continue
if __name__ == "__main__":
main()

View file

@ -101,12 +101,13 @@ def get_languagages(repo, access_token):
print(f"[{repo}] Error fetching languages ", end="\n") print(f"[{repo}] Error fetching languages ", end="\n")
return None return None
# Path to the projects.json file def main():
projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json") # Path to the projects.json file
# create the directory if it doesn't exist projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
os.makedirs(os.path.dirname(projects_json_path), exist_ok=True) # create the directory if it doesn't exist
os.makedirs(os.path.dirname(projects_json_path), exist_ok=True)
if not DONTDOWNLOAD: if not DONTDOWNLOAD:
# fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json # fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json
projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json" projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json"
projects_json = requests.get(projects_json_url) projects_json = requests.get(projects_json_url)
@ -114,17 +115,17 @@ if not DONTDOWNLOAD:
file.write(projects_json.content) file.write(projects_json.content)
verboseprint(f"Fetched projects.json file") verboseprint(f"Fetched projects.json file")
elif not os.path.exists("~/.cache/gh-projects/projects.json"): elif not os.path.exists("~/.cache/gh-projects/projects.json"):
FileNotFoundError("File 'projects.json' not found. Cannot proceed without \ FileNotFoundError("File 'projects.json' not found. Cannot proceed without \
downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.") downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.")
else: else:
print("Skipping download of 'projects.json'") print("Skipping download of 'projects.json'")
# Load the existing projects.json file # Load the existing projects.json file
with open(projects_json_path, "r") as file: with open(projects_json_path, "r") as file:
projects_data = json.load(file) projects_data = json.load(file)
if not DONTUPDATEGH: if not DONTUPDATEGH:
print("Fetching Repo data...") print("Fetching Repo data...")
# Update the last_update (Unix timestamp) for each project # Update the last_update (Unix timestamp) for each project
@ -142,24 +143,28 @@ if not DONTUPDATEGH:
languages = get_languagages(gh_api, GITHUB_API_TOKEN) languages = get_languagages(gh_api, GITHUB_API_TOKEN)
if languages: if languages:
project["languages"] = languages project["languages"] = languages
else: else:
print("Skipping Github updates...") print("Skipping Github updates...")
# remove first element # remove first element
projects_data.pop(0) projects_data.pop(0)
# sort projects alphabetically # sort projects alphabetically
projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True) projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True)
# add a first element to the list that holds the date of the last update # add a first element to the list that holds the date of the last update
projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"}) projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
# Save the updated data back to the projects.json file # Save the updated data back to the projects.json file
with open(projects_json_path, "w") as file: with open(projects_json_path, "w") as file:
json.dump(projects_data, file, indent=2) json.dump(projects_data, file, indent=2)
print("Updated projects.json\nUploading to cdn...") print("Updated projects.json\nUploading to cdn...")
os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/") os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")
print("Uploaded projects.json to cdn") print("Uploaded projects.json to cdn")
if __name__ == "__main__":
main()