mirror of
				https://github.com/JonasunderscoreJones/turbo-octo-potato.git
				synced 2025-10-25 19:19:19 +02:00 
			
		
		
		
	Compare commits
	
		
			No commits in common. "4fd2404f36c96e794699217b767dd14fc535544f" and "58c5233413ed332e602d5dc6ac5b928043105eb4" have entirely different histories.
		
	
	
		
			4fd2404f36
			...
			58c5233413
		
	
		
					 8 changed files with 115 additions and 454 deletions
				
			
		|  | @ -1,92 +0,0 @@ | |||
| import requests | ||||
| import os | ||||
| import dotenv | ||||
| 
 | ||||
| # Load the environment variables | ||||
| dotenv.load_dotenv() | ||||
| 
 | ||||
| # Configuration: Set your GitHub and Forgejo credentials and URLs | ||||
| GITHUB_USERNAME = os.getenv('GITHUB_USERNAME') | ||||
| GITHUB_TOKEN = os.getenv('GITHUB_TOKEN') | ||||
| FORGEJO_USERNAME = os.getenv('FORGEJO_USERNAME') | ||||
| FORGEJO_API_URL = os.getenv('FORGEJO_API_URL') | ||||
| FORGEJO_TOKEN = os.getenv('FORGEJO_TOKEN') | ||||
| 
 | ||||
| REPO_BLACKLIST = ["ZtereoMUSIC", "nicer-skies", "epr_grader"] | ||||
| 
 | ||||
| # Fetch repositories from GitHub | ||||
| def get_github_repositories(): | ||||
|     github_url = f'https://api.github.com/users/{GITHUB_USERNAME}/repos' | ||||
|     headers = {'Authorization': f'token {GITHUB_TOKEN}'} | ||||
|     repos = [] | ||||
|     page = 1 | ||||
| 
 | ||||
|     while True: | ||||
|         response = requests.get(github_url, headers=headers, params={'page': page, 'per_page': 100}) | ||||
|         if response.status_code != 200: | ||||
|             print(f"Error fetching GitHub repositories: {response.text}") | ||||
|             break | ||||
| 
 | ||||
|         data = response.json() | ||||
|         if not data:  # No more repositories | ||||
|             break | ||||
| 
 | ||||
|         repos.extend(data) | ||||
|         page += 1 | ||||
| 
 | ||||
|     return repos | ||||
| 
 | ||||
| # Check if a repository exists on Forgejo | ||||
| def check_forgejo_repo_exists(repo_name): | ||||
|     forgejo_url = f'{FORGEJO_API_URL}/repos/{FORGEJO_USERNAME}/{repo_name}' | ||||
|     headers = {'Authorization': f'token {FORGEJO_TOKEN}'} | ||||
|     response = requests.get(forgejo_url, headers=headers) | ||||
| 
 | ||||
|     if response.status_code == 200: | ||||
|         return True  # Repo exists | ||||
|     elif response.status_code == 404: | ||||
|         return False  # Repo does not exist | ||||
|     else: | ||||
|         print(f"Error checking repository on Forgejo: {response.text}") | ||||
|         return False | ||||
| 
 | ||||
| # Create a mirror repository on Forgejo | ||||
| def create_forgejo_repo_mirror(github_repo): | ||||
|     forgejo_url = f'{FORGEJO_API_URL}/repos/migrate' | ||||
|     headers = {'Authorization': f'token {FORGEJO_TOKEN}', 'Content-Type': 'application/json'} | ||||
|      | ||||
|     # Prepare the payload | ||||
|     payload = { | ||||
|         'clone_addr': github_repo['clone_url'], | ||||
|         'repo_name': github_repo['name'], | ||||
|         'private': github_repo['private'], | ||||
|         'mirror': True, | ||||
|         'description': github_repo.get('description', ''), | ||||
|     } | ||||
|      | ||||
|     response = requests.post(forgejo_url, json=payload, headers=headers) | ||||
|      | ||||
|     if response.status_code == 201: | ||||
|         print(f"Created mirror for {github_repo['name']}") | ||||
|     else: | ||||
|         print(f"Error creating mirror for {github_repo['name']}: {response.text}") | ||||
| 
 | ||||
| # Main script | ||||
| def main(): | ||||
|     print("Fetching GitHub repositories...") | ||||
|     github_repos = get_github_repositories() | ||||
|      | ||||
|     for github_repo in github_repos: | ||||
|         repo_name = github_repo['name'] | ||||
|         print(f"Checking if {repo_name} exists on Forgejo...") | ||||
| 
 | ||||
