From e489764ff8a59bfee9415aeee1188001a3d80b8d Mon Sep 17 00:00:00 2001 From: Nathan Woodburn Date: Fri, 21 Nov 2025 23:05:40 +1100 Subject: [PATCH] fix: Add escape char for curl rendering and format python files --- addCoin.py | 43 +++++----- blueprints/acme.py | 6 +- blueprints/api.py | 185 ++++++++++++++++++++++------------------ blueprints/blog.py | 73 +++++++++------- blueprints/now.py | 58 ++++++++----- blueprints/podcast.py | 3 +- blueprints/sol.py | 29 +++++-- blueprints/spotify.py | 21 +++-- blueprints/template.py | 4 +- blueprints/wellknown.py | 13 ++- cache_helper.py | 42 ++++++--- cleanSite.py | 25 +++--- curl.py | 101 ++++++++++++++++------ mail.py | 69 ++++++--------- main.py | 26 ++++-- server.py | 24 ++++-- tools.py | 48 ++++++----- 17 files changed, 461 insertions(+), 309 deletions(-) diff --git a/addCoin.py b/addCoin.py index 903ddd0..dc69767 100644 --- a/addCoin.py +++ b/addCoin.py @@ -1,35 +1,38 @@ import os import json -if not os.path.exists('.well-known/wallets'): - os.makedirs('.well-known/wallets') +if not os.path.exists(".well-known/wallets"): + os.makedirs(".well-known/wallets") -def addCoin(token:str, name:str, address:str): - with open('.well-known/wallets/'+token.upper(),'w') as f: + +def addCoin(token: str, name: str, address: str): + with open(".well-known/wallets/" + token.upper(), "w") as f: f.write(address) - - with open('.well-known/wallets/.coins','r') as f: + + with open(".well-known/wallets/.coins", "r") as f: coins = json.load(f) - - coins[token.upper()] = f'{name} ({token.upper()})' - with open('.well-known/wallets/.coins','w') as f: + + coins[token.upper()] = f"{name} ({token.upper()})" + with open(".well-known/wallets/.coins", "w") as f: f.write(json.dumps(coins, indent=4)) -def addDomain(token:str, domain:str): - with open('.well-known/wallets/.domains','r') as f: + +def addDomain(token: str, domain: str): + with open(".well-known/wallets/.domains", "r") as f: domains = json.load(f) - + domains[token.upper()] = domain - with open('.well-known/wallets/.domains','w') as f: + with open(".well-known/wallets/.domains", "w") as f: f.write(json.dumps(domains, indent=4)) -if __name__ == '__main__': + +if __name__ == "__main__": # Ask user for token - token = input('Enter token symbol: ') - name = input('Enter token name: ') - address = input('Enter wallet address: ') + token = input("Enter token symbol: ") + name = input("Enter token name: ") + address = input("Enter wallet address: ") addCoin(token, name, address) - if input('Do you want to add a domain? (y/n): ').lower() == 'y': - domain = input('Enter domain: ') - addDomain(token, domain) \ No newline at end of file + if input("Do you want to add a domain? (y/n): ").lower() == "y": + domain = input("Enter domain: ") + addDomain(token, domain) diff --git a/blueprints/acme.py b/blueprints/acme.py index bf1e3c2..2e283fe 100644 --- a/blueprints/acme.py +++ b/blueprints/acme.py @@ -3,7 +3,7 @@ import os from cloudflare import Cloudflare from tools import json_response -app = Blueprint('acme', __name__) +app = Blueprint("acme", __name__) @app.route("/hnsdoh-acme", methods=["POST"]) @@ -23,7 +23,9 @@ def post(): zone = cf.zones.list(name="hnsdoh.com").to_dict() zone_id = zone["result"][0]["id"] # type: ignore existing_records = cf.dns.records.list( - zone_id=zone_id, type="TXT", name="_acme-challenge.hnsdoh.com" # type: ignore + zone_id=zone_id, + type="TXT", + name="_acme-challenge.hnsdoh.com", # type: ignore ).to_dict() record_id = existing_records["result"][0]["id"] # type: ignore cf.dns.records.delete(dns_record_id=record_id, zone_id=zone_id) diff --git a/blueprints/api.py b/blueprints/api.py index f7c98cc..e3bf440 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -18,7 +18,7 @@ HTTP_NOT_FOUND = 404 HTTP_UNSUPPORTED_MEDIA = 415 HTTP_SERVER_ERROR = 500 -app = Blueprint('api', __name__, url_prefix='/api/v1') +app = Blueprint("api", __name__, url_prefix="/api/v1") # Register solana blueprint app.register_blueprint(sol.app) @@ -27,34 +27,38 @@ app.register_blueprint(sol.app) @app.route("/help") def help(): """Provide API documentation and help.""" - return jsonify({ - "message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au", - "endpoints": { - "/time": "Get the current time", - "/timezone": "Get the current timezone", - "/message": "Get the message from the config", - "/project": "Get the current project from git", - "/version": "Get the current version of the website", - "/page_date?url=URL&verbose=BOOL": "Get the last modified date of a webpage (verbose is optional, default false)", - "/tools": "Get a list of tools used by Nathan Woodburn", - "/playing": "Get the currently playing Spotify track", - "/status": "Just check if the site is up", - "/ping": "Just check if the site is up", - "/ip": "Get your IP address", - "/headers": "Get your request headers", - "/help": "Get this help message" - }, - "base_url": "/api/v1", - "version": getGitCommit(), - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify( + { + "message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au", + "endpoints": { + "/time": "Get the current time", + "/timezone": "Get the current timezone", + "/message": "Get the message from the config", + "/project": "Get the current project from git", + "/version": "Get the current version of the website", + "/page_date?url=URL&verbose=BOOL": "Get the last modified date of a webpage (verbose is optional, default false)", + "/tools": "Get a list of tools used by Nathan Woodburn", + "/playing": "Get the currently playing Spotify track", + "/status": "Just check if the site is up", + "/ping": "Just check if the site is up", + "/ip": "Get your IP address", + "/headers": "Get your request headers", + "/help": "Get this help message", + }, + "base_url": "/api/v1", + "version": getGitCommit(), + "ip": getClientIP(request), + "status": HTTP_OK, + } + ) + @app.route("/status") @app.route("/ping") def status(): return json_response(request, "200 OK", HTTP_OK) + @app.route("/version") def version(): """Get the current version of the website.""" @@ -68,45 +72,44 @@ def time(): timezone_offset = datetime.timedelta(hours=nc_config["time-zone"]) timezone = datetime.timezone(offset=timezone_offset) current_time = datetime.datetime.now(tz=timezone) - return jsonify({ - "timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"), - "timestamp": current_time.timestamp(), - "timezone": nc_config["time-zone"], - "timeISO": current_time.isoformat(), - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify( + { + "timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"), + "timestamp": current_time.timestamp(), + "timezone": nc_config["time-zone"], + "timeISO": current_time.isoformat(), + "ip": getClientIP(request), + "status": HTTP_OK, + } + ) @app.route("/timezone") def timezone(): """Get the current timezone setting.""" nc_config = get_nc_config() - return jsonify({ - "timezone": nc_config["time-zone"], - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify( + { + "timezone": nc_config["time-zone"], + "ip": getClientIP(request), + "status": HTTP_OK, + } + ) @app.route("/message") def message(): """Get the message from the configuration.""" nc_config = get_nc_config() - return jsonify({ - "message": nc_config["message"], - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify( + {"message": nc_config["message"], "ip": getClientIP(request), "status": HTTP_OK} + ) @app.route("/ip") def ip(): """Get the client's IP address.""" - return jsonify({ - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify({"ip": getClientIP(request), "status": HTTP_OK}) @app.route("/email", methods=["POST"]) @@ -114,7 +117,9 @@ def email_post(): """Send an email via the API (requires API key).""" # Verify json if not request.is_json: - return json_response(request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA) + return json_response( + request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA + ) # Check if api key sent data = request.json @@ -137,7 +142,7 @@ def project(): git = get_git_latest_activity() repo_name = git["repo"]["name"].lower() repo_description = git["repo"]["description"] - + gitinfo = { "name": repo_name, "description": repo_description, @@ -145,13 +150,16 @@ def project(): "website": git["repo"].get("website"), } - return jsonify({ - "repo_name": repo_name, - "repo_description": repo_description, - "repo": gitinfo, - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify( + { + "repo_name": repo_name, + "repo_description": repo_description, + "repo": gitinfo, + "ip": getClientIP(request), + "status": HTTP_OK, + } + ) + @app.route("/tools") def tools(): @@ -161,9 +169,10 @@ def tools(): except Exception as e: print(f"Error getting tools data: {e}") return json_response(request, "500 Internal Server Error", HTTP_SERVER_ERROR) - + return json_response(request, {"tools": tools}, HTTP_OK) + @app.route("/playing") def playing(): """Get the currently playing Spotify track.""" @@ -185,16 +194,12 @@ def headers(): if key.startswith("X-"): # Remove from headers toremove.append(key) - for key in toremove: headers.pop(key) - return jsonify({ - "headers": headers, - "ip": getClientIP(request), - "status": HTTP_OK - }) + return jsonify({"headers": headers, "ip": getClientIP(request), "status": HTTP_OK}) + @app.route("/page_date") def page_date(): @@ -211,33 +216,33 @@ def page_date(): r = requests.get(url, timeout=5) r.raise_for_status() except requests.exceptions.RequestException as e: - return json_response(request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST) + return json_response( + request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST + ) page_text = r.text # Remove ordinal suffixes globally - page_text = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', page_text, flags=re.IGNORECASE) + page_text = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", page_text, flags=re.IGNORECASE) # Remove HTML comments - page_text = re.sub(r'', '', page_text, flags=re.DOTALL) + page_text = re.sub(r"", "", page_text, flags=re.DOTALL) date_patterns = [ - r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', # YYYY-MM-DD - r'(\d{1,2})[/-](\d{1,2})[/-](\d{4})', # DD-MM-YYYY - r'(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})', # DD Month YYYY - r'(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})', # Month DD, YYYY with optional words - r'\b(\d{4})(\d{2})(\d{2})\b', # YYYYMMDD - r'(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})', # Month YYYY only + r"(\d{4})[/-](\d{1,2})[/-](\d{1,2})", # YYYY-MM-DD + r"(\d{1,2})[/-](\d{1,2})[/-](\d{4})", # DD-MM-YYYY + r"(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})", # DD Month YYYY + r"(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})", # Month DD, YYYY with optional words + r"\b(\d{4})(\d{2})(\d{2})\b", # YYYYMMDD + r"(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})", # Month YYYY only ] - - # Structured data patterns json_date_patterns = { r'"datePublished"\s*:\s*"([^"]+)"': "published", r'"dateModified"\s*:\s*"([^"]+)"': "modified", r']*?)property\s*=\s*"article:published_time"\s+content\s*=\s*"([^"]+)"': "published", r']*?)property\s*=\s*"article:modified_time"\s+content\s*=\s*"([^"]+)"': "modified", - r' tag containing numbered steps - paragraphs = soup.find_all('p') + paragraphs = soup.find_all("p") for p in paragraphs: content = p.decode_contents() # type: ignore # Check for likely numbered step structure - if re.search(r'1\.\s', content): + if re.search(r"1\.\s", content): # Split into pre-list and numbered steps # Match:
, optional whitespace, then a number and dot - parts = re.split(r'(?:)?\s*(\d+)\.\s', content) + parts = re.split(r"(?:)?\s*(\d+)\.\s", content) # Result: [pre-text, '1', step1, '2', step2, ..., '10', step10] pre_text = parts[0].strip() @@ -85,10 +88,9 @@ def fix_numbered_lists(html): # Assemble the ordered list ol_items = [] for i in range(0, len(steps), 2): - if i+1 < len(steps): - step_html = steps[i+1].strip() - ol_items.append( - f"
  • {step_html}
  • ") + if i + 1 < len(steps): + step_html = steps[i + 1].strip() + ol_items.append(f"
  • {step_html}
  • ") # Build the final list HTML ol_html = "
      \n" + "\n".join(ol_items) + "\n
    " @@ -97,7 +99,7 @@ def fix_numbered_lists(html): new_html = f"{pre_text}
    \n{ol_html}" if pre_text else ol_html # Replace old

    with parsed version - new_fragment = BeautifulSoup(new_html, 'html.parser') + new_fragment = BeautifulSoup(new_html, "html.parser") p.replace_with(new_fragment) break # Only process the first matching

    @@ -134,16 +136,23 @@ def index(): blog_pages = list_page_files() # Create a html list of pages blog_pages = [ - {"name": page.replace("_", " "), "url": f"/blog/{page}", "download": f"/blog/{page}.md"} for page in blog_pages + { + "name": page.replace("_", " "), + "url": f"/blog/{page}", + "download": f"/blog/{page}.md", + } + for page in blog_pages ] # Render the template - return jsonify({ - "status": 200, - "message": "Check out my various blog postsa", - "ip": getClientIP(request), - "blogs": blog_pages - }), 200 + return jsonify( + { + "status": 200, + "message": "Check out my various blog postsa", + "ip": getClientIP(request), + "blogs": blog_pages, + } + ), 200 @app.route("/") @@ -158,14 +167,16 @@ def path(path): # Get the title from the file name title = path.replace("_", " ") - return jsonify({ - "status": 200, - "message": f"Blog post: {title}", - "ip": getClientIP(request), - "title": title, - "content": content, - "download": f"/blog/{path}.md" - }), 200 + return jsonify( + { + "status": 200, + "message": f"Blog post: {title}", + "ip": getClientIP(request), + "title": title, + "content": content, + "download": f"/blog/{path}.md", + } + ), 200 @app.route("/.md") @@ -175,4 +186,4 @@ def path_md(path): return render_template("404.html"), 404 # Return the raw markdown file - return content, 200, {'Content-Type': 'text/plain; charset=utf-8'} + return content, 200, {"Content-Type": "text/plain; charset=utf-8"} diff --git a/blueprints/now.py b/blueprints/now.py index e7ddf17..764dd28 100644 --- a/blueprints/now.py +++ b/blueprints/now.py @@ -8,7 +8,7 @@ from bs4 import BeautifulSoup import re # Create blueprint -app = Blueprint('now', __name__, url_prefix='/now') +app = Blueprint("now", __name__, url_prefix="/now") @lru_cache(maxsize=16) @@ -55,7 +55,10 @@ def render(date, handshake_scripts=None): date_formatted = datetime.datetime.strptime(date, "%y_%m_%d") date_formatted = date_formatted.strftime("%A, %B %d, %Y") - return render_template(f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts) + return render_template( + f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts + ) + def render_curl(date=None): # If the date is not available, render the latest page @@ -71,12 +74,12 @@ def render_curl(date=None): # Format the date nicely date_formatted = datetime.datetime.strptime(date, "%y_%m_%d") date_formatted = date_formatted.strftime("%A, %B %d, %Y") - + # Load HTML with open(f"templates/now/{date}.html", "r", encoding="utf-8") as f: raw_html = f.read().replace("{{ date }}", date_formatted) - soup = BeautifulSoup(raw_html, 'html.parser') - + soup = BeautifulSoup(raw_html, "html.parser") + posts = [] # Find divs matching your pattern @@ -86,12 +89,12 @@ def render_curl(date=None): for div in divs: # header could be h1/h2/h3 inside the div - header_tag = div.find(["h1", "h2", "h3"]) # type: ignore + header_tag = div.find(["h1", "h2", "h3"]) # type: ignore # content is usually one or more

    tags inside the div - p_tags = div.find_all("p") # type: ignore + p_tags = div.find_all("p") # type: ignore if header_tag and p_tags: - header_text = header_tag.get_text(strip=True) # type: ignore + header_text = header_tag.get_text(strip=True) # type: ignore content_lines = [] for p in p_tags: @@ -99,15 +102,15 @@ def render_curl(date=None): text = p.get_text(strip=False) # Extract any links in the paragraph - links = [a.get("href") for a in p.find_all("a", href=True)] # type: ignore + links = [a.get("href") for a in p.find_all("a", href=True)] # type: ignore # Set max width for text wrapping - + # Wrap text manually wrapped_lines = [] for line in text.splitlines(): while len(line) > MAX_WIDTH: # Find last space within max_width - split_at = line.rfind(' ', 0, MAX_WIDTH) + split_at = line.rfind(" ", 0, MAX_WIDTH) if split_at == -1: split_at = MAX_WIDTH wrapped_lines.append(line[:split_at].rstrip()) @@ -116,7 +119,7 @@ def render_curl(date=None): text = "\n".join(wrapped_lines) if links: - text += "\nLinks: " + ", ".join(links) # type: ignore + text += "\nLinks: " + ", ".join(links) # type: ignore content_lines.append(text) @@ -128,8 +131,9 @@ def render_curl(date=None): for post in posts: response += f"{post['header']}\n\n{post['content']}\n\n" - return render_template("now.ascii", date=date_formatted, content=response, header=get_header()) - + return render_template( + "now.ascii", date=date_formatted, content=response, header=get_header() + ) @app.route("/", strict_slashes=False) @@ -157,8 +161,9 @@ def old(): date_fmt = datetime.datetime.strptime(date, "%y_%m_%d") date_fmt = date_fmt.strftime("%A, %B %d, %Y") response += f"{date_fmt} - /now/{link}\n" - return render_template("now.ascii", date="Old Now Pages", content=response, header=get_header()) - + return render_template( + "now.ascii", date="Old Now Pages", content=response, header=get_header() + ) html = '

    " return render_template( - "now/old.html", handshake_scripts=getHandshakeScript(request.host), now_pages=html + "now/old.html", + handshake_scripts=getHandshakeScript(request.host), + now_pages=html, ) @@ -189,7 +196,7 @@ def rss(): link = page.strip(".html") date = datetime.datetime.strptime(link, "%y_%m_%d") date = date.strftime("%A, %B %d, %Y") - rss += f'What\'s Happening {date}{host}/now/{link}Latest updates for {date}{host}/now/{link}' + rss += f"What's Happening {date}{host}/now/{link}Latest updates for {date}{host}/now/{link}" rss += "" return make_response(rss, 200, {"Content-Type": "application/rss+xml"}) @@ -200,6 +207,17 @@ def json(): host = "https://" + request.host if ":" in request.host: host = "http://" + request.host - now_pages = [{"url": host+"/now/"+page.strip(".html"), "date": datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime( - "%A, %B %d, %Y"), "title": "What's Happening "+datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y")} for page in now_pages] + now_pages = [ + { + "url": host + "/now/" + page.strip(".html"), + "date": datetime.datetime.strptime( + page.strip(".html"), "%y_%m_%d" + ).strftime("%A, %B %d, %Y"), + "title": "What's Happening " + + datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime( + "%A, %B %d, %Y" + ), + } + for page in now_pages + ] return jsonify(now_pages) diff --git a/blueprints/podcast.py b/blueprints/podcast.py index 3411d9f..8a801ba 100644 --- a/blueprints/podcast.py +++ b/blueprints/podcast.py @@ -2,7 +2,8 @@ from flask import Blueprint, make_response, request from tools import error_response import requests -app = Blueprint('podcast', __name__) +app = Blueprint("podcast", __name__) + @app.route("/ID1") def index(): diff --git a/blueprints/sol.py b/blueprints/sol.py index 7ad7b94..4fdb555 100644 --- a/blueprints/sol.py +++ b/blueprints/sol.py @@ -9,12 +9,12 @@ import binascii import base64 import os -app = Blueprint('sol', __name__) +app = Blueprint("sol", __name__) SOLANA_HEADERS = { "Content-Type": "application/json", "X-Action-Version": "2.4.2", - "X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp" + "X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp", } SOLANA_ADDRESS = None @@ -23,15 +23,19 @@ if os.path.isfile(".well-known/wallets/SOL"): address = file.read() SOLANA_ADDRESS = Pubkey.from_string(address.strip()) + def create_transaction(sender_address: str, amount: float) -> str: if SOLANA_ADDRESS is None: - raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.") + raise ValueError( + "SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address." + ) # Create transaction sender = Pubkey.from_string(sender_address) transfer_ix = transfer( TransferParams( - from_pubkey=sender, to_pubkey=SOLANA_ADDRESS, lamports=int( - amount * 1000000000) + from_pubkey=sender, + to_pubkey=SOLANA_ADDRESS, + lamports=int(amount * 1000000000), ) ) solana_client = Client("https://api.mainnet-beta.solana.com") @@ -50,10 +54,14 @@ def create_transaction(sender_address: str, amount: float) -> str: base64_string = base64.b64encode(raw_bytes).decode("utf-8") return base64_string + def get_solana_address() -> str: if SOLANA_ADDRESS is None: - raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.") - return str(SOLANA_ADDRESS) + raise ValueError( + "SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address." + ) + return str(SOLANA_ADDRESS) + @app.route("/donate", methods=["GET", "OPTIONS"]) def sol_donate(): @@ -103,7 +111,6 @@ def sol_donate_amount(amount): @app.route("/donate/", methods=["POST"]) def sol_donate_post(amount): - if not request.json: return jsonify({"message": "Error: No JSON data provided"}), 400, SOLANA_HEADERS @@ -122,4 +129,8 @@ def sol_donate_post(amount): return jsonify({"message": "Error: Amount too small"}), 400, SOLANA_HEADERS transaction = create_transaction(sender, amount) - return jsonify({"message": "Success", "transaction": transaction}), 200, SOLANA_HEADERS \ No newline at end of file + return ( + jsonify({"message": "Success", "transaction": transaction}), + 200, + SOLANA_HEADERS, + ) diff --git a/blueprints/spotify.py b/blueprints/spotify.py index 8b55252..0076c8f 100644 --- a/blueprints/spotify.py +++ b/blueprints/spotify.py @@ -5,7 +5,7 @@ import requests import time import base64 -app = Blueprint('spotify', __name__, url_prefix='/spotify') +app = Blueprint("spotify", __name__, url_prefix="/spotify") CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") @@ -21,6 +21,7 @@ ACCESS_TOKEN = None REFRESH_TOKEN = os.getenv("SPOTIFY_REFRESH_TOKEN") TOKEN_EXPIRES = 0 + def refresh_access_token(): """Refresh Spotify access token when expired.""" global ACCESS_TOKEN, TOKEN_EXPIRES @@ -52,6 +53,7 @@ def refresh_access_token(): TOKEN_EXPIRES = time.time() + token_info.get("expires_in", 3600) return ACCESS_TOKEN + @app.route("/login") def login(): auth_query = ( @@ -60,6 +62,7 @@ def login(): ) return redirect(auth_query) + @app.route("/callback") def callback(): code = request.args.get("code") @@ -76,12 +79,14 @@ def callback(): response = requests.post(SPOTIFY_TOKEN_URL, data=data) token_info = response.json() if "access_token" not in token_info: - return json_response(request, {"error": "Failed to obtain token", "details": token_info}, 400) + return json_response( + request, {"error": "Failed to obtain token", "details": token_info}, 400 + ) access_token = token_info["access_token"] me = requests.get( "https://api.spotify.com/v1/me", - headers={"Authorization": f"Bearer {access_token}"} + headers={"Authorization": f"Bearer {access_token}"}, ).json() if me.get("id") != ALLOWED_SPOTIFY_USER_ID: @@ -93,12 +98,14 @@ def callback(): print("Refresh Token:", REFRESH_TOKEN) return redirect(url_for("spotify.currently_playing")) + @app.route("/", strict_slashes=False) @app.route("/playing") def currently_playing(): """Public endpoint showing your current track.""" track = get_spotify_track() - return json_response(request, {"spotify":track}, 200) + return json_response(request, {"spotify": track}, 200) + def get_spotify_track(): """Internal function to get current playing track without HTTP context.""" @@ -124,7 +131,7 @@ def get_spotify_track(): "album_name": data["item"]["album"]["name"], "album_art": data["item"]["album"]["images"][0]["url"], "is_playing": data["is_playing"], - "progress_ms": data.get("progress_ms",0), - "duration_ms": data["item"].get("duration_ms",1) + "progress_ms": data.get("progress_ms", 0), + "duration_ms": data["item"].get("duration_ms", 1), } - return track \ No newline at end of file + return track diff --git a/blueprints/template.py b/blueprints/template.py index 10243a8..e0c1834 100644 --- a/blueprints/template.py +++ b/blueprints/template.py @@ -1,9 +1,9 @@ from flask import Blueprint, request from tools import json_response -app = Blueprint('template', __name__) +app = Blueprint("template", __name__) @app.route("/", strict_slashes=False) def index(): - return json_response(request, "Success", 200) \ No newline at end of file + return json_response(request, "Success", 200) diff --git a/blueprints/wellknown.py b/blueprints/wellknown.py index 05edc96..4e5149d 100644 --- a/blueprints/wellknown.py +++ b/blueprints/wellknown.py @@ -1,8 +1,15 @@ -from flask import Blueprint, make_response, request, jsonify, send_from_directory, redirect +from flask import ( + Blueprint, + make_response, + request, + jsonify, + send_from_directory, + redirect, +) from tools import error_response import os -app = Blueprint('well-known', __name__, url_prefix='/.well-known') +app = Blueprint("well-known", __name__, url_prefix="/.well-known") @app.route("/") @@ -12,7 +19,7 @@ def index(path): @app.route("/wallets/") def wallets(path): - if path[0] == "." and 'proof' not in path: + if path[0] == "." and "proof" not in path: return send_from_directory( ".well-known/wallets", path, mimetype="application/json" ) diff --git a/cache_helper.py b/cache_helper.py index cf47cc2..b12a9e0 100644 --- a/cache_helper.py +++ b/cache_helper.py @@ -2,6 +2,7 @@ Cache helper module for expensive API calls and configuration. Provides centralized caching with TTL for external API calls. """ + import datetime import os import json @@ -26,14 +27,17 @@ def get_nc_config(): current_time = datetime.datetime.now().timestamp() # Check if cache is valid - if _nc_config_cache["data"] and (current_time - _nc_config_cache["timestamp"]) < _nc_config_ttl: + if ( + _nc_config_cache["data"] + and (current_time - _nc_config_cache["timestamp"]) < _nc_config_ttl + ): return _nc_config_cache["data"] # Fetch new config try: config = requests.get( "https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json", - timeout=5 + timeout=5, ).json() _nc_config_cache = {"data": config, "timestamp": current_time} return config @@ -61,15 +65,20 @@ def get_git_latest_activity(): current_time = datetime.datetime.now().timestamp() # Check if cache is valid - if _git_data_cache["data"] and (current_time - _git_data_cache["timestamp"]) < _git_data_ttl: + if ( + _git_data_cache["data"] + and (current_time - _git_data_cache["timestamp"]) < _git_data_ttl + ): return _git_data_cache["data"] # Fetch new data try: git = requests.get( "https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1", - headers={"Authorization": os.getenv("GIT_AUTH") or os.getenv("git_token") or ""}, - timeout=5 + headers={ + "Authorization": os.getenv("GIT_AUTH") or os.getenv("git_token") or "" + }, + timeout=5, ) git_data = git.json() if git_data and len(git_data) > 0: @@ -111,15 +120,17 @@ def get_projects(limit=3): current_time = datetime.datetime.now().timestamp() # Check if cache is valid - if _projects_cache["data"] and (current_time - _projects_cache["timestamp"]) < _projects_ttl: + if ( + _projects_cache["data"] + and (current_time - _projects_cache["timestamp"]) < _projects_ttl + ): return _projects_cache["data"][:limit] # Fetch new data try: projects = [] projectsreq = requests.get( - "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos", - timeout=5 + "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos", timeout=5 ) projects = projectsreq.json() @@ -128,7 +139,7 @@ def get_projects(limit=3): while 'rel="next"' in projectsreq.headers.get("link", ""): projectsreq = requests.get( f"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page={pageNum}", - timeout=5 + timeout=5, ) projects += projectsreq.json() pageNum += 1 @@ -143,7 +154,9 @@ def get_projects(limit=3): project["name"] = project["name"].replace("_", " ").replace("-", " ") # Sort by last updated - projects_sorted = sorted(projects, key=lambda x: x.get("updated_at", ""), reverse=True) + projects_sorted = sorted( + projects, key=lambda x: x.get("updated_at", ""), reverse=True + ) # Remove duplicates by name seen_names = set() @@ -178,14 +191,16 @@ def get_uptime_status(): current_time = datetime.datetime.now().timestamp() # Check if cache is valid - if _uptime_cache["data"] is not None and (current_time - _uptime_cache["timestamp"]) < _uptime_ttl: + if ( + _uptime_cache["data"] is not None + and (current_time - _uptime_cache["timestamp"]) < _uptime_ttl + ): return _uptime_cache["data"] # Fetch new data try: uptime = requests.get( - "https://uptime.woodburn.au/api/status-page/main/badge", - timeout=5 + "https://uptime.woodburn.au/api/status-page/main/badge", timeout=5 ) content = uptime.content.decode("utf-8").lower() status = "maintenance" in content or uptime.content.count(b"Up") > 1 @@ -247,4 +262,3 @@ def get_wallet_domains(): except Exception as e: print(f"Error loading domains: {e}") return {} - diff --git a/cleanSite.py b/cleanSite.py index ae75423..cfbf033 100644 --- a/cleanSite.py +++ b/cleanSite.py @@ -1,36 +1,37 @@ import os -def cleanSite(path:str): + +def cleanSite(path: str): # Check if the file is sitemap.xml - if path.endswith('sitemap.xml'): + if path.endswith("sitemap.xml"): # Open the file - with open(path, 'r') as f: + with open(path, "r") as f: # Read the content content = f.read() # Replace all .html with empty string - content = content.replace('.html', '') + content = content.replace(".html", "") # Write the content back to the file - with open(path, 'w') as f: + with open(path, "w") as f: f.write(content) # Skip the file return - + # If the file is not an html file, skip it - if not path.endswith('.html'): + if not path.endswith(".html"): if os.path.isdir(path): for file in os.listdir(path): - cleanSite(path + '/' + file) + cleanSite(path + "/" + file) return # Open the file - with open(path, 'r') as f: + with open(path, "r") as f: # Read and remove all .html content = f.read().replace('.html"', '"') # Write the cleaned content back to the file - with open(path, 'w') as f: + with open(path, "w") as f: f.write(content) -for file in os.listdir('templates'): - cleanSite('templates/' + file) \ No newline at end of file +for file in os.listdir("templates"): + cleanSite("templates/" + file) diff --git a/curl.py b/curl.py index 9b4f257..046ccf2 100644 --- a/curl.py +++ b/curl.py @@ -8,7 +8,8 @@ from cache_helper import get_git_latest_activity, get_projects as get_projects_c MAX_WIDTH = 80 -def clean_path(path:str): + +def clean_path(path: str): path = path.strip("/ ").lower() # Strip any .html extension if path.endswith(".html"): @@ -19,19 +20,21 @@ def clean_path(path:str): path = "index" return path + @lru_cache(maxsize=1) def get_header(): with open("templates/header.ascii", "r") as f: return f.read() + @lru_cache(maxsize=16) def get_current_project(): git = get_git_latest_activity() repo_name = git["repo"]["name"].lower() - repo_description = git["repo"]["description"] + repo_description = git["repo"]["description"] if not repo_description: - return f"[1;36m{repo_name}[0m" - return f"[1;36m{repo_name}[0m - [1m{repo_description}[0m" + return f"{repo_name}" + return f"{repo_name} - {repo_description}" @lru_cache(maxsize=16) @@ -39,59 +42,107 @@ def get_projects(): projects_data = get_projects_cached(limit=5) projects = "" for project in projects_data: - projects += f"""[1m{project['name']}[0m - {project['description'] if project['description'] else 'No description'} -{project['html_url']} + projects += f"""{project["name"]} - {project["description"] if project["description"] else "No description"} +{project["html_url"]} """ return projects + + def curl_response(request): # Check if .ascii exists path = clean_path(request.path) - + # Handle special cases if path == "index": # Get current project - return render_template("index.ascii",repo=get_current_project(), ip=getClientIP(request), spotify=get_spotify_track()), 200, {'Content-Type': 'text/plain; charset=utf-8'} + return ( + render_template( + "index.ascii", + repo=get_current_project(), + ip=getClientIP(request), + spotify=get_spotify_track(), + ), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) if path == "projects": # Get projects - return render_template("projects.ascii",header=get_header(),projects=get_projects()), 200, {'Content-Type': 'text/plain; charset=utf-8'} + return ( + render_template( + "projects.ascii", header=get_header(), projects=get_projects() + ), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) if path == "donate": # Get donation info - return render_template("donate.ascii",header=get_header(), - HNS=getAddress("HNS"), BTC=getAddress("BTC"), - SOL=getAddress("SOL"), ETH=getAddress("ETH") - ), 200, {'Content-Type': 'text/plain; charset=utf-8'} - + return ( + render_template( + "donate.ascii", + header=get_header(), + HNS=getAddress("HNS"), + BTC=getAddress("BTC"), + SOL=getAddress("SOL"), + ETH=getAddress("ETH"), + ), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) + if path == "donate/more": coinList = os.listdir(".well-known/wallets") coinList = [file for file in coinList if file[0] != "."] coinList.sort() - return render_template("donate_more.ascii",header=get_header(), - coins=coinList - ), 200, {'Content-Type': 'text/plain; charset=utf-8'} + return ( + render_template("donate_more.ascii", header=get_header(), coins=coinList), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) # For other donation pages, fall back to ascii if it exists if path.startswith("donate/"): coin = path.split("/")[1] address = getAddress(coin) if address != "": - return render_template("donate_coin.ascii",header=get_header(),coin=coin.upper(),address=address), 200, {'Content-Type': 'text/plain; charset=utf-8'} - + return ( + render_template( + "donate_coin.ascii", + header=get_header(), + coin=coin.upper(), + address=address, + ), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) + if path == "tools": tools = get_tools_data() - return render_template("tools.ascii",header=get_header(),tools=tools), 200, {'Content-Type': 'text/plain; charset=utf-8'} + return ( + render_template("tools.ascii", header=get_header(), tools=tools), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) if os.path.exists(f"templates/{path}.ascii"): - return render_template(f"{path}.ascii",header=get_header()), 200, {'Content-Type': 'text/plain; charset=utf-8'} - + return ( + render_template(f"{path}.ascii", header=get_header()), + 200, + {"Content-Type": "text/plain; charset=utf-8"}, + ) + # Fallback to html if it exists if os.path.exists(f"templates/{path}.html"): return render_template(f"{path}.html") - + # Return curl error page error = { "code": 404, - "message": "The requested resource was not found on this server." + "message": "The requested resource was not found on this server.", } - return render_template("error.ascii",header=get_header(),error=error), 404, {'Content-Type': 'text/plain; charset=utf-8'} \ No newline at end of file + return ( + render_template("error.ascii", header=get_header(), error=error), + 404, + {"Content-Type": "text/plain; charset=utf-8"}, + ) diff --git a/mail.py b/mail.py index 7602622..5b94be8 100644 --- a/mail.py +++ b/mail.py @@ -21,95 +21,76 @@ import os # "body":"G'\''day\nThis is a test email from my website api\n\nRegards,\nNathan.Woodburn/" # }' + def validateSender(email): domains = os.getenv("EMAIL_DOMAINS") if not domains: return False - + domains = domains.split(",") for domain in domains: if re.match(r".+@" + domain, email): return True - + return False + def sendEmail(data): fromEmail = "noreply@woodburn.au" if "from" in data: fromEmail = data["from"] if not validateSender(fromEmail): - return jsonify({ - "status": 400, - "message": "Bad request 'from' email invalid" - }) - + return jsonify({"status": 400, "message": "Bad request 'from' email invalid"}) if "to" not in data: - return jsonify({ - "status": 400, - "message": "Bad request 'to' json data missing" - }) + return jsonify({"status": 400, "message": "Bad request 'to' json data missing"}) to = data["to"] if "subject" not in data: - return jsonify({ - "status": 400, - "message": "Bad request 'subject' json data missing" - }) + return jsonify( + {"status": 400, "message": "Bad request 'subject' json data missing"} + ) subject = data["subject"] if "body" not in data: - return jsonify({ - "status": 400, - "message": "Bad request 'body' json data missing" - }) + return jsonify( + {"status": 400, "message": "Bad request 'body' json data missing"} + ) body = data["body"] if not re.match(r"[^@]+@[^@]+\.[^@]+", to): raise ValueError("Invalid recipient email address.") - + if not subject: raise ValueError("Subject cannot be empty.") - + if not body: raise ValueError("Body cannot be empty.") - + fromName = "Nathan Woodburn" - if 'sender' in data: - fromName = data['sender'] + if "sender" in data: + fromName = data["sender"] # Create the email message msg = MIMEMultipart() - msg['From'] = formataddr((fromName, fromEmail)) - msg['To'] = to - msg['Subject'] = subject - msg.attach(MIMEText(body, 'plain')) - + msg["From"] = formataddr((fromName, fromEmail)) + msg["To"] = to + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) + # Sending the email try: host = os.getenv("EMAIL_SMTP") user = os.getenv("EMAIL_USER") password = os.getenv("EMAIL_PASS") if host is None or user is None or password is None: - return jsonify({ - "status": 500, - "error": "Email server not configured" - }) + return jsonify({"status": 500, "error": "Email server not configured"}) with smtplib.SMTP_SSL(host, 465) as server: server.login(user, password) server.sendmail(fromEmail, to, msg.as_string()) print("Email sent successfully.") - return jsonify({ - "status": 200, - "message": "Send email successfully" - }) + return jsonify({"status": 200, "message": "Send email successfully"}) except Exception as e: - return jsonify({ - "status": 500, - "error": "Sending email failed", - "exception":e - }) - - \ No newline at end of file + return jsonify({"status": 500, "error": "Sending email failed", "exception": e}) diff --git a/main.py b/main.py index 670c33e..fbd01c8 100644 --- a/main.py +++ b/main.py @@ -11,15 +11,16 @@ class GunicornApp(BaseApplication): def load_config(self): for key, value in self.options.items(): - if key in self.cfg.settings and value is not None: # type: ignore - self.cfg.set(key.lower(), value) # type: ignore + if key in self.cfg.settings and value is not None: # type: ignore + self.cfg.set(key.lower(), value) # type: ignore def load(self): return self.application -if __name__ == '__main__': - workers = os.getenv('WORKERS') - threads = os.getenv('THREADS') + +if __name__ == "__main__": + workers = os.getenv("WORKERS") + threads = os.getenv("THREADS") if workers is None: workers = 1 if threads is None: @@ -27,10 +28,17 @@ if __name__ == '__main__': workers = int(workers) threads = int(threads) options = { - 'bind': '0.0.0.0:5000', - 'workers': workers, - 'threads': threads, + "bind": "0.0.0.0:5000", + "workers": workers, + "threads": threads, } gunicorn_app = GunicornApp(app, options) - print('Starting server with ' + str(workers) + ' workers and ' + str(threads) + ' threads', flush=True) + print( + "Starting server with " + + str(workers) + + " workers and " + + str(threads) + + " threads", + flush=True, + ) gunicorn_app.run() diff --git a/server.py b/server.py index dd35bb5..a0080b9 100644 --- a/server.py +++ b/server.py @@ -363,7 +363,11 @@ def donate(): for token in tokenList: chain_display = f" on {token['chain']}" if token["chain"] != "null" else "" - symbol_display = f" ({token['symbol']}{chain_display})" if token["symbol"] != token["name"] else chain_display + symbol_display = ( + f" ({token['symbol']}{chain_display})" + if token["symbol"] != token["name"] + else chain_display + ) coins += f'' crypto = request.args.get("c") @@ -404,8 +408,12 @@ def donate(): if not token: cryptoHTML += f"
    Donate with {coin_display}:" else: - token_symbol = f" ({token['symbol']})" if token['symbol'] != token['name'] else "" - cryptoHTML += f"
    Donate with {token['name']}{token_symbol} on {crypto}:" + token_symbol = ( + f" ({token['symbol']})" if token["symbol"] != token["name"] else "" + ) + cryptoHTML += ( + f"
    Donate with {token['name']}{token_symbol} on {crypto}:" + ) cryptoHTML += f'
    {address}' if proof: @@ -413,9 +421,13 @@ def donate(): elif token: if "address" in token: address = token["address"] - token_symbol = f" ({token['symbol']})" if token['symbol'] != token['name'] else "" - chain_display = f" on {crypto}" if crypto != 'NULL' else "" - cryptoHTML += f"
    Donate with {token['name']}{token_symbol}{chain_display}:" + token_symbol = ( + f" ({token['symbol']})" if token["symbol"] != token["name"] else "" + ) + chain_display = f" on {crypto}" if crypto != "NULL" else "" + cryptoHTML += ( + f"
    Donate with {token['name']}{token_symbol}{chain_display}:" + ) cryptoHTML += f'
    {address}' if proof: cryptoHTML += proof diff --git a/tools.py b/tools.py index a571df5..159cb1f 100644 --- a/tools.py +++ b/tools.py @@ -24,17 +24,10 @@ CRAWLERS = [ "Exabot", "facebot", "ia_archiver", - "Twitterbot" + "Twitterbot", ] -CLI_AGENTS = [ - "curl", - "hurl", - "xh", - "Posting", - "HTTPie", - "nushell" -] +CLI_AGENTS = ["curl", "hurl", "xh", "Posting", "HTTPie", "nushell"] def getClientIP(request: Request) -> str: @@ -56,6 +49,7 @@ def getClientIP(request: Request) -> str: ip = "unknown" return ip + @lru_cache(maxsize=1) def getGitCommit() -> str: """ @@ -115,6 +109,7 @@ def isCrawler(request: Request) -> bool: return any(crawler in user_agent for crawler in CRAWLERS) return False + @lru_cache(maxsize=128) def isDev(host: str) -> bool: """ @@ -135,6 +130,7 @@ def isDev(host: str) -> bool: return True return False + @lru_cache(maxsize=128) def getHandshakeScript(host: str) -> str: """ @@ -150,6 +146,7 @@ def getHandshakeScript(host: str) -> str: return "" return '' + @lru_cache(maxsize=64) def getAddress(coin: str) -> str: """ @@ -187,7 +184,9 @@ def getFilePath(name: str, path: str) -> Optional[str]: return None -def json_response(request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404): +def json_response( + request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404 +): """ Create a JSON response with standard formatting. @@ -205,17 +204,20 @@ def json_response(request: Request, message: Union[str, Dict] = "404 Not Found", message["ip"] = getClientIP(request) return jsonify(message), code - return jsonify({ - "status": code, - "message": message, - "ip": getClientIP(request), - }), code + return jsonify( + { + "status": code, + "message": message, + "ip": getClientIP(request), + } + ), code + def error_response( request: Request, message: str = "404 Not Found", code: int = 404, - force_json: bool = False + force_json: bool = False, ) -> Union[Tuple[Dict, int], object]: """ Create an error response in JSON or HTML format. @@ -233,10 +235,12 @@ def error_response( return json_response(request, message, code) # Check if .html exists in templates - template_name = f"{code}.html" if os.path.isfile( - f"templates/{code}.html") else "404.html" - response = make_response(render_template( - template_name, code=code, message=message), code) + template_name = ( + f"{code}.html" if os.path.isfile(f"templates/{code}.html") else "404.html" + ) + response = make_response( + render_template(template_name, code=code, message=message), code + ) # Add message to response headers response.headers["X-Error-Message"] = message @@ -260,8 +264,7 @@ def parse_date(date_groups: list[str]) -> str | None: date_str = " ".join(date_groups).strip() # Remove ordinal suffixes - date_str = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', - date_str, flags=re.IGNORECASE) + date_str = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", date_str, flags=re.IGNORECASE) # Parse with dateutil, default day=1 if missing dt = parse(date_str, default=datetime.datetime(1900, 1, 1)) @@ -275,6 +278,7 @@ def parse_date(date_groups: list[str]) -> str | None: except (ValueError, TypeError): return None + def get_tools_data(): with open("data/tools.json", "r") as f: return json.load(f)