mirror of https://github.com/JonasunderscoreJones/turbo-octo-potato.git
synced 2025-10-25 11:09:18 +02:00

Compare commits: 58c5233413 ... 4fd2404f36 (10 commits)
Commits in this range:
4fd2404f36
065aa592f9
8766ffb9a9
58508e1d27
745e78d227
79cb848f92
0c92b155cd
ce1b6cdba1
30581a554a
a1de9edebe
8 changed files with 454 additions and 115 deletions
github_forgejo_syncer.py (new file, 92 lines)

@@ -0,0 +1,92 @@
import requests
import os
import dotenv

# Load the environment variables
dotenv.load_dotenv()

# Configuration: Set your GitHub and Forgejo credentials and URLs
GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
FORGEJO_USERNAME = os.getenv('FORGEJO_USERNAME')
FORGEJO_API_URL = os.getenv('FORGEJO_API_URL')
FORGEJO_TOKEN = os.getenv('FORGEJO_TOKEN')

REPO_BLACKLIST = ["ZtereoMUSIC", "nicer-skies", "epr_grader"]

# Fetch repositories from GitHub
def get_github_repositories():
    github_url = f'https://api.github.com/users/{GITHUB_USERNAME}/repos'
    headers = {'Authorization': f'token {GITHUB_TOKEN}'}
    repos = []
    page = 1

    while True:
        response = requests.get(github_url, headers=headers, params={'page': page, 'per_page': 100})
        if response.status_code != 200:
            print(f"Error fetching GitHub repositories: {response.text}")
            break

        data = response.json()
        if not data:  # No more repositories
            break

        repos.extend(data)
        page += 1

    return repos

# Check if a repository exists on Forgejo
def check_forgejo_repo_exists(repo_name):
    forgejo_url = f'{FORGEJO_API_URL}/repos/{FORGEJO_USERNAME}/{repo_name}'
    headers = {'Authorization': f'token {FORGEJO_TOKEN}'}
    response = requests.get(forgejo_url, headers=headers)

    if response.status_code == 200:
        return True  # Repo exists
    elif response.status_code == 404:
        return False  # Repo does not exist
    else:
        print(f"Error checking repository on Forgejo: {response.text}")
        return False

# Create a mirror repository on Forgejo
def create_forgejo_repo_mirror(github_repo):
    forgejo_url = f'{FORGEJO_API_URL}/repos/migrate'
    headers = {'Authorization': f'token {FORGEJO_TOKEN}', 'Content-Type': 'application/json'}

    # Prepare the payload
    payload = {
        'clone_addr': github_repo['clone_url'],
        'repo_name': github_repo['name'],
        'private': github_repo['private'],
        'mirror': True,
        'description': github_repo.get('description', ''),
    }

    response = requests.post(forgejo_url, json=payload, headers=headers)

    if response.status_code == 201:
        print(f"Created mirror for {github_repo['name']}")
    else:
        print(f"Error creating mirror for {github_repo['name']}: {response.text}")

# Main script
def main():
    print("Fetching GitHub repositories...")
    github_repos = get_github_repositories()

    for github_repo in github_repos:
        repo_name = github_repo['name']
        print(f"Checking if {repo_name} exists on Forgejo...")

        if repo_name in REPO_BLACKLIST:
            print(f"Repository {repo_name} is blacklisted. Skipping.")
        elif not check_forgejo_repo_exists(repo_name):
            print(f"Repository {repo_name} does not exist on Forgejo. Creating mirror...")
            create_forgejo_repo_mirror(github_repo)
        else:
            print(f"Repository {repo_name} already exists on Forgejo. Skipping.")

if __name__ == '__main__':
    main()
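The syncer reads all of its credentials from environment variables through dotenv.load_dotenv(), so it needs a .env file (or exported variables) before it can talk to either API. A minimal sketch of driving it from Python; the placeholder values and the /api/v1 shape of FORGEJO_API_URL are assumptions, not part of this commit:

import os

# Placeholder credentials; in practice these live in a .env file picked up by dotenv.load_dotenv()
os.environ.setdefault('GITHUB_USERNAME', 'your-github-user')
os.environ.setdefault('GITHUB_TOKEN', 'ghp_xxx')                             # personal access token
os.environ.setdefault('FORGEJO_USERNAME', 'your-forgejo-user')
os.environ.setdefault('FORGEJO_API_URL', 'https://git.example.com/api/v1')   # assumed URL shape
os.environ.setdefault('FORGEJO_TOKEN', 'forgejo_xxx')

import github_forgejo_syncer   # the module reads the variables above at import time
github_forgejo_syncer.main()   # mirrors every non-blacklisted GitHub repo missing on Forgejo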
mydreamsetup_achievements/mydreamsetup_placefurniture.py (new file, 30 lines)

@@ -0,0 +1,30 @@
import pyautogui
import time

# Constant delay in seconds
DELAY = 0.125
DELAY = 0

# Define the main function to perform the clicks
def perform_clicks():
    while True:
        # Click on pixel (300, 150)
        pyautogui.click(300, 150)
        time.sleep(DELAY)

        # Click on pixel (960, 530)
        pyautogui.click(960, 530)
        time.sleep(DELAY)

        # Click on pixel (960, 530) again
        pyautogui.click(960, 530)
        time.sleep(DELAY)

        # Click on pixel (960, 555)
        pyautogui.click(960, 555)
        time.sleep(DELAY)

# Start the clicking loop
if __name__ == "__main__":
    time.sleep(5)
    perform_clicks()
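Since perform_clicks() loops forever, pyautogui's built-in fail-safe is worth keeping in mind: slamming the mouse into a screen corner aborts the run. A small sketch; the explicit assignments are only illustrative, the fail-safe is already on by default:

import pyautogui

pyautogui.FAILSAFE = True   # default; moving the mouse to a screen corner raises FailSafeException
pyautogui.PAUSE = 0.1       # optional global pause between pyautogui calls, similar in spirit to DELAY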
rpopfetch.py (10 changed lines)

@@ -247,9 +247,14 @@ def fetch_monthly_page(wiki_link, subreddit_name):
        print(f"Error fetching Reddit wiki page: {e}")
        return None

+
+def main():
    UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
    SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
+
+    # because im lazy
+    global reddit
+    global progress

    # reddit infos
    subreddit_name = "kpop"
    wiki_page_name = "upcoming-releases/archive"

@@ -294,7 +299,7 @@ if content:

    # sleep for 2 seconds to avoid getting rate limited
    # reddit api is awful
-   time.sleep(2)
+   # time.sleep(2)

    try:
        # fetch the monthly page and parse it

@@ -331,3 +336,6 @@ if content:

    if SEND_WEBHOOK:
        rpop_webhook.send_webhook()
+
+
+if __name__ == "__main__":
+    main()
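With the module-level code moved into main(), rpopfetch can now be imported and driven by another process rather than only run as a script; its flags are still read from sys.argv, so a caller has to simulate them. A minimal sketch, with the flag choice only as an example:

import sys
import rpopfetch

sys.argv = ["rpopfetch", "--cdn"]   # main() checks sys.argv for --cdn / --no-webhook / -nwh
rpopfetch.main()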
run_likedsongsync2.py (new file, 5 lines)

@@ -0,0 +1,5 @@
# import and run spotify_scripts/likedsongsync2.py
from spotify_scripts.likedsongsync2 import main

def run():
    main()
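The wrapper exists because the interval runner (next file) imports its targets by file name from the working directory and strips only the trailing ".py", so a script living in spotify_scripts/ needs a top-level entry point. Roughly what the runner ends up doing for this file:

import importlib

module = importlib.import_module("run_likedsongsync2")   # "run_likedsongsync2.py" minus ".py"
getattr(module, "run")()                                  # the "method" configured in scripts_config.json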
script_interval_runner.py (new file, 165 lines)

@@ -0,0 +1,165 @@
import json
import time
import importlib
import os
import logging
import threading
import sys
import requests

# Set up logging
logging.basicConfig(level=logging.INFO)

class ScriptConfig:
    def __init__(self, name, filename, method, interval, retry_on_error, launch_flags):
        self.name = name
        self.filename = filename
        self.method = method
        self.interval = interval
        self.retry_on_error = retry_on_error
        self.launch_flags = launch_flags  # This will store the flags
        self.failure_count = 0  # Track the failure count

    def __repr__(self):
        return f"ScriptConfig(name={self.name}, filename={self.filename}, method={self.method}, interval={self.interval}, retry_on_error={self.retry_on_error}, launch_flags={self.launch_flags}, failure_count={self.failure_count})"


class ScriptRunner:
    def __init__(self, config_file):
        self.scripts = []
        self.load_config(config_file)
        # Retrieve the Discord webhook URLs from the environment variables
        self.logs_webhook_url = os.getenv('SCRIPT_LOGS_DISCORD_WEBHOOK_URL')
        self.errors_webhook_url = os.getenv('SCRIPT_ERROR_DISCORD_WEBHOOK_URL')

    def load_config(self, config_file):
        """Load script configurations from a JSON file."""
        if not os.path.exists(config_file):
            logging.error(f"Config file '{config_file}' not found.")
            return

        with open(config_file, 'r') as f:
            data = json.load(f)
            for item in data:
                # Read launch_flags as a list (it could be an empty list if no flags are present)
                launch_flags = item.get('launch_flags', [])

                script_config = ScriptConfig(
                    name=item['name'],
                    filename=item['filename'],
                    method=item['method'],
                    interval=item['interval'],
                    retry_on_error=item['retry_on_error'].lower() == 'true',
                    launch_flags=launch_flags  # Store launch flags
                )
                self.scripts.append(script_config)

    def send_to_discord(self, webhook_url, message):
        """Send a message to a Discord webhook."""
        if webhook_url:
            try:
                payload = {'content': message}
                requests.post(webhook_url, json=payload)
            except Exception as e:
                logging.error(f"Failed to send message to Discord: {e}")

    def run_script(self, script_config):
        """Run the script as per configuration."""
        script_path = os.path.join(os.getcwd(), script_config.filename)

        if not os.path.exists(script_path):
            logging.error(f"Script file '{script_config.filename}' not found in the current directory.")
            return

        try:
            # Import the script as a module
            script_name = script_config.filename[:-3]  # Strip ".py" from the filename
            module = importlib.import_module(script_name)

            # Temporarily modify sys.argv to simulate command-line arguments
            original_argv = sys.argv
            sys.argv = [script_name] + script_config.launch_flags  # Simulate command-line args

            # Check if method exists
            if hasattr(module, script_config.method):
                method = getattr(module, script_config.method)
                retry_count = 0
                while True:
                    try:
                        logging.info(f"Running {script_config.name} with flags: {script_config.launch_flags}...")
                        # Call the method without needing to pass arguments (sys.argv is used)
                        method()
                        # Log successful run to Discord
                        if self.logs_webhook_url:
                            self.send_to_discord(self.logs_webhook_url, f"Successfully ran {script_config.name}")
                        logging.info(f"Completed {script_config.name}.")
                        time.sleep(script_config.interval)  # Sleep after execution before next run
                        script_config.failure_count = 0  # Reset failure count after a successful run
                    except Exception as e:
                        logging.error(f"Error running {script_config.name}: {e}")
                        script_config.failure_count += 1
                        if script_config.failure_count >= 3:  # Notify on third failure
                            error_message = f"{script_config.name} has failed 3 times. Last error: {e}"
                            if self.errors_webhook_url:
                                self.send_to_discord(self.errors_webhook_url, error_message)
                            logging.error(f"Max retry attempts reached for {script_config.name}. Error sent to Discord.")
                            return 500  # return error code
                        if script_config.retry_on_error:
                            retry_count += 1
                            if retry_count >= 3:  # Retry 3 times
                                logging.error(f"Max retry attempts reached for {script_config.name}.")
                                return 500
                            logging.info(f"Retrying {script_config.name}... ({retry_count}/3)")
                        else:
                            break
            else:
                logging.error(f"Method '{script_config.method}' not found in {script_config.filename}")

            # Restore the original sys.argv
            sys.argv = original_argv
        except Exception as e:
            logging.error(f"Failed to run script {script_config.filename}: {e}")
            return 404  # return not found if script not found

        return 200  # return success

    def run_script_in_thread(self, script_config):
        """Run the script in a separate thread with an endless loop."""
        def target():
            error_count = 0
            while error_count < 3:
                error_code = self.run_script(script_config)
                if error_code == 404:
                    error_count += 1
                    time.sleep(5)
                elif error_code == 500:
                    break
                else:
                    error_count = 0  # Reset error count on success

            logging.error(f"Script {script_config.name} has failed 3 times. Stopping execution.")

        thread = threading.Thread(target=target)
        thread.daemon = True  # Allow the thread to exit when the main program exits
        thread.start()
        return thread

    def run_all_scripts(self):
        """Start all scripts concurrently in separate threads with an endless loop."""
        threads = []
        for script_config in self.scripts:
            thread = self.run_script_in_thread(script_config)
            threads.append(thread)

        # The main thread only needs to start the loops, it doesn't need to join since the threads are infinite
        logging.info("All scripts have been started and are running in their respective threads.")

# Main execution
if __name__ == "__main__":
    config_file = 'scripts_config.json'
    runner = ScriptRunner(config_file)
    runner.run_all_scripts()

    # Keep the main program running (this is important for the daemon threads to keep running)
    while True:
        time.sleep(1)
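Besides the __main__ block above, the runner can also be embedded in another process. A minimal sketch using only the API defined in this file; the config file name matches the one committed below:

import time
from script_interval_runner import ScriptRunner

runner = ScriptRunner("scripts_config.json")  # parses the JSON entries into ScriptConfig objects
runner.run_all_scripts()                      # one daemon thread per configured script

while True:                                   # keep the process alive; the worker threads are daemonic
    time.sleep(1)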
scripts_config.json (new file, 26 lines)

@@ -0,0 +1,26 @@
[
    {
        "name": "Kcomebacks Sync",
        "filename": "rpopfetch.py",
        "method": "main",
        "interval": 86400,
        "retry_on_error": "true",
        "launch_flags": ["--cdn"]
    },
    {
        "name": "Project Sync",
        "filename": "update_projects.py",
        "method": "main",
        "interval": 360,
        "retry_on_error": "false",
        "launch_flags": ["--cdn"]
    },
    {
        "name": "Likedsongsync 2",
        "filename": "run_likedsongsync2.py",
        "method": "run",
        "interval": 10800,
        "retry_on_error": "true",
        "launch_flags": [""]
    }
]
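A fourth entry could schedule the new GitHub-to-Forgejo syncer through the same runner. The entry below is only an illustration; the interval and retry settings are assumptions, not part of this commit:

{
    "name": "GitHub Forgejo Mirror Sync",
    "filename": "github_forgejo_syncer.py",
    "method": "main",
    "interval": 86400,
    "retry_on_error": "false",
    "launch_flags": []
}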
(changed file; name not shown in the capture)

@@ -9,11 +9,7 @@ import os
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 import top_lib

-# load .env file
-load_dotenv()

-# Define your playlist IDs
-LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')

 def progress_bar(current, total, last_time_stamp=float, etastr=None):
     '''A function to print a progress bar to the terminal.

@@ -155,8 +151,16 @@ def add_track_to_playlist(playlist_id, track_uri):
     track_uri: The URI of the track to add to the playlist'''
     sp.playlist_add_items(playlist_id, [track_uri])

-if __name__ == "__main__":
+
+def main():
+    # load .env file
+    load_dotenv()
+
+    # because im lazy
+    global VERBOSE_LOGGING
+    global sp
+
+    # Define your playlist IDs
+    LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
+
     # Parse command-line arguments
     VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv

@@ -258,3 +262,7 @@ if __name__ == "__main__":
         except Exception:
         #except e:
             continue
+
+
+if __name__ == "__main__":
+    main()
(changed file; name not shown in the capture)

@@ -101,6 +101,7 @@ def get_languagages(repo, access_token):
         print(f"[{repo}] Error fetching languages ", end="\n")
         return None

+def main():
     # Path to the projects.json file
     projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
     # create the directory if it doesn't exist

@@ -163,3 +164,7 @@ print("Updated projects.json\nUploading to cdn...")
     os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")

     print("Uploaded projects.json to cdn")
+
+
+if __name__ == "__main__":
+    main()