|         if repo_name in REPO_BLACKLIST: | ||||
|             print(f"Repository {repo_name} is blacklisted. Skipping.") | ||||
|         elif not check_forgejo_repo_exists(repo_name): | ||||
|             print(f"Repository {repo_name} does not exist on Forgejo. Creating mirror...") | ||||
|             create_forgejo_repo_mirror(github_repo) | ||||
|         else: | ||||
|             print(f"Repository {repo_name} already exists on Forgejo. Skipping.") | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     main() | ||||
|  | @ -1,30 +0,0 @@ | |||
| import pyautogui | ||||
| import time | ||||
| 
 | ||||
| # Constant delay in seconds | ||||
| DELAY = 0.125 | ||||
| DELAY = 0 | ||||
| 
 | ||||
| # Define the main function to perform the clicks | ||||
| def perform_clicks(): | ||||
|     while True: | ||||
|         # Click on pixel (300, 150) | ||||
|         pyautogui.click(300, 150) | ||||
|         time.sleep(DELAY) | ||||
| 
 | ||||
|         # Click on pixel (960, 530) | ||||
|         pyautogui.click(960, 530) | ||||
|         time.sleep(DELAY) | ||||
| 
 | ||||
|         # Click on pixel (960, 530) again | ||||
|         pyautogui.click(960, 530) | ||||
|         time.sleep(DELAY) | ||||
| 
 | ||||
|         # Click on pixel (960, 555) | ||||
|         pyautogui.click(960, 555) | ||||
|         time.sleep(DELAY) | ||||
| 
 | ||||
| # Start the clicking loop | ||||
| if __name__ == "__main__": | ||||
|     time.sleep(5) | ||||
|     perform_clicks() | ||||
							
								
								
									
										10
									
								
								rpopfetch.py
									
										
									
									
									
								
							
							
						
						
									
										10
									
								
								rpopfetch.py
									
										
									
									
									
								
							|  | @ -247,14 +247,9 @@ def fetch_monthly_page(wiki_link, subreddit_name): | |||
|         print(f"Error fetching Reddit wiki page: {e}") | ||||
|         return None | ||||
| 
 | ||||
| def main(): | ||||
| UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False | ||||
| SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True | ||||
| 
 | ||||
|     # because im lazy | ||||
|     global reddit | ||||
|     global progress | ||||
| 
 | ||||
| # reddit infos | ||||
| subreddit_name = "kpop" | ||||
| wiki_page_name = "upcoming-releases/archive" | ||||
|  | @ -299,7 +294,7 @@ def main(): | |||
| 
 | ||||
|         # sleep for 2 seconds to avoid getting rate limited | ||||
|         # reddit api is awful | ||||
|             # time.sleep(2) | ||||
|         time.sleep(2) | ||||
| 
 | ||||
|         try: | ||||
|             # fetch the monthly page and parse it | ||||
|  | @ -336,6 +331,3 @@ def main(): | |||
| 
 | ||||
| if SEND_WEBHOOK: | ||||
|     rpop_webhook.send_webhook() | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
|  |  | |||
|  | @ -1,5 +0,0 @@ | |||
| # import and run spotify_scripts/likedsongsync2.py | ||||
| from spotify_scripts.likedsongsync2 import main | ||||
| 
 | ||||
| def run(): | ||||
|     main() | ||||
|  | @ -1,165 +0,0 @@ | |||
| import json | ||||
| import time | ||||
| import importlib | ||||
| import os | ||||
| import logging | ||||
| import threading | ||||
| import sys | ||||
| import requests | ||||
| 
 | ||||
| # Set up logging | ||||
| logging.basicConfig(level=logging.INFO) | ||||
| 
 | ||||
| class ScriptConfig: | ||||
|     def __init__(self, name, filename, method, interval, retry_on_error, launch_flags): | ||||
|         self.name = name | ||||
|         self.filename = filename | ||||
|         self.method = method | ||||
|         self.interval = interval | ||||
|         self.retry_on_error = retry_on_error | ||||
|         self.launch_flags = launch_flags  # This will store the flags | ||||
|         self.failure_count = 0  # Track the failure count | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return f"ScriptConfig(name={self.name}, filename={self.filename}, method={self.method}, interval={self.interval}, retry_on_error={self.retry_on_error}, launch_flags={self.launch_flags}, failure_count={self.failure_count})" | ||||
| 
 | ||||
| 
 | ||||
| class ScriptRunner: | ||||
|     def __init__(self, config_file): | ||||
|         self.scripts = [] | ||||
|         self.load_config(config_file) | ||||
|         # Retrieve the Discord webhook URLs from the environment variables | ||||
|         self.logs_webhook_url = os.getenv('SCRIPT_LOGS_DISCORD_WEBHOOK_URL') | ||||
|         self.errors_webhook_url = os.getenv('SCRIPT_ERROR_DISCORD_WEBHOOK_URL') | ||||
| 
 | ||||
