feat: Add some better checks
All checks were successful
Build Docker / BuildImage (push) Successful in 50s

This commit is contained in:
2025-08-25 17:23:24 +10:00
parent 33fd8136a7
commit 08f80ddb5c
4 changed files with 98 additions and 64 deletions

137
server.py
View File

@@ -16,6 +16,7 @@ import requests
from cloudflare import Cloudflare
import datetime
import qrcode
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_H
import re
import binascii
import base64
@@ -51,7 +52,7 @@ handshake_scripts = '<script src="https://nathan.woodburn/handshake.js" domain="
restricted = ["ascii"]
redirects = {
"contact":"/#contact"
"contact": "/#contact"
}
downloads = {
"pgp": "data/nathanwoodburn.asc"
@@ -134,6 +135,7 @@ def send_report(path):
return render_template("404.html"), 404
def getClientIP(request):
x_forwarded_for = request.headers.get("X-Forwarded-For")
if x_forwarded_for:
@@ -142,6 +144,7 @@ def getClientIP(request):
ip = request.remote_addr
return ip
def getVersion():
# if .git exists, get the latest commit hash
if os.path.isdir(".git"):
@@ -198,6 +201,7 @@ def faviconPNG():
def faviconSVG():
return send_from_directory("templates/assets/img/favicon", "favicon.svg")
@app.route("/favicon.ico")
def faviconICO():
return send_from_directory("templates/assets/img/favicon", "favicon.ico")
@@ -279,11 +283,12 @@ def manifest():
url = f"https://{host}"
if host == "localhost:5000" or host == "127.0.0.1:5000":
url = "http://127.0.0.1:5000"
manifest["start_url"] = url
manifest["scope"] = url
manifest["scope"] = url
return jsonify(manifest)
@app.route("/sw.js")
def pw():
return send_from_directory("pwa", "sw.js")
@@ -326,7 +331,6 @@ def donateAPI():
"X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp"
}
response = make_response(jsonify(data), 200, headers)
if request.method == "OPTIONS":
response.headers["Access-Control-Allow-Origin"] = "*"
@@ -370,17 +374,18 @@ def donateAmountPost(amount):
amount = float(amount)
except:
return jsonify({"message": "Error: Invalid amount"}), 400, headers
if amount < 0.0001:
return jsonify({"message": "Error: Amount too small"}), 400, headers
# Create transaction
sender = Pubkey.from_string(sender)
receiver = Pubkey.from_string("AJsPEEe6S7XSiVcdZKbeV8GRp1QuhFUsG8mLrqL4XgiU")
receiver = Pubkey.from_string(
"AJsPEEe6S7XSiVcdZKbeV8GRp1QuhFUsG8mLrqL4XgiU")
transfer_ix = transfer(
TransferParams(
from_pubkey=sender, to_pubkey=receiver, lamports=int(amount * 1000000000)
from_pubkey=sender, to_pubkey=receiver, lamports=int(
amount * 1000000000)
)
)
solana_client = Client("https://api.mainnet-beta.solana.com")
@@ -403,11 +408,12 @@ def donateAmountPost(amount):
# endregion
#region Other API routes
# region Other API routes
@app.route("/api/version")
def version():
return jsonify({"version": getVersion()})
@app.route("/api/help")
def help():
return jsonify({
@@ -424,8 +430,6 @@ def help():
})
@app.route("/api/time")
def time():
timezone_offset = datetime.timedelta(hours=ncConfig["time-zone"])
@@ -436,18 +440,20 @@ def time():
"timestamp": time.timestamp(),
"timezone": ncConfig["time-zone"],
"timeISO": time.isoformat()
})
})
@app.route("/api/timezone")
def timezone():
return jsonify({"timezone": ncConfig["time-zone"]})
@app.route("/api/timezone", methods=["POST"])
def timezonePost():
# Refresh config
global ncConfig
conf = requests.get("https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json")
conf = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json")
if conf.status_code != 200:
return jsonify({"message": "Error: Could not get timezone"})
if not conf.json():
@@ -455,14 +461,16 @@ def timezonePost():
conf = conf.json()
if "time-zone" not in conf:
return jsonify({"message": "Error: Could not get timezone"})
ncConfig = conf
return jsonify({"message": "Successfully pulled latest timezone", "timezone": ncConfig["time-zone"]})
@app.route("/api/message")
def nc():
return jsonify({"message": ncConfig["message"]})
@app.route("/api/ip")
def ip():
return jsonify({"ip": getClientIP(request)})
@@ -479,6 +487,12 @@ def email():
# Check if api key sent
data = request.json
if not data:
return jsonify({
"status": 400,
"error": "Bad request JSON Data missing"
})
if "key" not in data:
return jsonify({
"status": 401,
@@ -493,6 +507,7 @@ def email():
return sendEmail(data)
@app.route("/api/v1/project")
def getCurrentProject():
try:
@@ -516,17 +531,15 @@ def getCurrentProject():
}
}
print("Error getting git data")
return jsonify({
"repo_name": repo_name,
"repo_description": repo_description,
"git": git,
})
#endregion
# endregion
# endregion
@@ -548,16 +561,15 @@ def index():
loaded = True
if request.cookies.get("loaded"):
loaded = True
# Always load if load is in the query string
if request.args.get("load"):
loaded = False
# Check if crawler
if request.headers and request.headers.get("User-Agent"):
# Check if curl
if "curl" in request.headers.get("User-Agent"):
if "curl" in request.headers.get("User-Agent", "curl"):
return jsonify(
{
"message": "Welcome to Nathan.Woodburn/! This is a personal website. For more information, visit https://nathan.woodburn.au",
@@ -568,8 +580,8 @@ def index():
)
if "Googlebot" not in request.headers.get(
"User-Agent"
) and "Bingbot" not in request.headers.get("User-Agent"):
"User-Agent", ""
) and "Bingbot" not in request.headers.get("User-Agent", ""):
# Check if cookie is set
if not loaded:
# Set cookie
@@ -631,9 +643,11 @@ def index():
or project["avatar_url"] == ""
):
project["avatar_url"] = "/favicon.png"
project["name"] = project["name"].replace("_", " ").replace("-", " ")
project["name"] = project["name"].replace(
"_", " ").replace("-", " ")
# Sort by last updated
projectsList = sorted(projects, key=lambda x: x["updated_at"], reverse=True)
projectsList = sorted(
projects, key=lambda x: x["updated_at"], reverse=True)
projects = []
projectNames = []
projectNum = 0
@@ -646,7 +660,8 @@ def index():
custom = ""
# Check for downtime
uptime = requests.get("https://uptime.woodburn.au/api/status-page/main/badge")
uptime = requests.get(
"https://uptime.woodburn.au/api/status-page/main/badge")
uptime = uptime.content.count(b"Up") > 1
if uptime:
@@ -754,7 +769,7 @@ def now_path(path):
):
handshake_scripts = ""
return now.render_now_page(path,handshake_scripts)
return now.render_now_page(path, handshake_scripts)
@app.route("/old")
@@ -787,6 +802,7 @@ def now_old():
"now/old.html", handshake_scripts=handshake_scripts, now_pages=html
)
@app.route("/now.rss")
@app.route("/now.xml")
@app.route("/rss.xml")
@@ -805,18 +821,22 @@ def now_rss():
rss += "</channel></rss>"
return make_response(rss, 200, {"Content-Type": "application/rss+xml"})
@app.route("/now.json")
def now_json():
now_pages = now.list_now_page_files()
host = "https://" + request.host
if ":" in request.host:
host = "http://" + request.host
now_pages = [{"url":host+"/now/"+page.strip(".html"), "date":datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y"), "title":"What's Happening "+datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y")} for page in now_pages]
now_pages = [{"url": host+"/now/"+page.strip(".html"), "date": datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime(
"%A, %B %d, %Y"), "title": "What's Happening "+datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y")} for page in now_pages]
return jsonify(now_pages)
# endregion
# region blog Pages
@app.route("/blog")
@app.route("/blog/")
def blog_page():
@@ -846,12 +866,11 @@ def blog_path(path):
):
handshake_scripts = ""
return blog.render_blog_page(path,handshake_scripts)
return blog.render_blog_page(path, handshake_scripts)
# endregion
# region Donate
@app.route("/donate")
def donate():
@@ -980,10 +999,10 @@ def donate():
@app.route("/address/<path:address>")
def addressQR(address:str):
def addressQR(address):
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
error_correction=ERROR_CORRECT_L,
box_size=10,
border=4,
)
@@ -993,7 +1012,7 @@ def addressQR(address:str):
# Save the QR code image to a temporary file
qr_image_path = "/tmp/qr_code.png"
qr_image.save(qr_image_path)
qr_image.save(qr_image_path) # type: ignore
# Return the QR code image as a response
return send_file(qr_image_path, mimetype="image/png")
@@ -1001,24 +1020,25 @@ def addressQR(address:str):
@app.route("/qrcode/<path:data>")
@app.route("/qr/<path:data>")
def qr(data:str):
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H,box_size=10,border=2)
def qr_code(data):
qr = qrcode.QRCode(
error_correction=ERROR_CORRECT_H, box_size=10, border=2)
qr.add_data(data)
qr.make()
qr_image:Image.Image = qr.make_image(fill_color="black", back_color="white").convert('RGB')
qr_image: Image.Image = qr.make_image(
fill_color="black", back_color="white").convert('RGB') # type: ignore
# Add logo
logo = Image.open("templates/assets/img/favicon/logo.png")
basewidth = qr_image.size[0]//3
wpercent = (basewidth / float(logo.size[0]))
hsize = int((float(logo.size[1]) * float(wpercent)))
logo = logo.resize((basewidth, hsize),Image.Resampling.LANCZOS)
logo = logo.resize((basewidth, hsize), Image.Resampling.LANCZOS)
pos = ((qr_image.size[0] - logo.size[0]) // 2,
(qr_image.size[1] - logo.size[1]) // 2)
(qr_image.size[1] - logo.size[1]) // 2)
qr_image.paste(logo, pos, mask=logo)
qr_image.save("/tmp/qr_code.png")
return send_file("/tmp/qr_code.png", mimetype="image/png")
@@ -1037,6 +1057,7 @@ def supersecretpath():
ascii_art_html = converter.convert(ascii_art)
return render_template("ascii.html", ascii_art=ascii_art_html)
@app.route("/download/<path:path>")
def download(path):
# Check if file exists
@@ -1046,15 +1067,16 @@ def download(path):
return send_file(path)
return render_template("404.html"), 404
@app.route("/.well-known/<path:path>")
def wellknown(path):
return send_from_directory(".well-known", path)
@app.route("/hosting/send-enquiry", methods=["POST"])
def hosting_send_enquiry():
global email_request_count
global ip_request_count
global ip_request_count
if not request.json:
return jsonify({"status": "error", "message": "No JSON data provided"}), 400
@@ -1065,31 +1087,32 @@ def hosting_send_enquiry():
for key in required_keys:
if key not in request.json:
return jsonify({"status": "error", "message": f"Missing key: {key}"}), 400
email = request.json["email"]
ip = getClientIP(request)
print(f"Hosting enquiry from {email} ({ip})")
# Check rate limits
current_time = datetime.datetime.now().timestamp()
# Check email rate limit
if email in email_request_count:
if (current_time - email_request_count[email]["last_reset"]) > RATE_LIMIT_WINDOW:
# Reset counter if the time window has passed
email_request_count[email] = {"count": 1, "last_reset": current_time}
email_request_count[email] = {
"count": 1, "last_reset": current_time}
else:
# Increment counter
email_request_count[email]["count"] += 1
if email_request_count[email]["count"] > EMAIL_RATE_LIMIT:
return jsonify({
"status": "error",
"status": "error",
"message": f"Rate limit exceeded. Please try again later."
}), 429
else:
# First request for this email
email_request_count[email] = {"count": 1, "last_reset": current_time}
# Check IP rate limit
if ip in ip_request_count:
if (current_time - ip_request_count[ip]["last_reset"]) > RATE_LIMIT_WINDOW:
@@ -1100,19 +1123,19 @@ def hosting_send_enquiry():
ip_request_count[ip]["count"] += 1
if ip_request_count[ip]["count"] > IP_RATE_LIMIT:
return jsonify({
"status": "error",
"status": "error",
"message": "Rate limit exceeded. Please try again later."
}), 429
else:
# First request for this IP
ip_request_count[ip] = {"count": 1, "last_reset": current_time}
cpus = request.json["cpus"]
memory = request.json["memory"]
disk = request.json["disk"]
backups = request.json["backups"]
message = request.json["message"]
# Try to convert to correct types
try:
cpus = int(cpus)
@@ -1124,7 +1147,6 @@ def hosting_send_enquiry():
except:
return jsonify({"status": "error", "message": "Invalid data types"}), 400
# Basic validation
if not isinstance(cpus, int) or cpus < 1 or cpus > 64:
return jsonify({"status": "error", "message": "Invalid CPUs"}), 400
@@ -1139,8 +1161,6 @@ def hosting_send_enquiry():
if not isinstance(email, str) or len(email) > 100 or "@" not in email:
return jsonify({"status": "error", "message": "Invalid email"}), 400
# Send to Discord webhook
webhook_url = os.getenv("HOSTING_WEBHOOK")
if not webhook_url:
@@ -1206,7 +1226,7 @@ def catch_all(path: str):
if request.headers:
# Check if curl
if "curl" in request.headers.get("User-Agent"):
if "curl" in request.headers.get("User-Agent", "curl"):
return jsonify(
{
"status": 404,
@@ -1216,6 +1236,7 @@ def catch_all(path: str):
), 404
return render_template("404.html"), 404
@app.route("/resume.pdf")
def resume_pdf():
# Check if file exists
@@ -1248,11 +1269,11 @@ def hnsdoh_acme():
cf = Cloudflare(api_token=os.getenv("CF_TOKEN"))
zone = cf.zones.list(name="hnsdoh.com").to_dict()
zone_id = zone["result"][0]["id"]
zone_id = zone["result"][0]["id"] # type: ignore
existing_records = cf.dns.records.list(
zone_id=zone_id, type="TXT", name="_acme-challenge.hnsdoh.com"
zone_id=zone_id, type="TXT", name="_acme-challenge.hnsdoh.com" # type: ignore
).to_dict()
record_id = existing_records["result"][0]["id"]
record_id = existing_records["result"][0]["id"] # type: ignore
cf.dns.records.delete(dns_record_id=record_id, zone_id=zone_id)
cf.dns.records.create(
zone_id=zone_id,
@@ -1321,7 +1342,7 @@ def podsync():
def not_found(e):
if request.headers:
# Check if curl
if "curl" in request.headers.get("User-Agent"):
if "curl" in request.headers.get("User-Agent", "curl"):
return jsonify(
{
"status": 404,