diff --git a/blueprints/api.py b/blueprints/api.py index ec41c3b..5d368b3 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -3,8 +3,7 @@ import os import datetime import requests from mail import sendEmail -from sol import create_transaction, get_solana_address -import json +from sol import create_transaction api_bp = Blueprint('api', __name__) @@ -48,7 +47,7 @@ def getGitCommit(): @api_bp.route("/") @api_bp.route("/help") -def api_help_get(): +def help_get(): return jsonify({ "message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au", "endpoints": { @@ -64,11 +63,11 @@ def api_help_get(): }) @api_bp.route("/version") -def api_version_get(): +def version_get(): return jsonify({"version": getGitCommit()}) @api_bp.route("/time") -def api_time_get(): +def time_get(): timezone_offset = datetime.timedelta(hours=ncConfig["time-zone"]) timezone = datetime.timezone(offset=timezone_offset) time = datetime.datetime.now(tz=timezone) @@ -80,11 +79,11 @@ def api_time_get(): }) @api_bp.route("/timezone") -def api_timezone_get(): +def timezone_get(): return jsonify({"timezone": ncConfig["time-zone"]}) @api_bp.route("/timezone", methods=["POST"]) -def api_timezone_post(): +def timezone_post(): # Refresh config global ncConfig conf = requests.get( @@ -101,17 +100,17 @@ def api_timezone_post(): return jsonify({"message": "Successfully pulled latest timezone", "timezone": ncConfig["time-zone"]}) @api_bp.route("/message") -def api_message_get(): +def message_get(): return jsonify({"message": ncConfig["message"]}) @api_bp.route("/ip") -def api_ip_get(): +def ip_get(): return jsonify({"ip": getClientIP(request)}) @api_bp.route("/email", methods=["POST"]) -def api_email_post(): +def email_post(): # Verify json if not request.is_json: return jsonify({ @@ -143,7 +142,7 @@ def api_email_post(): @api_bp.route("/project") -def api_project_get(): +def project_get(): try: git = requests.get( "https://git.woodburn.au/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1", diff --git a/server.py b/server.py index af97d1b..48c908f 100644 --- a/server.py +++ b/server.py @@ -18,13 +18,12 @@ import datetime import qrcode from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_H from ansi2html import Ansi2HTMLConverter -from functools import cache from PIL import Image from blueprints.now import now_bp from blueprints.blog import blog_bp from blueprints.wellknown import wk_bp from blueprints.api import api_bp, getGitCommit, getClientIP -from sol import create_transaction +from tools import isCurl, isCrawler, getAddress, getFilePath app = Flask(__name__) CORS(app) @@ -37,67 +36,49 @@ app.register_blueprint(api_bp, url_prefix='/api/v1') dotenv.load_dotenv() +# region Config/Constants + # Rate limiting for hosting enquiries -email_request_count = {} # Track requests by email -ip_request_count = {} # Track requests by IP +EMAIL_REQUEST_COUNT = {} # Track requests by email +IP_REQUEST_COUNT = {} # Track requests by IP EMAIL_RATE_LIMIT = 3 # Max 3 requests per email per hour IP_RATE_LIMIT = 5 # Max 5 requests per IP per hour RATE_LIMIT_WINDOW = 3600 # 1 hour in seconds -handshake_scripts = '' +HANDSHAKE_SCRIPTS = '' -restricted = ["ascii"] -redirects = { +RESTRICTED_ROUTES = ["ascii"] +REDIRECT_ROUTES = { "contact": "/#contact" } -downloads = { +DOWNLOAD_ROUTES = { "pgp": "data/nathanwoodburn.asc" } - -sites = [] +SITES = [] if os.path.isfile("data/sites.json"): with open("data/sites.json") as file: - sites = json.load(file) + SITES = json.load(file) # Remove any sites that are not enabled - sites = [ - site for site in sites if "enabled" not in site or site["enabled"] + SITES = [ + site for site in SITES if "enabled" not in site or site["enabled"] ] -projects = [] -projectsUpdated = 0 +PROJECTS = [] +PROJECTS_UPDATED = 0 - -ncReq = requests.get( +NC_CONFIG = requests.get( "https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json" -) -ncConfig = ncReq.json() - -if 'time-zone' not in ncConfig: - ncConfig['time-zone'] = 10 - -# region Helper Functions - - -@cache -def getAddress(coin: str) -> str: - address = "" - if os.path.isfile(".well-known/wallets/" + coin.upper()): - with open(".well-known/wallets/" + coin.upper()) as file: - address = file.read() - return address - - -def getFilePath(name, path): - for root, dirs, files in os.walk(path): - if name in files: - return os.path.join(root, name) +).json() +if 'time-zone' not in NC_CONFIG: + NC_CONFIG['time-zone'] = 10 # endregion - # region Assets routes + + @app.route("/assets/") def asset_get(path): if path.endswith(".json"): @@ -206,21 +187,17 @@ def links_get(): return render_template("link.html") -@app.route("/generator/") -def generator_get(): - return render_template(request.path.split("/")[-2] + ".html") - @app.route("/api/") -def api_old_get(function): +def api_legacy_get(function): return redirect(f"/api/v1/{function}", code=301) + @app.route("/actions.json") def sol_actions_get(): return jsonify( {"rules": [{"pathPattern": "/donate**", "apiPath": "/api/v1/donate**"}]} ) - # endregion # region Main routes @@ -228,9 +205,9 @@ def sol_actions_get(): @app.route("/") def index_get(): - global handshake_scripts - global projects - global projectsUpdated + global HANDSHAKE_SCRIPTS + global PROJECTS + global PROJECTS_UPDATED # Check if host if podcast.woodburn.au if "podcast.woodburn.au" in request.host: @@ -247,35 +224,27 @@ def index_get(): # Always load if load is in the query string if request.args.get("load"): loaded = False + if isCurl(request): + return jsonify( + { + "message": "Welcome to Nathan.Woodburn/! This is a personal website. For more information, visit https://nathan.woodburn.au", + "ip": getClientIP(request), + "dev": HANDSHAKE_SCRIPTS == "", + "version": getGitCommit() + } + ) - # Check if crawler - if request.headers and request.headers.get("User-Agent"): - # Check if curl - if "curl" in request.headers.get("User-Agent", "curl"): - return jsonify( - { - "message": "Welcome to Nathan.Woodburn/! This is a personal website. For more information, visit https://nathan.woodburn.au", - "ip": getClientIP(request), - "dev": handshake_scripts == "", - "version": getGitCommit() - } - ) - - if "Googlebot" not in request.headers.get( - "User-Agent", "" - ) and "Bingbot" not in request.headers.get("User-Agent", ""): - # Check if cookie is set - if not loaded: - # Set cookie - resp = make_response( - render_template("loading.html").replace( - "https://nathan.woodburn.au/loading", "https://nathan.woodburn.au/" - ), - 200, - {"Content-Type": "text/html"}, - ) - resp.set_cookie("loaded", "true", max_age=604800) - return resp + if not loaded and not isCrawler(request): + # Set cookie + resp = make_response( + render_template("loading.html").replace( + "https://nathan.woodburn.au/loading", "https://nathan.woodburn.au/" + ), + 200, + {"Content-Type": "text/html"}, + ) + resp.set_cookie("loaded", "true", max_age=604800) + return resp try: git = requests.get( @@ -300,14 +269,14 @@ def index_get(): print(f"Error getting git data: {e}") # Get only repo names for the newest updates - if projects == [] or projectsUpdated < (datetime.datetime.now() - datetime.timedelta( + if PROJECTS == [] or PROJECTS_UPDATED < (datetime.datetime.now() - datetime.timedelta( hours=2 )).timestamp(): projectsreq = requests.get( "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos" ) - projects = projectsreq.json() + PROJECTS = projectsreq.json() # Check for next page pageNum = 1 @@ -316,10 +285,10 @@ def index_get(): "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page=" + str(pageNum) ) - projects += projectsreq.json() + PROJECTS += projectsreq.json() pageNum += 1 - for project in projects: + for project in PROJECTS: if ( project["avatar_url"] == "https://git.woodburn.au/" or project["avatar_url"] == "" @@ -329,16 +298,16 @@ def index_get(): "_", " ").replace("-", " ") # Sort by last updated projectsList = sorted( - projects, key=lambda x: x["updated_at"], reverse=True) - projects = [] + PROJECTS, key=lambda x: x["updated_at"], reverse=True) + PROJECTS = [] projectNames = [] projectNum = 0 - while len(projects) < 3: + while len(PROJECTS) < 3: if projectsList[projectNum]["name"] not in projectNames: - projects.append(projectsList[projectNum]) + PROJECTS.append(projectsList[projectNum]) projectNames.append(projectsList[projectNum]["name"]) projectNum += 1 - projectsUpdated = datetime.datetime.now().timestamp() + PROJECTS_UPDATED = datetime.datetime.now().timestamp() custom = "" # Check for downtime @@ -363,10 +332,10 @@ def index_get(): or os.getenv("dev") == "true" or request.host == "test.nathan.woodburn.au" ): - handshake_scripts = "" + HANDSHAKE_SCRIPTS = "" # Get time - timezone_offset = datetime.timedelta(hours=ncConfig["time-zone"]) + timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"]) timezone = datetime.timezone(offset=timezone_offset) time = datetime.datetime.now(tz=timezone) @@ -389,7 +358,7 @@ def index_get(): setInterval(updateClock, 1000); } """ - time += f"startClock({ncConfig['time-zone']});" + time += f"startClock({NC_CONFIG['time-zone']});" time += "" HNSaddress = getAddress("HNS") @@ -400,7 +369,7 @@ def index_get(): resp = make_response( render_template( "index.html", - handshake_scripts=handshake_scripts, + handshake_scripts=HANDSHAKE_SCRIPTS, HNS=HNSaddress, SOL=SOLaddress, BTC=BTCaddress, @@ -408,10 +377,10 @@ def index_get(): repo=repo, repo_description=repo_description, custom=custom, - sites=sites, - projects=projects, + sites=SITES, + projects=PROJECTS, time=time, - message=ncConfig["message"], + message=NC_CONFIG["message"], ), 200, {"Content-Type": "text/html"}, @@ -425,7 +394,7 @@ def index_get(): @app.route("/donate") def donate_get(): - global handshake_scripts + global HANDSHAKE_SCRIPTS # If localhost, don't load handshake if ( request.host == "localhost:5000" @@ -433,7 +402,7 @@ def donate_get(): or os.getenv("dev") == "true" or request.host == "test.nathan.woodburn.au" ): - handshake_scripts = "" + HANDSHAKE_SCRIPTS = "" coinList = os.listdir(".well-known/wallets") coinList = [file for file in coinList if file[0] != "."] @@ -472,7 +441,7 @@ def donate_get(): ) return render_template( "donate.html", - handshake_scripts=handshake_scripts, + handshake_scripts=HANDSHAKE_SCRIPTS, coins=coins, default_coins=default_coins, crypto=instructions, @@ -542,7 +511,7 @@ def donate_get(): return render_template( "donate.html", - handshake_scripts=handshake_scripts, + handshake_scripts=HANDSHAKE_SCRIPTS, crypto=cryptoHTML, coins=coins, default_coins=default_coins, @@ -578,7 +547,7 @@ def qrcode_get(data): qr.make() qr_image: Image.Image = qr.make_image( - fill_color="black", back_color="white").convert('RGB') # type: ignore + fill_color="black", back_color="white").convert('RGB') # type: ignore # Add logo logo = Image.open("templates/assets/img/favicon/logo.png") @@ -610,9 +579,10 @@ def supersecretpath_get(): @app.route("/download/") def download_get(path): + if path not in DOWNLOAD_ROUTES: + return render_template("404.html"), 404 # Check if file exists - if path in downloads: - path = downloads[path] + path = DOWNLOAD_ROUTES[path] if os.path.isfile(path): return send_file(path) return render_template("404.html"), 404 @@ -620,8 +590,8 @@ def download_get(path): @app.route("/hosting/send-enquiry", methods=["POST"]) def hosting_post(): - global email_request_count - global ip_request_count + global EMAIL_REQUEST_COUNT + global IP_REQUEST_COUNT if not request.json: return jsonify({"status": "error", "message": "No JSON data provided"}), 400 @@ -641,39 +611,39 @@ def hosting_post(): current_time = datetime.datetime.now().timestamp() # Check email rate limit - if email in email_request_count: - if (current_time - email_request_count[email]["last_reset"]) > RATE_LIMIT_WINDOW: + if email in EMAIL_REQUEST_COUNT: + if (current_time - EMAIL_REQUEST_COUNT[email]["last_reset"]) > RATE_LIMIT_WINDOW: # Reset counter if the time window has passed - email_request_count[email] = { + EMAIL_REQUEST_COUNT[email] = { "count": 1, "last_reset": current_time} else: # Increment counter - email_request_count[email]["count"] += 1 - if email_request_count[email]["count"] > EMAIL_RATE_LIMIT: + EMAIL_REQUEST_COUNT[email]["count"] += 1 + if EMAIL_REQUEST_COUNT[email]["count"] > EMAIL_RATE_LIMIT: return jsonify({ "status": "error", "message": "Rate limit exceeded. Please try again later." }), 429 else: # First request for this email - email_request_count[email] = {"count": 1, "last_reset": current_time} + EMAIL_REQUEST_COUNT[email] = {"count": 1, "last_reset": current_time} # Check IP rate limit - if ip in ip_request_count: - if (current_time - ip_request_count[ip]["last_reset"]) > RATE_LIMIT_WINDOW: + if ip in IP_REQUEST_COUNT: + if (current_time - IP_REQUEST_COUNT[ip]["last_reset"]) > RATE_LIMIT_WINDOW: # Reset counter if the time window has passed - ip_request_count[ip] = {"count": 1, "last_reset": current_time} + IP_REQUEST_COUNT[ip] = {"count": 1, "last_reset": current_time} else: # Increment counter - ip_request_count[ip]["count"] += 1 - if ip_request_count[ip]["count"] > IP_RATE_LIMIT: + IP_REQUEST_COUNT[ip]["count"] += 1 + if IP_REQUEST_COUNT[ip]["count"] > IP_RATE_LIMIT: return jsonify({ "status": "error", "message": "Rate limit exceeded. Please try again later." }), 429 else: # First request for this IP - ip_request_count[ip] = {"count": 1, "last_reset": current_time} + IP_REQUEST_COUNT[ip] = {"count": 1, "last_reset": current_time} cpus = request.json["cpus"] memory = request.json["memory"] @@ -833,7 +803,7 @@ def podcast_podsync_get(): @app.route("/") def catch_all_get(path: str): - global handshake_scripts + global HANDSHAKE_SCRIPTS # If localhost, don't load handshake if ( request.host == "localhost:5000" @@ -841,27 +811,27 @@ def catch_all_get(path: str): or os.getenv("dev") == "true" or request.host == "test.nathan.woodburn.au" ): - handshake_scripts = "" + HANDSHAKE_SCRIPTS = "" - if path.lower().replace(".html", "") in restricted: + if path.lower().replace(".html", "") in RESTRICTED_ROUTES: return render_template("404.html"), 404 - if path in redirects: - return redirect(redirects[path], code=302) + if path in REDIRECT_ROUTES: + return redirect(REDIRECT_ROUTES[path], code=302) # If file exists, load it if os.path.isfile("templates/" + path): - return render_template(path, handshake_scripts=handshake_scripts, sites=sites) + return render_template(path, handshake_scripts=HANDSHAKE_SCRIPTS, sites=SITES) # Try with .html if os.path.isfile("templates/" + path + ".html"): return render_template( - path + ".html", handshake_scripts=handshake_scripts, sites=sites + path + ".html", handshake_scripts=HANDSHAKE_SCRIPTS, sites=SITES ) if os.path.isfile("templates/" + path.strip("/") + ".html"): return render_template( - path.strip("/") + ".html", handshake_scripts=handshake_scripts, sites=sites + path.strip("/") + ".html", handshake_scripts=HANDSHAKE_SCRIPTS, sites=SITES ) # Try to find a file matching @@ -871,16 +841,14 @@ def catch_all_get(path: str): if filename: return send_file(filename) - if request.headers: - # Check if curl - if "curl" in request.headers.get("User-Agent", "curl"): - return jsonify( - { - "status": 404, - "message": "Page not found", - "ip": getClientIP(request), - } - ), 404 + if isCurl(request): + return jsonify( + { + "status": 404, + "message": "Page not found", + "ip": getClientIP(request), + } + ), 404 return render_template("404.html"), 404 # 404 catch all @@ -888,16 +856,14 @@ def catch_all_get(path: str): @app.errorhandler(404) def not_found(e): - if request.headers: - # Check if curl - if "curl" in request.headers.get("User-Agent", "curl"): - return jsonify( - { - "status": 404, - "message": "Page not found", - "ip": getClientIP(request), - } - ), 404 + if isCurl(request): + return jsonify( + { + "status": 404, + "message": "Page not found", + "ip": getClientIP(request), + } + ), 404 return render_template("404.html"), 404 diff --git a/tools.py b/tools.py new file mode 100644 index 0000000..dc6fd17 --- /dev/null +++ b/tools.py @@ -0,0 +1,51 @@ +from flask import Request +import os +from functools import cache + +def isCurl(request: Request) -> bool: + """ + Check if the request is from curl + + Args: + request (Request): The Flask request object + Returns: + bool: True if the request is from curl, False otherwise + + """ + if request.headers and request.headers.get("User-Agent"): + # Check if curl + if "curl" in request.headers.get("User-Agent", "curl"): + return True + return False + +def isCrawler(request: Request) -> bool: + """ + Check if the request is from a web crawler (e.g., Googlebot, Bingbot) + Args: + request (Request): The Flask request object + Returns: + bool: True if the request is from a web crawler, False otherwise + """ + + if request.headers and request.headers.get("User-Agent"): + # Check if Googlebot or Bingbot + if "Googlebot" in request.headers.get( + "User-Agent", "" + ) or "Bingbot" in request.headers.get("User-Agent", ""): + return True + return False + + +@cache +def getAddress(coin: str) -> str: + address = "" + if os.path.isfile(".well-known/wallets/" + coin.upper()): + with open(".well-known/wallets/" + coin.upper()) as file: + address = file.read() + return address + + +def getFilePath(name, path): + for root, dirs, files in os.walk(path): + if name in files: + return os.path.join(root, name) \ No newline at end of file