|     def load_config(self, config_file): | ||||
|         """Load script configurations from a JSON file.""" | ||||
|         if not os.path.exists(config_file): | ||||
|             logging.error(f"Config file '{config_file}' not found.") | ||||
|             return | ||||
| 
 | ||||
|         with open(config_file, 'r') as f: | ||||
|             data = json.load(f) | ||||
|             for item in data: | ||||
|                 # Read launch_flags as a list (it could be an empty list if no flags are present) | ||||
|                 launch_flags = item.get('launch_flags', []) | ||||
| 
 | ||||
|                 script_config = ScriptConfig( | ||||
|                     name=item['name'], | ||||
|                     filename=item['filename'], | ||||
|                     method=item['method'], | ||||
|                     interval=item['interval'], | ||||
|                     retry_on_error=item['retry_on_error'].lower() == 'true', | ||||
|                     launch_flags=launch_flags  # Store launch flags | ||||
|                 ) | ||||
|                 self.scripts.append(script_config) | ||||
| 
 | ||||
|     def send_to_discord(self, webhook_url, message): | ||||
|         """Send a message to a Discord webhook.""" | ||||
|         if webhook_url: | ||||
|             try: | ||||
|                 payload = {'content': message} | ||||
|                 requests.post(webhook_url, json=payload) | ||||
|             except Exception as e: | ||||
|                 logging.error(f"Failed to send message to Discord: {e}") | ||||
| 
 | ||||
|     def run_script(self, script_config): | ||||
|         """Run the script as per configuration.""" | ||||
|         script_path = os.path.join(os.getcwd(), script_config.filename) | ||||
| 
 | ||||
|         if not os.path.exists(script_path): | ||||
|             logging.error(f"Script file '{script_config.filename}' not found in the current directory.") | ||||
|             return | ||||
| 
 | ||||
|         try: | ||||
|             # Import the script as a module | ||||
|             script_name = script_config.filename[:-3]  # Strip ".py" from the filename | ||||
|             module = importlib.import_module(script_name) | ||||
| 
 | ||||
|             # Temporarily modify sys.argv to simulate command-line arguments | ||||
|             original_argv = sys.argv | ||||
|             sys.argv = [script_name] + script_config.launch_flags  # Simulate command-line args | ||||
| 
 | ||||
|             # Check if method exists | ||||
|             if hasattr(module, script_config.method): | ||||
|                 method = getattr(module, script_config.method) | ||||
|                 retry_count = 0 | ||||
|                 while True: | ||||
|                     try: | ||||
|                         logging.info(f"Running {script_config.name} with flags: {script_config.launch_flags}...") | ||||
|                         # Call the method without needing to pass arguments (sys.argv is used) | ||||
|                         method() | ||||
|                         # Log successful run to Discord | ||||
|                         if self.logs_webhook_url: | ||||
|                             self.send_to_discord(self.logs_webhook_url, f"Successfully ran {script_config.name}") | ||||
|                         logging.info(f"Completed {script_config.name}.") | ||||
|                         time.sleep(script_config.interval)  # Sleep after execution before next run | ||||
|                         script_config.failure_count = 0  # Reset failure count after a successful run | ||||
|                     except Exception as e: | ||||
|                         logging.error(f"Error running {script_config.name}: {e}") | ||||
|                         script_config.failure_count += 1 | ||||
|                         if script_config.failure_count >= 3:  # Notify on third failure | ||||
|                             error_message = f"{script_config.name} has failed 3 times. Last error: {e}" | ||||
|                             if self.errors_webhook_url: | ||||
|                                 self.send_to_discord(self.errors_webhook_url, error_message) | ||||
|                             logging.error(f"Max retry attempts reached for {script_config.name}. Error sent to Discord.") | ||||
|                             return 500 # return error code | ||||
|                         if script_config.retry_on_error: | ||||
|                             retry_count += 1 | ||||
|                             if retry_count >= 3:  # Retry 3 times | ||||
|                                 logging.error(f"Max retry attempts reached for {script_config.name}.") | ||||
|                                 return 500 | ||||
|                             logging.info(f"Retrying {script_config.name}... ({retry_count}/3)") | ||||
|                         else: | ||||
|                             break | ||||
|             else: | ||||
|                 logging.error(f"Method '{script_config.method}' not found in {script_config.filename}") | ||||
| 
 | ||||
|             # Restore the original sys.argv | ||||
|             sys.argv = original_argv | ||||
|         except Exception as e: | ||||
|             logging.error(f"Failed to run script {script_config.filename}: {e}") | ||||
|             return 404 # return not found if script not found | ||||
| 
 | ||||
|         return 200 # return success | ||||
| 
 | ||||
|     def run_script_in_thread(self, script_config): | ||||
|         """Run the script in a separate thread with an endless loop.""" | ||||
|         def target(): | ||||
|             error_count = 0 | ||||
|             while error_count < 3: | ||||
|                 error_code = self.run_script(script_config) | ||||
|                 if error_code == 404: | ||||
|                     error_count += 1 | ||||
|                     time.sleep(5) | ||||
|                 elif error_code == 500: | ||||
|                     break | ||||
|                 else: | ||||
|                     error_count = 0 # Reset error count on success | ||||
| 
 | ||||
|             logging.error(f"Script {script_config.name} has failed 3 times. Stopping execution.") | ||||
| 
 | ||||
|         thread = threading.Thread(target=target) | ||||
|         thread.daemon = True  # Allow the thread to exit when the main program exits | ||||
|         thread.start() | ||||
|         return thread | ||||
| 
 | ||||
|     def run_all_scripts(self): | ||||
|         """Start all scripts concurrently in separate threads with an endless loop.""" | ||||
|         threads = [] | ||||
|         for script_config in self.scripts: | ||||
|             thread = self.run_script_in_thread(script_config) | ||||
|             threads.append(thread) | ||||
| 
 | ||||
|         # The main thread only needs to start the loops, it doesn't need to join since the threads are infinite | ||||
|         logging.info("All scripts have been started and are running in their respective threads.") | ||||
| 
 | ||||
| # Main execution | ||||
| if __name__ == "__main__": | ||||
|     config_file = 'scripts_config.json' | ||||
|     runner = ScriptRunner(config_file) | ||||
|     runner.run_all_scripts() | ||||
| 
 | ||||
|     # Keep the main program running (this is important for the daemon threads to keep running) | ||||
|     while True: | ||||
|         time.sleep(1) | ||||
|  | @ -1,26 +0,0 @@ | |||
| [ | ||||
|     { | ||||
|         "name": "Kcomebacks Sync", | ||||
|         "filename": "rpopfetch.py", | ||||
|         "method": "main", | ||||
|         "interval": 86400, | ||||
|         "retry_on_error": "true", | ||||
|         "launch_flags": ["--cdn"] | ||||
|     }, | ||||
|     { | ||||
|         "name": "Project Sync", | ||||
|         "filename": "update_projects.py", | ||||
|         "method": "main", | ||||
|         "interval": 360, | ||||
|         "retry_on_error": "false", | ||||
|         "launch_flags": ["--cdn"] | ||||
|     }, | ||||
|     { | ||||
|         "name": "Likedsongsync 2", | ||||
|         "filename": "run_likedsongsync2.py", | ||||
|         "method": "run", | ||||
|         "interval": 10800, | ||||
|         "retry_on_error": "true", | ||||
|         "launch_flags": [""] | ||||
|     } | ||||
| ] | ||||
|  | @ -9,7 +9,11 @@ import os | |||
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | ||||
| import top_lib | ||||
| 
 | ||||
| # load .env file | ||||
| load_dotenv() | ||||
| 
 | ||||
| # Define your playlist IDs | ||||
| LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID') | ||||
| 
 | ||||
| def progress_bar(current, total, last_time_stamp=float, etastr=None): | ||||
|     '''A function to print a progress bar to the terminal. | ||||
|  | @ -151,16 +155,8 @@ def add_track_to_playlist(playlist_id, track_uri): | |||
|     track_uri: The URI of the track to add to the playlist''' | ||||
|     sp.playlist_add_items(playlist_id, [track_uri]) | ||||
| 
 | ||||
| def main(): | ||||
|     # load .env file | ||||
|     load_dotenv() | ||||
| 
 | ||||
|     # because im lazy | ||||
|     global VERBOSE_LOGGING | ||||
|     global sp | ||||
| 
 | ||||
|     # Define your playlist IDs | ||||
|     LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID') | ||||
| if __name__ == "__main__": | ||||
| 
 | ||||
|     # Parse command-line arguments | ||||
|     VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv | ||||
|  | @ -262,7 +258,3 @@ def main(): | |||
|             except Exception: | ||||
|             #except e: | ||||
|                 continue | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
|  |  | |||
|  | @ -101,7 +101,6 @@ def get_languagages(repo, access_token): | |||
|         print(f"[{repo}] Error fetching languages           ", end="\n") | ||||
|         return None | ||||
| 
 | ||||
| def main(): | ||||
| # Path to the projects.json file | ||||
| projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json") | ||||
| # create the directory if it doesn't exist | ||||
|  | @ -164,7 +163,3 @@ def main(): | |||
| os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/") | ||||
| 
 | ||||
| print("Uploaded projects.json to cdn") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue