mirror of
https://github.com/JonasunderscoreJones/turbo-octo-potato.git
synced 2025-10-24 10:39:19 +02:00
Compare commits
10 commits
58c5233413
...
4fd2404f36
| Author | SHA1 | Date | |
|---|---|---|---|
| 4fd2404f36 | |||
| 065aa592f9 | |||
| 8766ffb9a9 | |||
| 58508e1d27 | |||
| 745e78d227 | |||
| 79cb848f92 | |||
| 0c92b155cd | |||
| ce1b6cdba1 | |||
| 30581a554a | |||
| a1de9edebe |
8 changed files with 454 additions and 115 deletions
92
github_forgejo_syncer.py
Normal file
92
github_forgejo_syncer.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
import requests
|
||||
import os
|
||||
import dotenv
|
||||
|
||||
# Load the environment variables
|
||||
dotenv.load_dotenv()
|
||||
|
||||
# Configuration: Set your GitHub and Forgejo credentials and URLs
|
||||
GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
|
||||
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
|
||||
FORGEJO_USERNAME = os.getenv('FORGEJO_USERNAME')
|
||||
FORGEJO_API_URL = os.getenv('FORGEJO_API_URL')
|
||||
FORGEJO_TOKEN = os.getenv('FORGEJO_TOKEN')
|
||||
|
||||
REPO_BLACKLIST = ["ZtereoMUSIC", "nicer-skies", "epr_grader"]
|
||||
|
||||
# Fetch repositories from GitHub
|
||||
def get_github_repositories():
|
||||
github_url = f'https://api.github.com/users/{GITHUB_USERNAME}/repos'
|
||||
headers = {'Authorization': f'token {GITHUB_TOKEN}'}
|
||||
repos = []
|
||||
page = 1
|
||||
|
||||
while True:
|
||||
response = requests.get(github_url, headers=headers, params={'page': page, 'per_page': 100})
|
||||
if response.status_code != 200:
|
||||
print(f"Error fetching GitHub repositories: {response.text}")
|
||||
break
|
||||
|
||||
data = response.json()
|
||||
if not data: # No more repositories
|
||||
break
|
||||
|
||||
repos.extend(data)
|
||||
page += 1
|
||||
|
||||
return repos
|
||||
|
||||
# Check if a repository exists on Forgejo
|
||||
def check_forgejo_repo_exists(repo_name):
|
||||
forgejo_url = f'{FORGEJO_API_URL}/repos/{FORGEJO_USERNAME}/{repo_name}'
|
||||
headers = {'Authorization': f'token {FORGEJO_TOKEN}'}
|
||||
response = requests.get(forgejo_url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
return True # Repo exists
|
||||
elif response.status_code == 404:
|
||||
return False # Repo does not exist
|
||||
else:
|
||||
print(f"Error checking repository on Forgejo: {response.text}")
|
||||
return False
|
||||
|
||||
# Create a mirror repository on Forgejo
|
||||
def create_forgejo_repo_mirror(github_repo):
|
||||
forgejo_url = f'{FORGEJO_API_URL}/repos/migrate'
|
||||
headers = {'Authorization': f'token {FORGEJO_TOKEN}', 'Content-Type': 'application/json'}
|
||||
|
||||
# Prepare the payload
|
||||
payload = {
|
||||
'clone_addr': github_repo['clone_url'],
|
||||
'repo_name': github_repo['name'],
|
||||
'private': github_repo['private'],
|
||||
'mirror': True,
|
||||
'description': github_repo.get('description', ''),
|
||||
}
|
||||
|
||||
response = requests.post(forgejo_url, json=payload, headers=headers)
|
||||
|
||||
if response.status_code == 201:
|
||||
print(f"Created mirror for {github_repo['name']}")
|
||||
else:
|
||||
print(f"Error creating mirror for {github_repo['name']}: {response.text}")
|
||||
|
||||
# Main script
|
||||
def main():
|
||||
print("Fetching GitHub repositories...")
|
||||
github_repos = get_github_repositories()
|
||||
|
||||
for github_repo in github_repos:
|
||||
repo_name = github_repo['name']
|
||||
print(f"Checking if {repo_name} exists on Forgejo...")
|
||||
|
||||
if repo_name in REPO_BLACKLIST:
|
||||
print(f"Repository {repo_name} is blacklisted. Skipping.")
|
||||
elif not check_forgejo_repo_exists(repo_name):
|
||||
print(f"Repository {repo_name} does not exist on Forgejo. Creating mirror...")
|
||||
create_forgejo_repo_mirror(github_repo)
|
||||
else:
|
||||
print(f"Repository {repo_name} already exists on Forgejo. Skipping.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
30
mydreamsetup_achievements/mydreamsetup_placefurniture.py
Normal file
30
mydreamsetup_achievements/mydreamsetup_placefurniture.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
import pyautogui
|
||||
import time
|
||||
|
||||
# Constant delay in seconds
|
||||
DELAY = 0.125
|
||||
DELAY = 0
|
||||
|
||||
# Define the main function to perform the clicks
|
||||
def perform_clicks():
|
||||
while True:
|
||||
# Click on pixel (300, 150)
|
||||
pyautogui.click(300, 150)
|
||||
time.sleep(DELAY)
|
||||
|
||||
# Click on pixel (960, 530)
|
||||
pyautogui.click(960, 530)
|
||||
time.sleep(DELAY)
|
||||
|
||||
# Click on pixel (960, 530) again
|
||||
pyautogui.click(960, 530)
|
||||
time.sleep(DELAY)
|
||||
|
||||
# Click on pixel (960, 555)
|
||||
pyautogui.click(960, 555)
|
||||
time.sleep(DELAY)
|
||||
|
||||
# Start the clicking loop
|
||||
if __name__ == "__main__":
|
||||
time.sleep(5)
|
||||
perform_clicks()
|
||||
128
rpopfetch.py
128
rpopfetch.py
|
|
@ -247,87 +247,95 @@ def fetch_monthly_page(wiki_link, subreddit_name):
|
|||
print(f"Error fetching Reddit wiki page: {e}")
|
||||
return None
|
||||
|
||||
UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
|
||||
SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
|
||||
def main():
|
||||
UPLOAD_TO_CDN = True if "--cdn" in sys.argv else False
|
||||
SEND_WEBHOOK = False if "--no-webhook" in sys.argv else False if "-nwh" in sys.argv else True
|
||||
|
||||
# reddit infos
|
||||
subreddit_name = "kpop"
|
||||
wiki_page_name = "upcoming-releases/archive"
|
||||
# because im lazy
|
||||
global reddit
|
||||
global progress
|
||||
|
||||
# reddit instance
|
||||
dotenv.load_dotenv()
|
||||
# reddit infos
|
||||
subreddit_name = "kpop"
|
||||
wiki_page_name = "upcoming-releases/archive"
|
||||
|
||||
reddit = praw.Reddit(
|
||||
client_id=os.getenv('REDDIT_CLIENT_ID'),
|
||||
client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
|
||||
user_agent=os.getenv('REDDIT_USER_AGENT')
|
||||
)
|
||||
# reddit instance
|
||||
dotenv.load_dotenv()
|
||||
|
||||
# fetch subreddit
|
||||
print("Fetching Months...")
|
||||
reddit = praw.Reddit(
|
||||
client_id=os.getenv('REDDIT_CLIENT_ID'),
|
||||
client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
|
||||
user_agent=os.getenv('REDDIT_USER_AGENT')
|
||||
)
|
||||
|
||||
try:
|
||||
subreddit = reddit.subreddit(subreddit_name)
|
||||
except praw.exceptions.PRAWException as e:
|
||||
print(f"Error fetching subreddit: {e}")
|
||||
# fetch subreddit
|
||||
print("Fetching Months...")
|
||||
|
||||
# fetch wiki page
|
||||
content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name)
|
||||
try:
|
||||
subreddit = reddit.subreddit(subreddit_name)
|
||||
except praw.exceptions.PRAWException as e:
|
||||
print(f"Error fetching subreddit: {e}")
|
||||
|
||||
print("Done!")
|
||||
# fetch wiki page
|
||||
content = fetch_main_reddit_wiki_page(subreddit_name, wiki_page_name)
|
||||
|
||||
if content:
|
||||
print("Done!")
|
||||
|
||||
json_data = []
|
||||
if content:
|
||||
|
||||
for wiki_link in content[::-1]:
|
||||
json_data = []
|
||||
|
||||
progress = int(content[::-1].index(wiki_link)+1/len(content)*100)
|
||||
for wiki_link in content[::-1]:
|
||||
|
||||
if progress < 10:
|
||||
progress = " " + str(progress)
|
||||
elif progress < 100:
|
||||
progress = " " + str(progress)
|
||||
progress = int(content[::-1].index(wiki_link)+1/len(content)*100)
|
||||
|
||||
#print(" ==>", end="\n")
|
||||
print(f"[{progress}%] Fetching monthly page: " + wiki_link, end="\r")
|
||||
if progress < 10:
|
||||
progress = " " + str(progress)
|
||||
elif progress < 100:
|
||||
progress = " " + str(progress)
|
||||
|
||||
# sleep for 2 seconds to avoid getting rate limited
|
||||
# reddit api is awful
|
||||
time.sleep(2)
|
||||
#print(" ==>", end="\n")
|
||||
print(f"[{progress}%] Fetching monthly page: " + wiki_link, end="\r")
|
||||
|
||||
try:
|
||||
# fetch the monthly page and parse it
|
||||
json_data += fetch_monthly_page(wiki_link, subreddit_name)
|
||||
except Exception as e:
|
||||
# write json_data to file
|
||||
with open(f"{subreddit_name}_upcoming_releases-CANCELED.json", "w") as f:
|
||||
f.write(json.dumps(json_data, indent=4))
|
||||
print("Error fetching monthly page: " + wiki_link)
|
||||
print(e)
|
||||
exit(1)
|
||||
# sleep for 2 seconds to avoid getting rate limited
|
||||
# reddit api is awful
|
||||
# time.sleep(2)
|
||||
|
||||
#print(f"[{progress}%] Parsed monthly page: " + wiki_link + " ", end="\r")
|
||||
try:
|
||||
# fetch the monthly page and parse it
|
||||
json_data += fetch_monthly_page(wiki_link, subreddit_name)
|
||||
except Exception as e:
|
||||
# write json_data to file
|
||||
with open(f"{subreddit_name}_upcoming_releases-CANCELED.json", "w") as f:
|
||||
f.write(json.dumps(json_data, indent=4))
|
||||
print("Error fetching monthly page: " + wiki_link)
|
||||
print(e)
|
||||
exit(1)
|
||||
|
||||
# add a first element to the list that holds the date of the last update
|
||||
json_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
|
||||
#print(f"[{progress}%] Parsed monthly page: " + wiki_link + " ", end="\r")
|
||||
|
||||
# add a first element to the list that holds the date of the last update
|
||||
json_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
|
||||
|
||||
|
||||
# save json_data to file
|
||||
with open(f"rkpop_data.json", "w") as f:
|
||||
f.write(json.dumps(json_data, indent=4))
|
||||
# save json_data to file
|
||||
with open(f"rkpop_data.json", "w") as f:
|
||||
f.write(json.dumps(json_data, indent=4))
|
||||
|
||||
print("Fetched", len(json_data) - 1, "entries.")
|
||||
print("Fetched", len(json_data) - 1, "entries.")
|
||||
|
||||
cdn_upload_cmd = "rclone copy rkpop_data.json cdn:cdn/api/kcomebacks/"
|
||||
cdn_upload_cmd = "rclone copy rkpop_data.json cdn:cdn/api/kcomebacks/"
|
||||
|
||||
if UPLOAD_TO_CDN:
|
||||
print("Uploading...")
|
||||
os.system(cdn_upload_cmd)
|
||||
elif input("Upload to cdn? [Y/n]") in ["Y", "y", ""]:
|
||||
print("Uploading...")
|
||||
os.system(cdn_upload_cmd)
|
||||
if UPLOAD_TO_CDN:
|
||||
print("Uploading...")
|
||||
os.system(cdn_upload_cmd)
|
||||
elif input("Upload to cdn? [Y/n]") in ["Y", "y", ""]:
|
||||
print("Uploading...")
|
||||
os.system(cdn_upload_cmd)
|
||||
|
||||
|
||||
if SEND_WEBHOOK:
|
||||
rpop_webhook.send_webhook()
|
||||
if SEND_WEBHOOK:
|
||||
rpop_webhook.send_webhook()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
5
run_likedsongsync2.py
Normal file
5
run_likedsongsync2.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
# import and run spotify_scripts/likedsongsync2.py
|
||||
from spotify_scripts.likedsongsync2 import main
|
||||
|
||||
def run():
|
||||
main()
|
||||
165
script_interval_runner.py
Normal file
165
script_interval_runner.py
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
import json
|
||||
import time
|
||||
import importlib
|
||||
import os
|
||||
import logging
|
||||
import threading
|
||||
import sys
|
||||
import requests
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
class ScriptConfig:
|
||||
def __init__(self, name, filename, method, interval, retry_on_error, launch_flags):
|
||||
self.name = name
|
||||
self.filename = filename
|
||||
self.method = method
|
||||
self.interval = interval
|
||||
self.retry_on_error = retry_on_error
|
||||
self.launch_flags = launch_flags # This will store the flags
|
||||
self.failure_count = 0 # Track the failure count
|
||||
|
||||
def __repr__(self):
|
||||
return f"ScriptConfig(name={self.name}, filename={self.filename}, method={self.method}, interval={self.interval}, retry_on_error={self.retry_on_error}, launch_flags={self.launch_flags}, failure_count={self.failure_count})"
|
||||
|
||||
|
||||
class ScriptRunner:
|
||||
def __init__(self, config_file):
|
||||
self.scripts = []
|
||||
self.load_config(config_file)
|
||||
# Retrieve the Discord webhook URLs from the environment variables
|
||||
self.logs_webhook_url = os.getenv('SCRIPT_LOGS_DISCORD_WEBHOOK_URL')
|
||||
self.errors_webhook_url = os.getenv('SCRIPT_ERROR_DISCORD_WEBHOOK_URL')
|
||||
|
||||
def load_config(self, config_file):
|
||||
"""Load script configurations from a JSON file."""
|
||||
if not os.path.exists(config_file):
|
||||
logging.error(f"Config file '{config_file}' not found.")
|
||||
return
|
||||
|
||||
with open(config_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
for item in data:
|
||||
# Read launch_flags as a list (it could be an empty list if no flags are present)
|
||||
launch_flags = item.get('launch_flags', [])
|
||||
|
||||
script_config = ScriptConfig(
|
||||
name=item['name'],
|
||||
filename=item['filename'],
|
||||
method=item['method'],
|
||||
interval=item['interval'],
|
||||
retry_on_error=item['retry_on_error'].lower() == 'true',
|
||||
launch_flags=launch_flags # Store launch flags
|
||||
)
|
||||
self.scripts.append(script_config)
|
||||
|
||||
def send_to_discord(self, webhook_url, message):
|
||||
"""Send a message to a Discord webhook."""
|
||||
if webhook_url:
|
||||
try:
|
||||
payload = {'content': message}
|
||||
requests.post(webhook_url, json=payload)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send message to Discord: {e}")
|
||||
|
||||
def run_script(self, script_config):
|
||||
"""Run the script as per configuration."""
|
||||
script_path = os.path.join(os.getcwd(), script_config.filename)
|
||||
|
||||
if not os.path.exists(script_path):
|
||||
logging.error(f"Script file '{script_config.filename}' not found in the current directory.")
|
||||
return
|
||||
|
||||
try:
|
||||
# Import the script as a module
|
||||
script_name = script_config.filename[:-3] # Strip ".py" from the filename
|
||||
module = importlib.import_module(script_name)
|
||||
|
||||
# Temporarily modify sys.argv to simulate command-line arguments
|
||||
original_argv = sys.argv
|
||||
sys.argv = [script_name] + script_config.launch_flags # Simulate command-line args
|
||||
|
||||
# Check if method exists
|
||||
if hasattr(module, script_config.method):
|
||||
method = getattr(module, script_config.method)
|
||||
retry_count = 0
|
||||
while True:
|
||||
try:
|
||||
logging.info(f"Running {script_config.name} with flags: {script_config.launch_flags}...")
|
||||
# Call the method without needing to pass arguments (sys.argv is used)
|
||||
method()
|
||||
# Log successful run to Discord
|
||||
if self.logs_webhook_url:
|
||||
self.send_to_discord(self.logs_webhook_url, f"Successfully ran {script_config.name}")
|
||||
logging.info(f"Completed {script_config.name}.")
|
||||
time.sleep(script_config.interval) # Sleep after execution before next run
|
||||
script_config.failure_count = 0 # Reset failure count after a successful run
|
||||
except Exception as e:
|
||||
logging.error(f"Error running {script_config.name}: {e}")
|
||||
script_config.failure_count += 1
|
||||
if script_config.failure_count >= 3: # Notify on third failure
|
||||
error_message = f"{script_config.name} has failed 3 times. Last error: {e}"
|
||||
if self.errors_webhook_url:
|
||||
self.send_to_discord(self.errors_webhook_url, error_message)
|
||||
logging.error(f"Max retry attempts reached for {script_config.name}. Error sent to Discord.")
|
||||
return 500 # return error code
|
||||
if script_config.retry_on_error:
|
||||
retry_count += 1
|
||||
if retry_count >= 3: # Retry 3 times
|
||||
logging.error(f"Max retry attempts reached for {script_config.name}.")
|
||||
return 500
|
||||
logging.info(f"Retrying {script_config.name}... ({retry_count}/3)")
|
||||
else:
|
||||
break
|
||||
else:
|
||||
logging.error(f"Method '{script_config.method}' not found in {script_config.filename}")
|
||||
|
||||
# Restore the original sys.argv
|
||||
sys.argv = original_argv
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to run script {script_config.filename}: {e}")
|
||||
return 404 # return not found if script not found
|
||||
|
||||
return 200 # return success
|
||||
|
||||
def run_script_in_thread(self, script_config):
|
||||
"""Run the script in a separate thread with an endless loop."""
|
||||
def target():
|
||||
error_count = 0
|
||||
while error_count < 3:
|
||||
error_code = self.run_script(script_config)
|
||||
if error_code == 404:
|
||||
error_count += 1
|
||||
time.sleep(5)
|
||||
elif error_code == 500:
|
||||
break
|
||||
else:
|
||||
error_count = 0 # Reset error count on success
|
||||
|
||||
logging.error(f"Script {script_config.name} has failed 3 times. Stopping execution.")
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.daemon = True # Allow the thread to exit when the main program exits
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
def run_all_scripts(self):
|
||||
"""Start all scripts concurrently in separate threads with an endless loop."""
|
||||
threads = []
|
||||
for script_config in self.scripts:
|
||||
thread = self.run_script_in_thread(script_config)
|
||||
threads.append(thread)
|
||||
|
||||
# The main thread only needs to start the loops, it doesn't need to join since the threads are infinite
|
||||
logging.info("All scripts have been started and are running in their respective threads.")
|
||||
|
||||
# Main execution
|
||||
if __name__ == "__main__":
|
||||
config_file = 'scripts_config.json'
|
||||
runner = ScriptRunner(config_file)
|
||||
runner.run_all_scripts()
|
||||
|
||||
# Keep the main program running (this is important for the daemon threads to keep running)
|
||||
while True:
|
||||
time.sleep(1)
|
||||
26
scripts_config.json
Normal file
26
scripts_config.json
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"name": "Kcomebacks Sync",
|
||||
"filename": "rpopfetch.py",
|
||||
"method": "main",
|
||||
"interval": 86400,
|
||||
"retry_on_error": "true",
|
||||
"launch_flags": ["--cdn"]
|
||||
},
|
||||
{
|
||||
"name": "Project Sync",
|
||||
"filename": "update_projects.py",
|
||||
"method": "main",
|
||||
"interval": 360,
|
||||
"retry_on_error": "false",
|
||||
"launch_flags": ["--cdn"]
|
||||
},
|
||||
{
|
||||
"name": "Likedsongsync 2",
|
||||
"filename": "run_likedsongsync2.py",
|
||||
"method": "run",
|
||||
"interval": 10800,
|
||||
"retry_on_error": "true",
|
||||
"launch_flags": [""]
|
||||
}
|
||||
]
|
||||
|
|
@ -9,11 +9,7 @@ import os
|
|||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import top_lib
|
||||
|
||||
# load .env file
|
||||
load_dotenv()
|
||||
|
||||
# Define your playlist IDs
|
||||
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
||||
|
||||
def progress_bar(current, total, last_time_stamp=float, etastr=None):
|
||||
'''A function to print a progress bar to the terminal.
|
||||
|
|
@ -155,8 +151,16 @@ def add_track_to_playlist(playlist_id, track_uri):
|
|||
track_uri: The URI of the track to add to the playlist'''
|
||||
sp.playlist_add_items(playlist_id, [track_uri])
|
||||
|
||||
def main():
|
||||
# load .env file
|
||||
load_dotenv()
|
||||
|
||||
if __name__ == "__main__":
|
||||
# because im lazy
|
||||
global VERBOSE_LOGGING
|
||||
global sp
|
||||
|
||||
# Define your playlist IDs
|
||||
LIKEDSONGPLAYLIST_ID = os.getenv('LIKEDSONGPLAYLIST_ID')
|
||||
|
||||
# Parse command-line arguments
|
||||
VERBOSE_LOGGING = "-v" in sys.argv or "--verbose" in sys.argv
|
||||
|
|
@ -258,3 +262,7 @@ if __name__ == "__main__":
|
|||
except Exception:
|
||||
#except e:
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -101,65 +101,70 @@ def get_languagages(repo, access_token):
|
|||
print(f"[{repo}] Error fetching languages ", end="\n")
|
||||
return None
|
||||
|
||||
# Path to the projects.json file
|
||||
projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
|
||||
# create the directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(projects_json_path), exist_ok=True)
|
||||
def main():
|
||||
# Path to the projects.json file
|
||||
projects_json_path = os.path.expanduser("~/.cache/gh-projects/projects.json")
|
||||
# create the directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(projects_json_path), exist_ok=True)
|
||||
|
||||
if not DONTDOWNLOAD:
|
||||
# fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json
|
||||
projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json"
|
||||
projects_json = requests.get(projects_json_url)
|
||||
with open(projects_json_path, "wb") as file:
|
||||
file.write(projects_json.content)
|
||||
if not DONTDOWNLOAD:
|
||||
# fetch the projects.json file from https://cdn.jonasjones.dev/api/projects/projects.json
|
||||
projects_json_url = "https://cdn.jonasjones.dev/api/projects/projects.json"
|
||||
projects_json = requests.get(projects_json_url)
|
||||
with open(projects_json_path, "wb") as file:
|
||||
file.write(projects_json.content)
|
||||
|
||||
verboseprint(f"Fetched projects.json file")
|
||||
elif not os.path.exists("~/.cache/gh-projects/projects.json"):
|
||||
FileNotFoundError("File 'projects.json' not found. Cannot proceed without \
|
||||
downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.")
|
||||
else:
|
||||
print("Skipping download of 'projects.json'")
|
||||
verboseprint(f"Fetched projects.json file")
|
||||
elif not os.path.exists("~/.cache/gh-projects/projects.json"):
|
||||
FileNotFoundError("File 'projects.json' not found. Cannot proceed without \
|
||||
downloading it. Remove '-dd' or '--dontdownload' from the launch arguments.")
|
||||
else:
|
||||
print("Skipping download of 'projects.json'")
|
||||
|
||||
# Load the existing projects.json file
|
||||
with open(projects_json_path, "r") as file:
|
||||
projects_data = json.load(file)
|
||||
# Load the existing projects.json file
|
||||
with open(projects_json_path, "r") as file:
|
||||
projects_data = json.load(file)
|
||||
|
||||
if not DONTUPDATEGH:
|
||||
print("Fetching Repo data...")
|
||||
if not DONTUPDATEGH:
|
||||
print("Fetching Repo data...")
|
||||
|
||||
# Update the last_update (Unix timestamp) for each project
|
||||
for project in projects_data:
|
||||
gh_api = project.get("gh_api")
|
||||
if gh_api:
|
||||
last_commit_timestamp = get_last_commit_timestamp(gh_api, GITHUB_API_TOKEN)
|
||||
last_release_version = get_last_release_version(gh_api, GITHUB_API_TOKEN)
|
||||
if last_commit_timestamp:
|
||||
project["last_update"] = last_commit_timestamp
|
||||
else:
|
||||
project["last_update"] = 0
|
||||
if last_release_version:
|
||||
project["version"] = last_release_version.replace("v", "")
|
||||
languages = get_languagages(gh_api, GITHUB_API_TOKEN)
|
||||
if languages:
|
||||
project["languages"] = languages
|
||||
else:
|
||||
print("Skipping Github updates...")
|
||||
# Update the last_update (Unix timestamp) for each project
|
||||
for project in projects_data:
|
||||
gh_api = project.get("gh_api")
|
||||
if gh_api:
|
||||
last_commit_timestamp = get_last_commit_timestamp(gh_api, GITHUB_API_TOKEN)
|
||||
last_release_version = get_last_release_version(gh_api, GITHUB_API_TOKEN)
|
||||
if last_commit_timestamp:
|
||||
project["last_update"] = last_commit_timestamp
|
||||
else:
|
||||
project["last_update"] = 0
|
||||
if last_release_version:
|
||||
project["version"] = last_release_version.replace("v", "")
|
||||
languages = get_languagages(gh_api, GITHUB_API_TOKEN)
|
||||
if languages:
|
||||
project["languages"] = languages
|
||||
else:
|
||||
print("Skipping Github updates...")
|
||||
|
||||
# remove first element
|
||||
projects_data.pop(0)
|
||||
# remove first element
|
||||
projects_data.pop(0)
|
||||
|
||||
# sort projects alphabetically
|
||||
projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True)
|
||||
# sort projects alphabetically
|
||||
projects_data = sorted(projects_data, key=lambda x: x['last_update'], reverse=True)
|
||||
|
||||
# add a first element to the list that holds the date of the last update
|
||||
projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
|
||||
# add a first element to the list that holds the date of the last update
|
||||
projects_data.insert(0, {"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) + " UTC"})
|
||||
|
||||
# Save the updated data back to the projects.json file
|
||||
with open(projects_json_path, "w") as file:
|
||||
json.dump(projects_data, file, indent=2)
|
||||
# Save the updated data back to the projects.json file
|
||||
with open(projects_json_path, "w") as file:
|
||||
json.dump(projects_data, file, indent=2)
|
||||
|
||||
print("Updated projects.json\nUploading to cdn...")
|
||||
print("Updated projects.json\nUploading to cdn...")
|
||||
|
||||
os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")
|
||||
os.system(f"rclone copy {projects_json_path} cdn:cdn/api/projects/")
|
||||
|
||||
print("Uploaded projects.json to cdn")
|
||||
print("Uploaded projects.json to cdn")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue