import json
from flask import (
    Flask,
    make_response,
    redirect,
    request,
    jsonify,
    render_template,
    send_from_directory,
    send_file,
)
from flask_cors import CORS
import os
import dotenv
import requests
import datetime
import qrcode
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_H
from ansi2html import Ansi2HTMLConverter
from PIL import Image

# Import blueprints
from blueprints import now, blog, wellknown, api, podcast, acme, spotify
from tools import (
    isCLI,
    isCrawler,
    getAddress,
    getFilePath,
    error_response,
    getClientIP,
    json_response,
    getHandshakeScript,
    get_tools_data,
)
from curl import curl_response
from cache_helper import (
    get_nc_config,
    get_git_latest_activity,
    get_projects,
    get_uptime_status,
    get_wallet_tokens,
    get_coin_names,
    get_wallet_domains,
)

app = Flask(__name__)
CORS(app)

# Register blueprints
for module in [now, blog, wellknown, api, podcast, acme, spotify]:
    app.register_blueprint(module.app)

dotenv.load_dotenv()

# region Config/Constants

# Rate limiting for hosting enquiries
EMAIL_REQUEST_COUNT = {}  # Track requests by email
IP_REQUEST_COUNT = {}  # Track requests by IP
EMAIL_RATE_LIMIT = 3  # Max 3 requests per email per hour
IP_RATE_LIMIT = 5  # Max 5 requests per IP per hour
RATE_LIMIT_WINDOW = 3600  # 1 hour in seconds

RESTRICTED_ROUTES = ["ascii"]
REDIRECT_ROUTES = {
    "contact": "/#contact",
    "old": "/now/old",
    "/meet": "https://cloud.woodburn.au/apps/calendar/appointment/PamrmmspWJZr",
    "/meeting": "https://cloud.woodburn.au/apps/calendar/appointment/PamrmmspWJZr",
    "/appointment": "https://cloud.woodburn.au/apps/calendar/appointment/PamrmmspWJZr",
}
DOWNLOAD_ROUTES = {"pgp": "data/nathanwoodburn.asc"}

SITES = []
if os.path.isfile("data/sites.json"):
    with open("data/sites.json") as file:
        SITES = json.load(file)
    # Remove any sites that are not enabled (a missing "enabled" key counts
    # as enabled)
    SITES = [site for site in SITES if site.get("enabled", True)]

# endregion


# region Assets routes
@app.route("/assets/<path:path>")
def asset(path):
    """Serve a file from ``templates/assets`` with a few fallbacks.

    Lookup order: ``.json`` files (sent as ``application/json``), an exact
    match, the HNS image prefix aliases, and finally the bare image filename
    inside ``img/`` and ``img/favicon/``. Anything else gets the shared error
    response.

    The route declares ``<path:path>`` because the view requires a ``path``
    argument; without the URL variable Flask cannot supply it.
    """
    assets_dir = "templates/assets"

    if path.endswith(".json"):
        return send_from_directory(assets_dir, path, mimetype="application/json")

    if os.path.isfile(f"{assets_dir}/{path}"):
        return send_from_directory(assets_dir, path)

    # Custom matching for images. Order matters: the more specific prefixes
    # are tried before the generic "img/hns" one.
    pathMap = {
        "img/hns/w": "img/external/HNS/white",
        "img/hns/b": "img/external/HNS/black",
        "img/hns": "img/external/HNS/black",
    }
    for prefix, replacement in pathMap.items():
        if not path.startswith(prefix):
            continue
        # Rewrite only the leading prefix; str.replace would also rewrite
        # any later occurrence of the same text inside the path.
        tmpPath = replacement + path[len(prefix):]
        if os.path.isfile(f"{assets_dir}/{tmpPath}"):
            return send_from_directory(assets_dir, tmpPath)

    # Try looking in one of the image directories by bare filename
    filename: str = path.split("/")[-1]
    if filename.endswith((".png", ".jpg", ".jpeg", ".svg")):
        for directory in (f"{assets_dir}/img", f"{assets_dir}/img/favicon"):
            if os.path.isfile(f"{directory}/{filename}"):
                return send_from_directory(directory, filename)

    return error_response(request)


@app.route("/fonts/<path:path>")
def fonts(path):
    """Serve a font from ``templates/assets/fonts`` or the error response."""
    if os.path.isfile("templates/assets/fonts/" + path):
        return send_from_directory("templates/assets/fonts", path)
    return error_response(request)


@app.route("/sitemap")
@app.route("/sitemap.xml")
def sitemap():
    """Return ``templates/sitemap.xml`` with every ``.html`` removed."""
    if not os.path.isfile("templates/sitemap.xml"):
        return error_response(request)
    with open("templates/sitemap.xml") as file:
        content = file.read()
    # Remove all .html from sitemap
    content = content.replace(".html", "")
    return make_response(content, 200, {"Content-Type": "application/xml"})


@app.route("/favicon.<ext>")
def favicon(ext):
    """Serve ``favicon.<ext>`` for the png, svg and ico extensions only."""
    if ext not in ("png", "svg", "ico"):
        return error_response(request)
    return send_from_directory("templates/assets/img/favicon", f"favicon.{ext}")


@app.route("/<name>.js")
def javascript(name):
    """Serve a top-level ``*.js`` request from ``templates/assets/js``."""
    # The file name is taken from the last segment of the request path.
    filename = request.path.split("/")[-1]
    # Check if file in js directory
    if not os.path.isfile("templates/assets/js/" + filename):
        return error_response(request)
    return send_from_directory("templates/assets/js", filename)
@app.route("/download/<path:path>")
def download(path):
    """Send the file registered for ``path`` in ``DOWNLOAD_ROUTES``.

    The route declares ``<path:path>`` because the view requires a ``path``
    argument.
    """
    if path not in DOWNLOAD_ROUTES:
        return error_response(request, message="Invalid download")
    # Check if file exists
    target = DOWNLOAD_ROUTES[path]
    if os.path.isfile(target):
        return send_file(target)
    return error_response(request, message="File not found")


# endregion


# region PWA routes
@app.route("/manifest.json")
def manifest():
    """Return ``pwa/manifest.json`` with start_url/scope set for this host."""
    host = request.host

    # Read as json
    with open("pwa/manifest.json") as file:
        data = json.load(file)

    url = f"https://{host}"
    # Local development hosts are served over plain HTTP
    if host in ("localhost:5000", "127.0.0.1:5000"):
        url = "http://127.0.0.1:5000"

    data["start_url"] = url
    data["scope"] = url
    return jsonify(data)


@app.route("/sw.js")
def serviceWorker():
    """Serve the service worker script from the ``pwa`` directory."""
    return send_from_directory("pwa", "sw.js")


# endregion


# region Misc routes
@app.route("/links")
def links():
    """Render the links page."""
    return render_template("link.html")


@app.route("/actions.json")
def sol_actions():
    """Return the actions rule set mapping ``/donate**`` to the v1 API."""
    return jsonify(
        {"rules": [{"pathPattern": "/donate**", "apiPath": "/api/v1/donate**"}]}
    )


@app.route("/api/<path:function>")
def api_legacy(function):
    """Redirect ``/api/<function>`` to ``/api/v1/<function>`` when it exists.

    The redirect is permanent (301) and only issued when a registered URL
    rule matches the versioned path exactly; otherwise a 404 error response
    is returned.
    """
    target = f"/api/v1/{function}"
    # Check if the redirect route exists
    if any(rule.rule == target for rule in app.url_map.iter_rules()):
        return redirect(target, code=301)
    return error_response(request, message="404 Not Found", code=404)


# endregion


# region Main routes
@app.route("/")
def index():
    """Render the home page, or the loading page for first-time visitors.

    CLI clients get the curl response. A visitor counts as "loaded" when the
    referrer contains nathan.woodburn.au or the ``loaded`` cookie is set; the
    ``load`` query argument forces the loading page again. Crawlers always
    skip the loading page.
    """
    # Check if host is podcast.woodburn.au
    if "podcast.woodburn.au" in request.host:
        return render_template("podcast.html")

    loaded = False
    # Check if referrer includes nathan.woodburn.au
    if request.referrer and "nathan.woodburn.au" in request.referrer:
        loaded = True
    if request.cookies.get("loaded"):
        loaded = True
    # Always load if load is in the query string
    if request.args.get("load"):
        loaded = False

    if isCLI(request):
        return curl_response(request)

    if not loaded and not isCrawler(request):
        # Set cookie
        loading_page = render_template("loading.html").replace(
            "https://nathan.woodburn.au/loading", "https://nathan.woodburn.au/"
        )
        resp = make_response(loading_page, 200, {"Content-Type": "text/html"})
        resp.set_cookie("loaded", "true", max_age=604800)
        return resp

    # Use cached git data
    git = get_git_latest_activity()
    repo_name = git["repo"]["name"].lower()
    repo_description = git["repo"]["description"]

    # Use cached projects data
    projects = get_projects(limit=3)

    # Use cached uptime status
    uptime = get_uptime_status()
    # NOTE(review): both branches append an empty string in the visible
    # source, so `custom` is always ""; confirm whether markup was lost here.
    custom = ""
    if uptime:
        custom += ""
    else:
        custom += ""

    # Special names
    if repo_name == "nathanwoodburn.github.io":
        repo_name = "Nathan.Woodburn/"

    # NOTE(review): `html_url` is read but not used in the visible source, and
    # `repo` is wrapped in empty strings; presumably link markup belongs here.
    html_url = git["repo"]["html_url"]
    repo = "" + repo_name + ""

    # Get time using cached config
    nc_config = get_nc_config()
    timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
    timezone = datetime.timezone(offset=timezone_offset)
    time_display = datetime.datetime.now(tz=timezone).strftime("%B %d")
    # NOTE(review): the literal appended here is an unterminated
    # triple-quoted string in the visible source (`""" "`); it is kept as a
    # single space so the module parses. Restore the intended content.
    time_display += " "

    HNSaddress = getAddress("HNS")
    SOLaddress = getAddress("SOL")
    BTCaddress = getAddress("BTC")
    ETHaddress = getAddress("ETH")

    # Set cookie
    resp = make_response(
        render_template(
            "index.html",
            handshake_scripts=getHandshakeScript(request.host),
            HNS=HNSaddress,
            SOL=SOLaddress,
            BTC=BTCaddress,
            ETH=ETHaddress,
            repo=repo,
            repo_description=repo_description,
            custom=custom,
            sites=SITES,
            projects=projects,
            time=time_display,
            message=nc_config.get("message", ""),
        ),
        200,
        {"Content-Type": "text/html"},
    )
    resp.set_cookie("loaded", "true", max_age=604800)
    return resp


# region Donate
def _is_single_path_component(name):
    """Return True when ``name`` cannot escape the directory it is joined to."""
    return name not in ("", ".", "..") and "/" not in name and "\\" not in name


@app.route("/donate")
def donate():
    """Render the donation page for the coin/token chosen in the query string.

    Query arguments: ``c`` selects the coin (wallet file name, compared in
    upper case) and ``t`` optionally selects a token on that chain. Without
    ``c`` the page only shows the selection instructions.

    NOTE(review): several string literals in this view look truncated in the
    visible source (empty f-strings, values computed but never interpolated,
    raw line breaks inside literals). They are reproduced as they appear, with
    the line breaks written as ``\\n``; confirm against the intended markup.
    """
    if isCLI(request):
        return curl_response(request)

    coinList = sorted(
        file
        for file in os.listdir(".well-known/wallets")
        if not file.startswith(".")
    )

    tokenList = get_wallet_tokens()
    coinNames = get_coin_names()

    coins = ""
    default_coins = ["btc", "eth", "hns", "sol", "xrp", "ada", "dot"]

    for file in coinList:
        coin_name = coinNames.get(file, file)
        # NOTE(review): display_style is computed but not used in the visible
        # source.
        display_style = "" if file.lower() in default_coins else "display:none;"
        coins += f"{coin_name}"

    for token in tokenList:
        # NOTE(review): chain_display/symbol_display are computed but the
        # appended literal is empty in the visible source.
        chain_display = f" on {token['chain']}" if token["chain"] != "null" else ""
        symbol_display = (
            f" ({token['symbol']}{chain_display})"
            if token["symbol"] != token["name"]
            else chain_display
        )
        coins += ""

    crypto = request.args.get("c")
    if not crypto:
        instructions = (
            "\nDonate with cryptocurrency:\nSelect a coin from the dropdown above."
        )
        return render_template(
            "donate.html",
            handshake_scripts=getHandshakeScript(request.host),
            coins=coins,
            default_coins=default_coins,
            crypto=instructions,
        )
    crypto = crypto.upper()

    token = request.args.get("t")
    if token:
        token = token.upper()
        for t in tokenList:
            if t["symbol"].upper() == token and t["chain"].upper() == crypto:
                token = t
                break
        if not isinstance(token, dict):
            token = {"name": "Unknown token", "symbol": token, "chain": crypto}

    address = ""
    cryptoHTML = ""

    # `crypto` comes straight from the query string and is interpolated into
    # file paths below; refuse anything that is not a single path component
    # so it cannot point outside .well-known/wallets.
    wallet_name_ok = _is_single_path_component(crypto)

    proof = ""
    if wallet_name_ok and os.path.isfile(f".well-known/wallets/.{crypto}.proof"):
        proof = "Proof of ownership"

    if wallet_name_ok and os.path.isfile(f".well-known/wallets/{crypto}"):
        with open(f".well-known/wallets/{crypto}") as file:
            address = file.read()
        coin_display = coinNames.get(crypto, crypto)
        if not token:
            cryptoHTML += f"\nDonate with {coin_display}:"
        else:
            token_symbol = (
                f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
            )
            cryptoHTML += (
                f"\nDonate with {token['name']}{token_symbol} on {crypto}:"
            )
        cryptoHTML += f"\n{address}"

        if proof:
            cryptoHTML += proof
    elif token:
        if "address" in token:
            address = token["address"]
            token_symbol = (
                f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
            )
            chain_display = f" on {crypto}" if crypto != "NULL" else ""
            cryptoHTML += (
                f"\nDonate with {token['name']}{token_symbol}{chain_display}:"
            )
            cryptoHTML += f"\n{address}"
            if proof:
                cryptoHTML += proof
        else:
            cryptoHTML += f"\nInvalid offchain token: {token['symbol']}\n"
    else:
        cryptoHTML += f"\nInvalid chain: {crypto}\n"

    domains = get_wallet_domains()
    if crypto in domains:
        domain = domains[crypto]
        cryptoHTML += "\nOr send to this domain on compatible wallets:\n"
        cryptoHTML += f"{domain}"

    if address:
        cryptoHTML += "\nQR Code"

    # NOTE(review): copyScript is an empty literal in the visible source.
    copyScript = ""
    cryptoHTML += copyScript

    return render_template(
        "donate.html",
        handshake_scripts=getHandshakeScript(request.host),
        crypto=cryptoHTML,
        coins=coins,
        default_coins=default_coins,
    )


@app.route("/address/<path:address>")
def qraddress(address):
    """Return a PNG QR code encoding ``address``.

    The image is rendered into an in-memory buffer rather than a fixed file
    in /tmp, so concurrent requests cannot overwrite each other's image.
    """
    from io import BytesIO

    qr = qrcode.QRCode(
        version=1,
        error_correction=ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(address)
    qr.make(fit=True)
    qr_image = qr.make_image(fill_color="#110033", back_color="white")

    buffer = BytesIO()
    qr_image.save(buffer)  # type: ignore
    buffer.seek(0)

    # Return the QR code image as a response
    return send_file(buffer, mimetype="image/png")


@app.route("/qrcode/<path:data>")
@app.route("/qr/<path:data>")
def qrcodee(data):
    """Return a PNG QR code for ``data`` with the site logo pasted on top.

    High error correction is used and the logo is scaled to one third of the
    QR width and centred. The image is returned from an in-memory buffer
    rather than a shared file in /tmp.
    """
    from io import BytesIO

    qr = qrcode.QRCode(error_correction=ERROR_CORRECT_H, box_size=10, border=2)
    qr.add_data(data)
    qr.make()

    qr_image: Image.Image = qr.make_image(
        fill_color="black", back_color="white"
    ).convert("RGB")  # type: ignore

    # Add logo; the context manager closes the source file once the resized
    # copy has been produced.
    with Image.open("templates/assets/img/favicon/logo.png") as source_logo:
        basewidth = qr_image.size[0] // 3
        wpercent = basewidth / float(source_logo.size[0])
        hsize = int(float(source_logo.size[1]) * float(wpercent))
        logo = source_logo.resize((basewidth, hsize), Image.Resampling.LANCZOS)

    pos = (
        (qr_image.size[0] - logo.size[0]) // 2,
        (qr_image.size[1] - logo.size[1]) // 2,
    )
    qr_image.paste(logo, pos, mask=logo)

    buffer = BytesIO()
    qr_image.save(buffer, format="PNG")
    buffer.seek(0)
    return send_file(buffer, mimetype="image/png")


# endregion


@app.route("/supersecretpath")
def supersecretpath():
    """Render ``data/ascii.txt`` (ANSI art) as HTML; empty when it is missing."""
    ascii_art = ""
    if os.path.isfile("data/ascii.txt"):
        with open("data/ascii.txt") as file:
            ascii_art = file.read()

    converter = Ansi2HTMLConverter()
    ascii_art_html = converter.convert(ascii_art)
    return render_template("ascii.html", ascii_art=ascii_art_html)


def _rate_limited(counter, key, limit, now):
    """Record one request for ``key`` and report whether it exceeds ``limit``.

    ``counter`` maps a key to ``{"count", "last_reset"}``. The entry is
    (re)started when the key is new or its window of ``RATE_LIMIT_WINDOW``
    seconds has passed; otherwise the count is incremented.
    """
    entry = counter.get(key)
    if entry is None or (now - entry["last_reset"]) > RATE_LIMIT_WINDOW:
        # First request for this key, or the time window has passed
        counter[key] = {"count": 1, "last_reset": now}
        return False
    entry["count"] += 1
    return entry["count"] > limit


@app.route("/hosting/send-enquiry", methods=["POST"])
def hosting_post():
    """Validate a hosting enquiry and forward it to the Discord webhook.

    Expects a JSON object with the keys email, cpus, memory, disk, backups
    and message. Requests are rate limited per email and per client IP
    before the field values are validated.

    Responses: 415 when no JSON object is supplied, 400 for missing or
    invalid fields, 429 when a rate limit is exceeded, 500 when the webhook
    is not configured or the delivery fails, 200 on success.
    """
    if not request.is_json or not request.json:
        return json_response(request, "No JSON data provided", 415)

    payload = request.json
    # The key lookups below need a JSON object, not a list or a string
    if not isinstance(payload, dict):
        return json_response(request, "No JSON data provided", 415)

    # Keys
    # email, cpus, memory, disk, backups, message
    required_keys = ["email", "cpus", "memory", "disk", "backups", "message"]
    for key in required_keys:
        if key not in payload:
            return json_response(request, f"Missing key: {key}", 400)

    # Coerce up front: the email is used as a dict key for rate limiting and
    # an unhashable JSON value (list/object) would otherwise raise.
    email = str(payload["email"])
    ip = getClientIP(request)
    print(f"Hosting enquiry from {email} ({ip})")

    # Check rate limits (email first; the IP counter is untouched when the
    # email limit already rejects the request)
    current_time = datetime.datetime.now().timestamp()
    if _rate_limited(EMAIL_REQUEST_COUNT, email, EMAIL_RATE_LIMIT, current_time):
        return json_response(
            request, "Rate limit exceeded. Please try again later.", 429
        )
    if _rate_limited(IP_REQUEST_COUNT, ip, IP_RATE_LIMIT, current_time):
        return json_response(
            request, "Rate limit exceeded. Please try again later.", 429
        )

    # Try to convert to correct types
    try:
        cpus = int(payload["cpus"])
        memory = float(payload["memory"])
        disk = int(payload["disk"])
        backups = payload["backups"] in [True, "true", "True", 1, "1", "yes", "Yes"]
        message = str(payload["message"])
    except (TypeError, ValueError, OverflowError):
        # TypeError covers null/list/object values; OverflowError covers
        # infinite numbers passed to int().
        return json_response(request, "Invalid data types", 400)

    # Basic validation. The chained comparisons also reject NaN, which
    # would slip through separate "<" / ">" tests.
    if not (1 <= cpus <= 64):
        return json_response(request, "Invalid CPUs", 400)
    if not (0.5 <= memory <= 512):
        return json_response(request, "Invalid memory", 400)
    if not (10 <= disk <= 500):
        return json_response(request, "Invalid disk", 400)
    if len(message) > 1000:
        return json_response(request, "Invalid message", 400)
    if len(email) > 100 or "@" not in email:
        return json_response(request, "Invalid email", 400)

    # Send to Discord webhook
    webhook_url = os.getenv("HOSTING_WEBHOOK")
    if not webhook_url:
        return json_response(request, "Hosting webhook not set", 500)

    data = {
        "content": "",
        "embeds": [
            {
                "title": "Hosting Enquiry",
                "description": f"Email: {email}\nCPUs: {cpus}\nMemory: {memory}GB\nDisk: {disk}GB\nBackups: {backups}\nMessage: {message}",
                "color": 16711680,  # Red color
            }
        ],
    }
    headers = {
        "Content-Type": "application/json",
    }
    try:
        # A timeout keeps a stalled webhook from blocking the worker
        response = requests.post(webhook_url, json=data, headers=headers, timeout=10)
    except requests.RequestException:
        return json_response(request, "Failed to send enquiry", 500)
    if response.status_code not in (200, 204):
        return json_response(request, "Failed to send enquiry", 500)
    return json_response(request, "Enquiry sent", 200)


@app.route("/resume")
def resume():
    """Render the resume page, passing through the ``support`` query arg."""
    # Check if arg for support is passed
    support = request.args.get("support")
    return render_template("resume.html", support=support)


@app.route("/resume.pdf")
def resume_pdf():
    """Send the resume PDF; the support variant when ``support`` is given."""
    # Check if arg for support is passed
    if request.args.get("support"):
        return send_file("data/resume_support.pdf")
    return send_file("data/resume.pdf")


@app.route("/tools")
def tools():
    """Render the tools page, or the curl response for CLI clients."""
    if isCLI(request):
        return curl_response(request)
    return render_template("tools.html", tools=get_tools_data())


# endregion


# region Error Catching
# Catch all for GET requests
@app.route("/<path:path>")
def catch_all(path: str):
    """Resolve any otherwise unmatched path.

    Order of checks: restricted routes (403), CLI clients (curl response),
    configured redirects (302), a template with the exact name, the name
    plus ``.html``, the slash-stripped name plus ``.html``, and finally a
    file lookup in ``templates`` for single-segment paths. Anything else
    gets the shared error response.

    The rule is ``/<path:path>`` because the view requires ``path`` and a
    bare ``/`` rule would collide with the index route.
    """
    if path.lower().replace(".html", "") in RESTRICTED_ROUTES:
        return error_response(request, message="Restricted route", code=403)

    # If curl request, return curl response
    if isCLI(request):
        return curl_response(request)

    # The matched path has no leading slash, while some REDIRECT_ROUTES keys
    # are written with one ("/meet"); try both spellings.
    for candidate in (path, "/" + path):
        if candidate in REDIRECT_ROUTES:
            return redirect(REDIRECT_ROUTES[candidate], code=302)

    # If file exists, load it; otherwise try with .html, then with
    # surrounding slashes stripped.
    for template_name in (path, path + ".html", path.strip("/") + ".html"):
        if os.path.isfile("templates/" + template_name):
            return render_template(
                template_name,
                handshake_scripts=getHandshakeScript(request.host),
                sites=SITES,
            )

    # Try to find a file matching
    if path.count("/") < 1:
        filename = getFilePath(path, "templates")
        if filename:
            return send_file(filename)

    return error_response(request)


@app.errorhandler(404)
def not_found(e):
    """Return the shared error response for 404 errors."""
    return error_response(request)


# endregion

if __name__ == "__main__":
    app.run(debug=True, port=5000, host="127.0.0.1")