6 Commits

Author SHA1 Message Date
e489764ff8 fix: Add escape char for curl rendering and format python files
All checks were successful
Build Docker / BuildImage (push) Successful in 1m6s
Check Code Quality / RuffCheck (push) Successful in 1m20s
2025-11-21 23:05:40 +11:00
51c4416d4d fix: Add cahce helper to dockerfile
All checks were successful
Build Docker / BuildImage (push) Successful in 1m4s
Check Code Quality / RuffCheck (push) Successful in 1m6s
2025-11-21 22:55:02 +11:00
copilot-swe-agent[bot]
a78d999a61 Fix trailing whitespace and finalize performance improvements
All checks were successful
Check Code Quality / RuffCheck (push) Successful in 51s
Build Docker / BuildImage (push) Successful in 59s
Co-authored-by: Nathanwoodburn <62039630+Nathanwoodburn@users.noreply.github.com>
2025-11-21 11:34:51 +00:00
copilot-swe-agent[bot]
74afdc1b5b Add caching to blog and now blueprints for improved performance
Co-authored-by: Nathanwoodburn <62039630+Nathanwoodburn@users.noreply.github.com>
2025-11-21 11:31:49 +00:00
copilot-swe-agent[bot]
607fdd4d46 Add caching layer for expensive API calls and file operations
Co-authored-by: Nathanwoodburn <62039630+Nathanwoodburn@users.noreply.github.com>
2025-11-21 11:30:23 +00:00
copilot-swe-agent[bot]
ca01b96e80 Initial plan 2025-11-21 11:21:23 +00:00
18 changed files with 798 additions and 497 deletions

View File

@@ -20,7 +20,7 @@ RUN --mount=type=cache,target=/root/.cache/uv \
# Copy only app source files
COPY blueprints blueprints
COPY main.py server.py curl.py tools.py mail.py ./
COPY main.py server.py curl.py tools.py mail.py cache_helper.py ./
COPY templates templates
COPY data data
COPY pwa pwa
@@ -55,6 +55,7 @@ COPY --from=build --chown=appuser:appgroup /app/server.py /app/
COPY --from=build --chown=appuser:appgroup /app/curl.py /app/
COPY --from=build --chown=appuser:appgroup /app/tools.py /app/
COPY --from=build --chown=appuser:appgroup /app/mail.py /app/
COPY --from=build --chown=appuser:appgroup /app/cache_helper.py /app/
USER appuser
EXPOSE 5000

View File

@@ -1,35 +1,38 @@
import os
import json
if not os.path.exists('.well-known/wallets'):
os.makedirs('.well-known/wallets')
if not os.path.exists(".well-known/wallets"):
os.makedirs(".well-known/wallets")
def addCoin(token:str, name:str, address:str):
with open('.well-known/wallets/'+token.upper(),'w') as f:
def addCoin(token: str, name: str, address: str):
with open(".well-known/wallets/" + token.upper(), "w") as f:
f.write(address)
with open('.well-known/wallets/.coins','r') as f:
with open(".well-known/wallets/.coins", "r") as f:
coins = json.load(f)
coins[token.upper()] = f'{name} ({token.upper()})'
with open('.well-known/wallets/.coins','w') as f:
coins[token.upper()] = f"{name} ({token.upper()})"
with open(".well-known/wallets/.coins", "w") as f:
f.write(json.dumps(coins, indent=4))
def addDomain(token:str, domain:str):
with open('.well-known/wallets/.domains','r') as f:
def addDomain(token: str, domain: str):
with open(".well-known/wallets/.domains", "r") as f:
domains = json.load(f)
domains[token.upper()] = domain
with open('.well-known/wallets/.domains','w') as f:
with open(".well-known/wallets/.domains", "w") as f:
f.write(json.dumps(domains, indent=4))
if __name__ == '__main__':
if __name__ == "__main__":
# Ask user for token
token = input('Enter token symbol: ')
name = input('Enter token name: ')
address = input('Enter wallet address: ')
token = input("Enter token symbol: ")
name = input("Enter token name: ")
address = input("Enter wallet address: ")
addCoin(token, name, address)
if input('Do you want to add a domain? (y/n): ').lower() == 'y':
domain = input('Enter domain: ')
if input("Do you want to add a domain? (y/n): ").lower() == "y":
domain = input("Enter domain: ")
addDomain(token, domain)

View File

@@ -3,7 +3,7 @@ import os
from cloudflare import Cloudflare
from tools import json_response
app = Blueprint('acme', __name__)
app = Blueprint("acme", __name__)
@app.route("/hnsdoh-acme", methods=["POST"])
@@ -23,7 +23,9 @@ def post():
zone = cf.zones.list(name="hnsdoh.com").to_dict()
zone_id = zone["result"][0]["id"] # type: ignore
existing_records = cf.dns.records.list(
zone_id=zone_id, type="TXT", name="_acme-challenge.hnsdoh.com" # type: ignore
zone_id=zone_id,
type="TXT",
name="_acme-challenge.hnsdoh.com", # type: ignore
).to_dict()
record_id = existing_records["result"][0]["id"] # type: ignore
cf.dns.records.delete(dns_record_id=record_id, zone_id=zone_id)

View File

@@ -8,6 +8,7 @@ from tools import getClientIP, getGitCommit, json_response, parse_date, get_tool
from blueprints import sol
from dateutil import parser as date_parser
from blueprints.spotify import get_spotify_track
from cache_helper import get_nc_config, get_git_latest_activity
# Constants
HTTP_OK = 200
@@ -17,51 +18,47 @@ HTTP_NOT_FOUND = 404
HTTP_UNSUPPORTED_MEDIA = 415
HTTP_SERVER_ERROR = 500
app = Blueprint('api', __name__, url_prefix='/api/v1')
app = Blueprint("api", __name__, url_prefix="/api/v1")
# Register solana blueprint
app.register_blueprint(sol.app)
# Load configuration
NC_CONFIG = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
).json()
if 'time-zone' not in NC_CONFIG:
NC_CONFIG['time-zone'] = 10
@app.route("/", strict_slashes=False)
@app.route("/help")
def help():
"""Provide API documentation and help."""
return jsonify({
"message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au",
"endpoints": {
"/time": "Get the current time",
"/timezone": "Get the current timezone",
"/message": "Get the message from the config",
"/project": "Get the current project from git",
"/version": "Get the current version of the website",
"/page_date?url=URL&verbose=BOOL": "Get the last modified date of a webpage (verbose is optional, default false)",
"/tools": "Get a list of tools used by Nathan Woodburn",
"/playing": "Get the currently playing Spotify track",
"/status": "Just check if the site is up",
"/ping": "Just check if the site is up",
"/ip": "Get your IP address",
"/headers": "Get your request headers",
"/help": "Get this help message"
},
"base_url": "/api/v1",
"version": getGitCommit(),
"ip": getClientIP(request),
"status": HTTP_OK
})
return jsonify(
{
"message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au",
"endpoints": {
"/time": "Get the current time",
"/timezone": "Get the current timezone",
"/message": "Get the message from the config",
"/project": "Get the current project from git",
"/version": "Get the current version of the website",
"/page_date?url=URL&verbose=BOOL": "Get the last modified date of a webpage (verbose is optional, default false)",
"/tools": "Get a list of tools used by Nathan Woodburn",
"/playing": "Get the currently playing Spotify track",
"/status": "Just check if the site is up",
"/ping": "Just check if the site is up",
"/ip": "Get your IP address",
"/headers": "Get your request headers",
"/help": "Get this help message",
},
"base_url": "/api/v1",
"version": getGitCommit(),
"ip": getClientIP(request),
"status": HTTP_OK,
}
)
@app.route("/status")
@app.route("/ping")
def status():
return json_response(request, "200 OK", HTTP_OK)
@app.route("/version")
def version():
"""Get the current version of the website."""
@@ -71,46 +68,48 @@ def version():
@app.route("/time")
def time():
"""Get the current time in the configured timezone."""
timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
nc_config = get_nc_config()
timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
timezone = datetime.timezone(offset=timezone_offset)
current_time = datetime.datetime.now(tz=timezone)
return jsonify({
"timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"),
"timestamp": current_time.timestamp(),
"timezone": NC_CONFIG["time-zone"],
"timeISO": current_time.isoformat(),
"ip": getClientIP(request),
"status": HTTP_OK
})
return jsonify(
{
"timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"),
"timestamp": current_time.timestamp(),
"timezone": nc_config["time-zone"],
"timeISO": current_time.isoformat(),
"ip": getClientIP(request),
"status": HTTP_OK,
}
)
@app.route("/timezone")
def timezone():
"""Get the current timezone setting."""
return jsonify({
"timezone": NC_CONFIG["time-zone"],
"ip": getClientIP(request),
"status": HTTP_OK
})
nc_config = get_nc_config()
return jsonify(
{
"timezone": nc_config["time-zone"],
"ip": getClientIP(request),
"status": HTTP_OK,
}
)
@app.route("/message")
def message():
"""Get the message from the configuration."""
return jsonify({
"message": NC_CONFIG["message"],
"ip": getClientIP(request),
"status": HTTP_OK
})
nc_config = get_nc_config()
return jsonify(
{"message": nc_config["message"], "ip": getClientIP(request), "status": HTTP_OK}
)
@app.route("/ip")
def ip():
"""Get the client's IP address."""
return jsonify({
"ip": getClientIP(request),
"status": HTTP_OK
})
return jsonify({"ip": getClientIP(request), "status": HTTP_OK})
@app.route("/email", methods=["POST"])
@@ -118,7 +117,9 @@ def email_post():
"""Send an email via the API (requires API key)."""
# Verify json
if not request.is_json:
return json_response(request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA)
return json_response(
request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA
)
# Check if api key sent
data = request.json
@@ -138,35 +139,27 @@ def email_post():
@app.route("/project")
def project():
"""Get information about the current git project."""
gitinfo = {
"website": None,
}
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("git_token")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
repo_description = git["repo"]["description"]
gitinfo["name"] = repo_name
gitinfo["description"] = repo_description
gitinfo["url"] = git["repo"]["html_url"]
if "website" in git["repo"]:
gitinfo["website"] = git["repo"]["website"]
except Exception as e:
print(f"Error getting git data: {e}")
return json_response(request, "500 Internal Server Error", HTTP_SERVER_ERROR)
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
gitinfo = {
"name": repo_name,
"description": repo_description,
"url": git["repo"]["html_url"],
"website": git["repo"].get("website"),
}
return jsonify(
{
"repo_name": repo_name,
"repo_description": repo_description,
"repo": gitinfo,
"ip": getClientIP(request),
"status": HTTP_OK,
}
)
return jsonify({
"repo_name": repo_name,
"repo_description": repo_description,
"repo": gitinfo,
"ip": getClientIP(request),
"status": HTTP_OK
})
@app.route("/tools")
def tools():
@@ -179,6 +172,7 @@ def tools():
return json_response(request, {"tools": tools}, HTTP_OK)
@app.route("/playing")
def playing():
"""Get the currently playing Spotify track."""
@@ -201,15 +195,11 @@ def headers():
# Remove from headers
toremove.append(key)
for key in toremove:
headers.pop(key)
return jsonify({
"headers": headers,
"ip": getClientIP(request),
"status": HTTP_OK
})
return jsonify({"headers": headers, "ip": getClientIP(request), "status": HTTP_OK})
@app.route("/page_date")
def page_date():
@@ -226,33 +216,33 @@ def page_date():
r = requests.get(url, timeout=5)
r.raise_for_status()
except requests.exceptions.RequestException as e:
return json_response(request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST)
return json_response(
request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST
)
page_text = r.text
# Remove ordinal suffixes globally
page_text = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', page_text, flags=re.IGNORECASE)
page_text = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", page_text, flags=re.IGNORECASE)
# Remove HTML comments
page_text = re.sub(r'<!--.*?-->', '', page_text, flags=re.DOTALL)
page_text = re.sub(r"<!--.*?-->", "", page_text, flags=re.DOTALL)
date_patterns = [
r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', # YYYY-MM-DD
r'(\d{1,2})[/-](\d{1,2})[/-](\d{4})', # DD-MM-YYYY
r'(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})', # DD Month YYYY
r'(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})', # Month DD, YYYY with optional words
r'\b(\d{4})(\d{2})(\d{2})\b', # YYYYMMDD
r'(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})', # Month YYYY only
r"(\d{4})[/-](\d{1,2})[/-](\d{1,2})", # YYYY-MM-DD
r"(\d{1,2})[/-](\d{1,2})[/-](\d{4})", # DD-MM-YYYY
r"(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})", # DD Month YYYY
r"(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})", # Month DD, YYYY with optional words
r"\b(\d{4})(\d{2})(\d{2})\b", # YYYYMMDD
r"(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})", # Month YYYY only
]
# Structured data patterns
json_date_patterns = {
r'"datePublished"\s*:\s*"([^"]+)"': "published",
r'"dateModified"\s*:\s*"([^"]+)"': "modified",
r'<meta\s+(?:[^>]*?)property\s*=\s*"article:published_time"\s+content\s*=\s*"([^"]+)"': "published",
r'<meta\s+(?:[^>]*?)property\s*=\s*"article:modified_time"\s+content\s*=\s*"([^"]+)"': "modified",
r'<time\s+datetime\s*=\s*"([^"]+)"': "published"
r'<time\s+datetime\s*=\s*"([^"]+)"': "published",
}
found_dates = []
@@ -270,7 +260,7 @@ def page_date():
for match in re.findall(pattern, page_text):
try:
dt = date_parser.isoparse(match)
formatted_date = dt.strftime('%Y-%m-%d')
formatted_date = dt.strftime("%Y-%m-%d")
found_dates.append([[formatted_date], -1, date_type])
except (ValueError, TypeError):
continue
@@ -279,7 +269,9 @@ def page_date():
return json_response(request, "Date not found on page", HTTP_BAD_REQUEST)
today = datetime.date.today()
tolerance_date = today + datetime.timedelta(days=1) # Allow for slight future dates (e.g., time zones)
tolerance_date = today + datetime.timedelta(
days=1
) # Allow for slight future dates (e.g., time zones)
# When processing dates
processed_dates = []
for date_groups, pattern_format, date_type in found_dates:
@@ -300,18 +292,32 @@ def page_date():
date_obj = {"date": dt.strftime("%Y-%m-%d"), "type": date_type}
if verbose:
if pattern_format == -1:
date_obj.update({"source": "metadata", "pattern_used": pattern_format, "raw": date_groups[0]})
date_obj.update(
{
"source": "metadata",
"pattern_used": pattern_format,
"raw": date_groups[0],
}
)
else:
date_obj.update({"source": "content", "pattern_used": pattern_format, "raw": " ".join(date_groups)})
date_obj.update(
{
"source": "content",
"pattern_used": pattern_format,
"raw": " ".join(date_groups),
}
)
processed_dates.append(date_obj)
if not processed_dates:
if verbose:
return jsonify({
"message": "No valid dates found on page",
"found_dates": found_dates,
"processed_dates": processed_dates
}), HTTP_BAD_REQUEST
return jsonify(
{
"message": "No valid dates found on page",
"found_dates": found_dates,
"processed_dates": processed_dates,
}
), HTTP_BAD_REQUEST
return json_response(request, "No valid dates found on page", HTTP_BAD_REQUEST)
# Sort dates and return latest
processed_dates.sort(key=lambda x: x["date"])

View File

@@ -3,63 +3,83 @@ from flask import Blueprint, render_template, request, jsonify
import markdown
from bs4 import BeautifulSoup
import re
from functools import lru_cache
from tools import isCLI, getClientIP, getHandshakeScript
app = Blueprint('blog', __name__, url_prefix='/blog')
app = Blueprint("blog", __name__, url_prefix="/blog")
@lru_cache(maxsize=32)
def list_page_files():
blog_pages = os.listdir("data/blog")
# Sort pages by modified time, newest first
blog_pages.sort(
key=lambda x: os.path.getmtime(os.path.join("data/blog", x)), reverse=True)
key=lambda x: os.path.getmtime(os.path.join("data/blog", x)), reverse=True
)
# Remove .md extension
blog_pages = [page.removesuffix(".md")
for page in blog_pages if page.endswith(".md")]
blog_pages = [
page.removesuffix(".md") for page in blog_pages if page.endswith(".md")
]
return blog_pages
def render_page(date, handshake_scripts=None):
# Convert md to html
@lru_cache(maxsize=64)
def get_blog_content(date):
"""Get and cache blog content."""
if not os.path.exists(f"data/blog/{date}.md"):
return render_template("404.html"), 404
return None
with open(f"data/blog/{date}.md", "r") as f:
content = f.read()
return f.read()
@lru_cache(maxsize=64)
def render_markdown_to_html(content):
"""Convert markdown to HTML with caching."""
html = markdown.markdown(
content, extensions=["sane_lists", "codehilite", "fenced_code"]
)
# Add target="_blank" to all links
html = html.replace('<a href="', '<a target="_blank" href="')
html = html.replace("<h4", "<h4 style='margin-bottom:0px;'")
html = fix_numbered_lists(html)
return html
def render_page(date, handshake_scripts=None):
# Get cached content
content = get_blog_content(date)
if content is None:
return render_template("404.html"), 404
# Get the title from the file name
title = date.removesuffix(".md").replace("_", " ")
# Convert the md to html
content = markdown.markdown(
content, extensions=['sane_lists', 'codehilite', 'fenced_code'])
# Add target="_blank" to all links
content = content.replace('<a href="', '<a target="_blank" href="')
content = content.replace("<h4", "<h4 style='margin-bottom:0px;'")
content = fix_numbered_lists(content)
# Convert the md to html (cached)
html_content = render_markdown_to_html(content)
return render_template(
"blog/template.html",
title=title,
content=content,
content=html_content,
handshake_scripts=handshake_scripts,
)
def fix_numbered_lists(html):
soup = BeautifulSoup(html, 'html.parser')
soup = BeautifulSoup(html, "html.parser")
# Find the <p> tag containing numbered steps
paragraphs = soup.find_all('p')
paragraphs = soup.find_all("p")
for p in paragraphs:
content = p.decode_contents() # type: ignore
# Check for likely numbered step structure
if re.search(r'1\.\s', content):
if re.search(r"1\.\s", content):
# Split into pre-list and numbered steps
# Match: <br>, optional whitespace, then a number and dot
parts = re.split(r'(?:<br\s*/?>)?\s*(\d+)\.\s', content)
parts = re.split(r"(?:<br\s*/?>)?\s*(\d+)\.\s", content)
# Result: [pre-text, '1', step1, '2', step2, ..., '10', step10]
pre_text = parts[0].strip()
@@ -68,10 +88,9 @@ def fix_numbered_lists(html):
# Assemble the ordered list
ol_items = []
for i in range(0, len(steps), 2):
if i+1 < len(steps):
step_html = steps[i+1].strip()
ol_items.append(
f"<li style='list-style: auto;'>{step_html}</li>")
if i + 1 < len(steps):
step_html = steps[i + 1].strip()
ol_items.append(f"<li style='list-style: auto;'>{step_html}</li>")
# Build the final list HTML
ol_html = "<ol>\n" + "\n".join(ol_items) + "\n</ol>"
@@ -80,7 +99,7 @@ def fix_numbered_lists(html):
new_html = f"{pre_text}<br />\n{ol_html}" if pre_text else ol_html
# Replace old <p> with parsed version
new_fragment = BeautifulSoup(new_html, 'html.parser')
new_fragment = BeautifulSoup(new_html, "html.parser")
p.replace_with(new_fragment)
break # Only process the first matching <p>
@@ -117,16 +136,23 @@ def index():
blog_pages = list_page_files()
# Create a html list of pages
blog_pages = [
{"name": page.replace("_", " "), "url": f"/blog/{page}", "download": f"/blog/{page}.md"} for page in blog_pages
{
"name": page.replace("_", " "),
"url": f"/blog/{page}",
"download": f"/blog/{page}.md",
}
for page in blog_pages
]
# Render the template
return jsonify({
"status": 200,
"message": "Check out my various blog postsa",
"ip": getClientIP(request),
"blogs": blog_pages
}), 200
return jsonify(
{
"status": 200,
"message": "Check out my various blog postsa",
"ip": getClientIP(request),
"blogs": blog_pages,
}
), 200
@app.route("/<path:path>")
@@ -134,31 +160,30 @@ def path(path):
if not isCLI(request):
return render_page(path, handshake_scripts=getHandshakeScript(request.host))
# Convert md to html
if not os.path.exists(f"data/blog/{path}.md"):
# Get cached content
content = get_blog_content(path)
if content is None:
return render_template("404.html"), 404
with open(f"data/blog/{path}.md", "r") as f:
content = f.read()
# Get the title from the file name
title = path.replace("_", " ")
return jsonify({
"status": 200,
"message": f"Blog post: {title}",
"ip": getClientIP(request),
"title": title,
"content": content,
"download": f"/blog/{path}.md"
}), 200
return jsonify(
{
"status": 200,
"message": f"Blog post: {title}",
"ip": getClientIP(request),
"title": title,
"content": content,
"download": f"/blog/{path}.md",
}
), 200
@app.route("/<path:path>.md")
def path_md(path):
if not os.path.exists(f"data/blog/{path}.md"):
content = get_blog_content(path)
if content is None:
return render_template("404.html"), 404
with open(f"data/blog/{path}.md", "r") as f:
content = f.read()
# Return the raw markdown file
return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
return content, 200, {"Content-Type": "text/plain; charset=utf-8"}

View File

@@ -1,15 +1,17 @@
from flask import Blueprint, render_template, make_response, request, jsonify
import datetime
import os
from functools import lru_cache
from tools import getHandshakeScript, error_response, isCLI
from curl import get_header, MAX_WIDTH
from bs4 import BeautifulSoup
import re
# Create blueprint
app = Blueprint('now', __name__, url_prefix='/now')
app = Blueprint("now", __name__, url_prefix="/now")
@lru_cache(maxsize=16)
def list_page_files():
now_pages = os.listdir("templates/now")
now_pages = [
@@ -19,12 +21,14 @@ def list_page_files():
return now_pages
@lru_cache(maxsize=16)
def list_dates():
now_pages = list_page_files()
now_dates = [page.split(".")[0] for page in now_pages]
return now_dates
@lru_cache(maxsize=8)
def get_latest_date(formatted=False):
if formatted:
date = list_dates()[0]
@@ -51,7 +55,10 @@ def render(date, handshake_scripts=None):
date_formatted = datetime.datetime.strptime(date, "%y_%m_%d")
date_formatted = date_formatted.strftime("%A, %B %d, %Y")
return render_template(f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts)
return render_template(
f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts
)
def render_curl(date=None):
# If the date is not available, render the latest page
@@ -71,7 +78,7 @@ def render_curl(date=None):
# Load HTML
with open(f"templates/now/{date}.html", "r", encoding="utf-8") as f:
raw_html = f.read().replace("{{ date }}", date_formatted)
soup = BeautifulSoup(raw_html, 'html.parser')
soup = BeautifulSoup(raw_html, "html.parser")
posts = []
@@ -82,12 +89,12 @@ def render_curl(date=None):
for div in divs:
# header could be h1/h2/h3 inside the div
header_tag = div.find(["h1", "h2", "h3"]) # type: ignore
header_tag = div.find(["h1", "h2", "h3"]) # type: ignore
# content is usually one or more <p> tags inside the div
p_tags = div.find_all("p") # type: ignore
p_tags = div.find_all("p") # type: ignore
if header_tag and p_tags:
header_text = header_tag.get_text(strip=True) # type: ignore
header_text = header_tag.get_text(strip=True) # type: ignore
content_lines = []
for p in p_tags:
@@ -95,7 +102,7 @@ def render_curl(date=None):
text = p.get_text(strip=False)
# Extract any <a> links in the paragraph
links = [a.get("href") for a in p.find_all("a", href=True)] # type: ignore
links = [a.get("href") for a in p.find_all("a", href=True)] # type: ignore
# Set max width for text wrapping
# Wrap text manually
@@ -103,7 +110,7 @@ def render_curl(date=None):
for line in text.splitlines():
while len(line) > MAX_WIDTH:
# Find last space within max_width
split_at = line.rfind(' ', 0, MAX_WIDTH)
split_at = line.rfind(" ", 0, MAX_WIDTH)
if split_at == -1:
split_at = MAX_WIDTH
wrapped_lines.append(line[:split_at].rstrip())
@@ -112,7 +119,7 @@ def render_curl(date=None):
text = "\n".join(wrapped_lines)
if links:
text += "\nLinks: " + ", ".join(links) # type: ignore
text += "\nLinks: " + ", ".join(links) # type: ignore
content_lines.append(text)
@@ -124,8 +131,9 @@ def render_curl(date=None):
for post in posts:
response += f"{post['header']}\n\n{post['content']}\n\n"
return render_template("now.ascii", date=date_formatted, content=response, header=get_header())
return render_template(
"now.ascii", date=date_formatted, content=response, header=get_header()
)
@app.route("/", strict_slashes=False)
@@ -153,8 +161,9 @@ def old():
date_fmt = datetime.datetime.strptime(date, "%y_%m_%d")
date_fmt = date_fmt.strftime("%A, %B %d, %Y")
response += f"{date_fmt} - /now/{link}\n"
return render_template("now.ascii", date="Old Now Pages", content=response, header=get_header())
return render_template(
"now.ascii", date="Old Now Pages", content=response, header=get_header()
)
html = '<ul class="list-group">'
html += f'<a style="text-decoration:none;" href="/now"><li class="list-group-item" style="background-color:#000000;color:#ffffff;">{get_latest_date(True)}</li></a>'
@@ -167,7 +176,9 @@ def old():
html += "</ul>"
return render_template(
"now/old.html", handshake_scripts=getHandshakeScript(request.host), now_pages=html
"now/old.html",
handshake_scripts=getHandshakeScript(request.host),
now_pages=html,
)
@@ -185,7 +196,7 @@ def rss():
link = page.strip(".html")
date = datetime.datetime.strptime(link, "%y_%m_%d")
date = date.strftime("%A, %B %d, %Y")
rss += f'<item><title>What\'s Happening {date}</title><link>{host}/now/{link}</link><description>Latest updates for {date}</description><guid>{host}/now/{link}</guid></item>'
rss += f"<item><title>What's Happening {date}</title><link>{host}/now/{link}</link><description>Latest updates for {date}</description><guid>{host}/now/{link}</guid></item>"
rss += "</channel></rss>"
return make_response(rss, 200, {"Content-Type": "application/rss+xml"})
@@ -196,6 +207,17 @@ def json():
host = "https://" + request.host
if ":" in request.host:
host = "http://" + request.host
now_pages = [{"url": host+"/now/"+page.strip(".html"), "date": datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime(
"%A, %B %d, %Y"), "title": "What's Happening "+datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y")} for page in now_pages]
now_pages = [
{
"url": host + "/now/" + page.strip(".html"),
"date": datetime.datetime.strptime(
page.strip(".html"), "%y_%m_%d"
).strftime("%A, %B %d, %Y"),
"title": "What's Happening "
+ datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime(
"%A, %B %d, %Y"
),
}
for page in now_pages
]
return jsonify(now_pages)

View File

@@ -2,7 +2,8 @@ from flask import Blueprint, make_response, request
from tools import error_response
import requests
app = Blueprint('podcast', __name__)
app = Blueprint("podcast", __name__)
@app.route("/ID1")
def index():

View File

@@ -9,12 +9,12 @@ import binascii
import base64
import os
app = Blueprint('sol', __name__)
app = Blueprint("sol", __name__)
SOLANA_HEADERS = {
"Content-Type": "application/json",
"X-Action-Version": "2.4.2",
"X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp"
"X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp",
}
SOLANA_ADDRESS = None
@@ -23,15 +23,19 @@ if os.path.isfile(".well-known/wallets/SOL"):
address = file.read()
SOLANA_ADDRESS = Pubkey.from_string(address.strip())
def create_transaction(sender_address: str, amount: float) -> str:
if SOLANA_ADDRESS is None:
raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.")
raise ValueError(
"SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address."
)
# Create transaction
sender = Pubkey.from_string(sender_address)
transfer_ix = transfer(
TransferParams(
from_pubkey=sender, to_pubkey=SOLANA_ADDRESS, lamports=int(
amount * 1000000000)
from_pubkey=sender,
to_pubkey=SOLANA_ADDRESS,
lamports=int(amount * 1000000000),
)
)
solana_client = Client("https://api.mainnet-beta.solana.com")
@@ -50,11 +54,15 @@ def create_transaction(sender_address: str, amount: float) -> str:
base64_string = base64.b64encode(raw_bytes).decode("utf-8")
return base64_string
def get_solana_address() -> str:
if SOLANA_ADDRESS is None:
raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.")
raise ValueError(
"SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address."
)
return str(SOLANA_ADDRESS)
@app.route("/donate", methods=["GET", "OPTIONS"])
def sol_donate():
data = {
@@ -103,7 +111,6 @@ def sol_donate_amount(amount):
@app.route("/donate/<amount>", methods=["POST"])
def sol_donate_post(amount):
if not request.json:
return jsonify({"message": "Error: No JSON data provided"}), 400, SOLANA_HEADERS
@@ -122,4 +129,8 @@ def sol_donate_post(amount):
return jsonify({"message": "Error: Amount too small"}), 400, SOLANA_HEADERS
transaction = create_transaction(sender, amount)
return jsonify({"message": "Success", "transaction": transaction}), 200, SOLANA_HEADERS
return (
jsonify({"message": "Success", "transaction": transaction}),
200,
SOLANA_HEADERS,
)

View File

@@ -5,7 +5,7 @@ import requests
import time
import base64
app = Blueprint('spotify', __name__, url_prefix='/spotify')
app = Blueprint("spotify", __name__, url_prefix="/spotify")
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
@@ -21,6 +21,7 @@ ACCESS_TOKEN = None
REFRESH_TOKEN = os.getenv("SPOTIFY_REFRESH_TOKEN")
TOKEN_EXPIRES = 0
def refresh_access_token():
"""Refresh Spotify access token when expired."""
global ACCESS_TOKEN, TOKEN_EXPIRES
@@ -52,6 +53,7 @@ def refresh_access_token():
TOKEN_EXPIRES = time.time() + token_info.get("expires_in", 3600)
return ACCESS_TOKEN
@app.route("/login")
def login():
auth_query = (
@@ -60,6 +62,7 @@ def login():
)
return redirect(auth_query)
@app.route("/callback")
def callback():
code = request.args.get("code")
@@ -76,12 +79,14 @@ def callback():
response = requests.post(SPOTIFY_TOKEN_URL, data=data)
token_info = response.json()
if "access_token" not in token_info:
return json_response(request, {"error": "Failed to obtain token", "details": token_info}, 400)
return json_response(
request, {"error": "Failed to obtain token", "details": token_info}, 400
)
access_token = token_info["access_token"]
me = requests.get(
"https://api.spotify.com/v1/me",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
).json()
if me.get("id") != ALLOWED_SPOTIFY_USER_ID:
@@ -93,12 +98,14 @@ def callback():
print("Refresh Token:", REFRESH_TOKEN)
return redirect(url_for("spotify.currently_playing"))
@app.route("/", strict_slashes=False)
@app.route("/playing")
def currently_playing():
"""Public endpoint showing your current track."""
track = get_spotify_track()
return json_response(request, {"spotify":track}, 200)
return json_response(request, {"spotify": track}, 200)
def get_spotify_track():
"""Internal function to get current playing track without HTTP context."""
@@ -124,7 +131,7 @@ def get_spotify_track():
"album_name": data["item"]["album"]["name"],
"album_art": data["item"]["album"]["images"][0]["url"],
"is_playing": data["is_playing"],
"progress_ms": data.get("progress_ms",0),
"duration_ms": data["item"].get("duration_ms",1)
"progress_ms": data.get("progress_ms", 0),
"duration_ms": data["item"].get("duration_ms", 1),
}
return track

View File

@@ -1,7 +1,7 @@
from flask import Blueprint, request
from tools import json_response
app = Blueprint('template', __name__)
app = Blueprint("template", __name__)
@app.route("/", strict_slashes=False)

View File

@@ -1,8 +1,15 @@
from flask import Blueprint, make_response, request, jsonify, send_from_directory, redirect
from flask import (
Blueprint,
make_response,
request,
jsonify,
send_from_directory,
redirect,
)
from tools import error_response
import os
app = Blueprint('well-known', __name__, url_prefix='/.well-known')
app = Blueprint("well-known", __name__, url_prefix="/.well-known")
@app.route("/<path:path>")
@@ -12,7 +19,7 @@ def index(path):
@app.route("/wallets/<path:path>")
def wallets(path):
if path[0] == "." and 'proof' not in path:
if path[0] == "." and "proof" not in path:
return send_from_directory(
".well-known/wallets", path, mimetype="application/json"
)

264
cache_helper.py Normal file
View File

@@ -0,0 +1,264 @@
"""
Cache helper module for expensive API calls and configuration.
Provides centralized caching with TTL for external API calls.
"""
import datetime
import os
import json
import requests
from functools import lru_cache
# Cache storage for NC_CONFIG with timestamp
_nc_config_cache = {"data": None, "timestamp": 0}
_nc_config_ttl = 3600 # 1 hour cache
def get_nc_config():
"""
Get NC_CONFIG with caching (1 hour TTL).
Falls back to default config on error.
Returns:
dict: Configuration dictionary
"""
global _nc_config_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if (
_nc_config_cache["data"]
and (current_time - _nc_config_cache["timestamp"]) < _nc_config_ttl
):
return _nc_config_cache["data"]
# Fetch new config
try:
config = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json",
timeout=5,
).json()
_nc_config_cache = {"data": config, "timestamp": current_time}
return config
except Exception as e:
print(f"Error fetching NC_CONFIG: {e}")
# Return cached data if available, otherwise default
if _nc_config_cache["data"]:
return _nc_config_cache["data"]
return {"time-zone": 10, "message": ""}
# Cache storage for git data
_git_data_cache = {"data": None, "timestamp": 0}
_git_data_ttl = 300 # 5 minutes cache
def get_git_latest_activity():
"""
Get latest git activity with caching (5 minute TTL).
Returns:
dict: Git activity data or default values
"""
global _git_data_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if (
_git_data_cache["data"]
and (current_time - _git_data_cache["timestamp"]) < _git_data_ttl
):
return _git_data_cache["data"]
# Fetch new data
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={
"Authorization": os.getenv("GIT_AUTH") or os.getenv("git_token") or ""
},
timeout=5,
)
git_data = git.json()
if git_data and len(git_data) > 0:
result = git_data[0]
_git_data_cache = {"data": result, "timestamp": current_time}
return result
except Exception as e:
print(f"Error fetching git data: {e}")
# Return cached or default
if _git_data_cache["data"]:
return _git_data_cache["data"]
return {
"repo": {
"html_url": "https://nathan.woodburn.au",
"name": "nathanwoodburn.github.io",
"description": "Personal website",
}
}
# Cache storage for projects
_projects_cache = {"data": None, "timestamp": 0}
_projects_ttl = 7200 # 2 hours cache
def get_projects(limit=3):
"""
Get projects list with caching (2 hour TTL).
Args:
limit (int): Number of projects to return
Returns:
list: List of project dictionaries
"""
global _projects_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if (
_projects_cache["data"]
and (current_time - _projects_cache["timestamp"]) < _projects_ttl
):
return _projects_cache["data"][:limit]
# Fetch new data
try:
projects = []
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos", timeout=5
)
projects = projectsreq.json()
# Check for pagination
pageNum = 2
while 'rel="next"' in projectsreq.headers.get("link", ""):
projectsreq = requests.get(
f"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page={pageNum}",
timeout=5,
)
projects += projectsreq.json()
pageNum += 1
# Safety limit
if pageNum > 10:
break
# Process projects
for project in projects:
if project.get("avatar_url") in ("https://git.woodburn.au/", ""):
project["avatar_url"] = "/favicon.png"
project["name"] = project["name"].replace("_", " ").replace("-", " ")
# Sort by last updated
projects_sorted = sorted(
projects, key=lambda x: x.get("updated_at", ""), reverse=True
)
# Remove duplicates by name
seen_names = set()
unique_projects = []
for project in projects_sorted:
if project["name"] not in seen_names:
unique_projects.append(project)
seen_names.add(project["name"])
_projects_cache = {"data": unique_projects, "timestamp": current_time}
return unique_projects[:limit]
except Exception as e:
print(f"Error fetching projects: {e}")
if _projects_cache["data"]:
return _projects_cache["data"][:limit]
return []
# Cache storage for uptime status
_uptime_cache = {"data": None, "timestamp": 0}
_uptime_ttl = 300 # 5 minutes cache
def get_uptime_status():
"""
Get uptime status with caching (5 minute TTL).
Returns:
bool: True if services are up, False otherwise
"""
global _uptime_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if (
_uptime_cache["data"] is not None
and (current_time - _uptime_cache["timestamp"]) < _uptime_ttl
):
return _uptime_cache["data"]
# Fetch new data
try:
uptime = requests.get(
"https://uptime.woodburn.au/api/status-page/main/badge", timeout=5
)
content = uptime.content.decode("utf-8").lower()
status = "maintenance" in content or uptime.content.count(b"Up") > 1
_uptime_cache = {"data": status, "timestamp": current_time}
return status
except Exception as e:
print(f"Error fetching uptime: {e}")
# Return cached or default (assume up)
if _uptime_cache["data"] is not None:
return _uptime_cache["data"]
return True
# Cached wallet data loaders
@lru_cache(maxsize=1)
def get_wallet_tokens():
"""
Get wallet tokens with caching.
Returns:
list: List of token dictionaries
"""
try:
with open(".well-known/wallets/.tokens") as file:
return json.load(file)
except Exception as e:
print(f"Error loading tokens: {e}")
return []
@lru_cache(maxsize=1)
def get_coin_names():
"""
Get coin names with caching.
Returns:
dict: Dictionary of coin names
"""
try:
with open(".well-known/wallets/.coins") as file:
return json.load(file)
except Exception as e:
print(f"Error loading coin names: {e}")
return {}
@lru_cache(maxsize=1)
def get_wallet_domains():
"""
Get wallet domains with caching.
Returns:
dict: Dictionary of wallet domains
"""
try:
if os.path.isfile(".well-known/wallets/.domains"):
with open(".well-known/wallets/.domains") as file:
return json.load(file)
except Exception as e:
print(f"Error loading domains: {e}")
return {}

View File

@@ -1,36 +1,37 @@
import os
def cleanSite(path:str):
def cleanSite(path: str):
# Check if the file is sitemap.xml
if path.endswith('sitemap.xml'):
if path.endswith("sitemap.xml"):
# Open the file
with open(path, 'r') as f:
with open(path, "r") as f:
# Read the content
content = f.read()
# Replace all .html with empty string
content = content.replace('.html', '')
content = content.replace(".html", "")
# Write the content back to the file
with open(path, 'w') as f:
with open(path, "w") as f:
f.write(content)
# Skip the file
return
# If the file is not an html file, skip it
if not path.endswith('.html'):
if not path.endswith(".html"):
if os.path.isdir(path):
for file in os.listdir(path):
cleanSite(path + '/' + file)
cleanSite(path + "/" + file)
return
# Open the file
with open(path, 'r') as f:
with open(path, "r") as f:
# Read and remove all .html
content = f.read().replace('.html"', '"')
# Write the cleaned content back to the file
with open(path, 'w') as f:
with open(path, "w") as f:
f.write(content)
for file in os.listdir('templates'):
cleanSite('templates/' + file)
for file in os.listdir("templates"):
cleanSite("templates/" + file)

132
curl.py
View File

@@ -2,13 +2,14 @@ from flask import render_template
from tools import getAddress, get_tools_data, getClientIP
import os
from functools import lru_cache
import requests
from blueprints.spotify import get_spotify_track
from cache_helper import get_git_latest_activity, get_projects as get_projects_cached
MAX_WIDTH = 80
def clean_path(path:str):
def clean_path(path: str):
path = path.strip("/ ").lower()
# Strip any .html extension
if path.endswith(".html"):
@@ -19,66 +20,35 @@ def clean_path(path:str):
path = "index"
return path
@lru_cache(maxsize=1)
def get_header():
with open("templates/header.ascii", "r") as f:
return f.read()
@lru_cache(maxsize=1)
@lru_cache(maxsize=16)
def get_current_project():
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("GIT_AUTH") if os.getenv("GIT_AUTH") else os.getenv("git_token")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
if not repo_description:
return f"{repo_name}"
return f"{repo_name} - {repo_description}"
@lru_cache(maxsize=1)
@lru_cache(maxsize=16)
def get_projects():
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
)
projects = projectsreq.json()
# Check for next page
pageNum = 1
while 'rel="next"' in projectsreq.headers["link"]:
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
+ str(pageNum)
)
projects += projectsreq.json()
pageNum += 1
# Sort by last updated
projectsList = sorted(
projects, key=lambda x: x["updated_at"], reverse=True)
projects_data = get_projects_cached(limit=5)
projects = ""
projectNum = 0
includedNames = []
while len(includedNames) < 5 and projectNum < len(projectsList):
# Avoid duplicates
if projectsList[projectNum]["name"] in includedNames:
projectNum += 1
continue
includedNames.append(projectsList[projectNum]["name"])
project = projectsList[projectNum]
projects += f"""{project['name']} - {project['description'] if project['description'] else 'No description'}
{project['html_url']}
for project in projects_data:
projects += f"""{project["name"]} - {project["description"] if project["description"] else "No description"}
{project["html_url"]}
"""
projectNum += 1
return projects
def curl_response(request):
# Check if <path>.ascii exists
path = clean_path(request.path)
@@ -86,39 +56,81 @@ def curl_response(request):
# Handle special cases
if path == "index":
# Get current project
return render_template("index.ascii",repo=get_current_project(), ip=getClientIP(request), spotify=get_spotify_track()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template(
"index.ascii",
repo=get_current_project(),
ip=getClientIP(request),
spotify=get_spotify_track(),
),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
if path == "projects":
# Get projects
return render_template("projects.ascii",header=get_header(),projects=get_projects()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template(
"projects.ascii", header=get_header(), projects=get_projects()
),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
if path == "donate":
# Get donation info
return render_template("donate.ascii",header=get_header(),
HNS=getAddress("HNS"), BTC=getAddress("BTC"),
SOL=getAddress("SOL"), ETH=getAddress("ETH")
), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template(
"donate.ascii",
header=get_header(),
HNS=getAddress("HNS"),
BTC=getAddress("BTC"),
SOL=getAddress("SOL"),
ETH=getAddress("ETH"),
),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
if path == "donate/more":
coinList = os.listdir(".well-known/wallets")
coinList = [file for file in coinList if file[0] != "."]
coinList.sort()
return render_template("donate_more.ascii",header=get_header(),
coins=coinList
), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template("donate_more.ascii", header=get_header(), coins=coinList),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
# For other donation pages, fall back to ascii if it exists
if path.startswith("donate/"):
coin = path.split("/")[1]
address = getAddress(coin)
if address != "":
return render_template("donate_coin.ascii",header=get_header(),coin=coin.upper(),address=address), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template(
"donate_coin.ascii",
header=get_header(),
coin=coin.upper(),
address=address,
),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
if path == "tools":
tools = get_tools_data()
return render_template("tools.ascii",header=get_header(),tools=tools), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template("tools.ascii", header=get_header(), tools=tools),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
if os.path.exists(f"templates/{path}.ascii"):
return render_template(f"{path}.ascii",header=get_header()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template(f"{path}.ascii", header=get_header()),
200,
{"Content-Type": "text/plain; charset=utf-8"},
)
# Fallback to html if it exists
if os.path.exists(f"templates/{path}.html"):
@@ -127,6 +139,10 @@ def curl_response(request):
# Return curl error page
error = {
"code": 404,
"message": "The requested resource was not found on this server."
"message": "The requested resource was not found on this server.",
}
return render_template("error.ascii",header=get_header(),error=error), 404, {'Content-Type': 'text/plain; charset=utf-8'}
return (
render_template("error.ascii", header=get_header(), error=error),
404,
{"Content-Type": "text/plain; charset=utf-8"},
)

57
mail.py
View File

@@ -21,6 +21,7 @@ import os
# "body":"G'\''day\nThis is a test email from my website api\n\nRegards,\nNathan.Woodburn/"
# }'
def validateSender(email):
domains = os.getenv("EMAIL_DOMAINS")
if not domains:
@@ -33,37 +34,29 @@ def validateSender(email):
return False
def sendEmail(data):
fromEmail = "noreply@woodburn.au"
if "from" in data:
fromEmail = data["from"]
if not validateSender(fromEmail):
return jsonify({
"status": 400,
"message": "Bad request 'from' email invalid"
})
return jsonify({"status": 400, "message": "Bad request 'from' email invalid"})
if "to" not in data:
return jsonify({
"status": 400,
"message": "Bad request 'to' json data missing"
})
return jsonify({"status": 400, "message": "Bad request 'to' json data missing"})
to = data["to"]
if "subject" not in data:
return jsonify({
"status": 400,
"message": "Bad request 'subject' json data missing"
})
return jsonify(
{"status": 400, "message": "Bad request 'subject' json data missing"}
)
subject = data["subject"]
if "body" not in data:
return jsonify({
"status": 400,
"message": "Bad request 'body' json data missing"
})
return jsonify(
{"status": 400, "message": "Bad request 'body' json data missing"}
)
body = data["body"]
if not re.match(r"[^@]+@[^@]+\.[^@]+", to):
@@ -76,15 +69,15 @@ def sendEmail(data):
raise ValueError("Body cannot be empty.")
fromName = "Nathan Woodburn"
if 'sender' in data:
fromName = data['sender']
if "sender" in data:
fromName = data["sender"]
# Create the email message
msg = MIMEMultipart()
msg['From'] = formataddr((fromName, fromEmail))
msg['To'] = to
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
msg["From"] = formataddr((fromName, fromEmail))
msg["To"] = to
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
# Sending the email
try:
@@ -92,24 +85,12 @@ def sendEmail(data):
user = os.getenv("EMAIL_USER")
password = os.getenv("EMAIL_PASS")
if host is None or user is None or password is None:
return jsonify({
"status": 500,
"error": "Email server not configured"
})
return jsonify({"status": 500, "error": "Email server not configured"})
with smtplib.SMTP_SSL(host, 465) as server:
server.login(user, password)
server.sendmail(fromEmail, to, msg.as_string())
print("Email sent successfully.")
return jsonify({
"status": 200,
"message": "Send email successfully"
})
return jsonify({"status": 200, "message": "Send email successfully"})
except Exception as e:
return jsonify({
"status": 500,
"error": "Sending email failed",
"exception":e
})
return jsonify({"status": 500, "error": "Sending email failed", "exception": e})

26
main.py
View File

@@ -11,15 +11,16 @@ class GunicornApp(BaseApplication):
def load_config(self):
for key, value in self.options.items():
if key in self.cfg.settings and value is not None: # type: ignore
self.cfg.set(key.lower(), value) # type: ignore
if key in self.cfg.settings and value is not None: # type: ignore
self.cfg.set(key.lower(), value) # type: ignore
def load(self):
return self.application
if __name__ == '__main__':
workers = os.getenv('WORKERS')
threads = os.getenv('THREADS')
if __name__ == "__main__":
workers = os.getenv("WORKERS")
threads = os.getenv("THREADS")
if workers is None:
workers = 1
if threads is None:
@@ -27,10 +28,17 @@ if __name__ == '__main__':
workers = int(workers)
threads = int(threads)
options = {
'bind': '0.0.0.0:5000',
'workers': workers,
'threads': threads,
"bind": "0.0.0.0:5000",
"workers": workers,
"threads": threads,
}
gunicorn_app = GunicornApp(app, options)
print('Starting server with ' + str(workers) + ' workers and ' + str(threads) + ' threads', flush=True)
print(
"Starting server with "
+ str(workers)
+ " workers and "
+ str(threads)
+ " threads",
flush=True,
)
gunicorn_app.run()

170
server.py
View File

@@ -33,6 +33,15 @@ from tools import (
get_tools_data,
)
from curl import curl_response
from cache_helper import (
get_nc_config,
get_git_latest_activity,
get_projects,
get_uptime_status,
get_wallet_tokens,
get_coin_names,
get_wallet_domains,
)
app = Flask(__name__)
CORS(app)
@@ -70,13 +79,6 @@ if os.path.isfile("data/sites.json"):
# Remove any sites that are not enabled
SITES = [site for site in SITES if "enabled" not in site or site["enabled"]]
PROJECTS = []
PROJECTS_UPDATED = 0
NC_CONFIG = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
).json()
# endregion
# region Assets routes
@@ -226,9 +228,6 @@ def api_legacy(function):
@app.route("/")
def index():
global PROJECTS
global PROJECTS_UPDATED
# Check if host if podcast.woodburn.au
if "podcast.woodburn.au" in request.host:
return render_template("podcast.html")
@@ -259,81 +258,22 @@ def index():
resp.set_cookie("loaded", "true", max_age=604800)
return resp
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("GIT_AUTH")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
repo_description = git["repo"]["description"]
except Exception as e:
repo_name = "nathanwoodburn.github.io"
repo_description = "Personal website"
git = {
"repo": {
"html_url": "https://nathan.woodburn.au",
"name": "nathanwoodburn.github.io",
"description": "Personal website",
}
}
print(f"Error getting git data: {e}")
# Use cached git data
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
# Get only repo names for the newest updates
if (
PROJECTS == []
or PROJECTS_UPDATED
< (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp()
):
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
)
PROJECTS = projectsreq.json()
# Check for next page
pageNum = 1
while 'rel="next"' in projectsreq.headers["link"]:
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
+ str(pageNum)
)
PROJECTS += projectsreq.json()
pageNum += 1
for project in PROJECTS:
if (
project["avatar_url"] == "https://git.woodburn.au/"
or project["avatar_url"] == ""
):
project["avatar_url"] = "/favicon.png"
project["name"] = project["name"].replace("_", " ").replace("-", " ")
# Sort by last updated
projectsList = sorted(PROJECTS, key=lambda x: x["updated_at"], reverse=True)
PROJECTS = []
projectNames = []
projectNum = 0
while len(PROJECTS) < 3:
if projectsList[projectNum]["name"] not in projectNames:
PROJECTS.append(projectsList[projectNum])
projectNames.append(projectsList[projectNum]["name"])
projectNum += 1
PROJECTS_UPDATED = datetime.datetime.now().timestamp()
# Use cached projects data
projects = get_projects(limit=3)
# Use cached uptime status
uptime = get_uptime_status()
custom = ""
# Check for downtime
uptime = requests.get("https://uptime.woodburn.au/api/status-page/main/badge")
if "maintenance" in uptime.content.decode("utf-8").lower():
uptime = True
else:
uptime = uptime.content.count(b"Up") > 1
if uptime:
custom += "<style>#downtime{display:none !important;}</style>"
else:
custom += "<style>#downtime{opacity:1;}</style>"
# Special names
if repo_name == "nathanwoodburn.github.io":
repo_name = "Nathan.Woodburn/"
@@ -341,8 +281,9 @@ def index():
html_url = git["repo"]["html_url"]
repo = '<a href="' + html_url + '" target="_blank">' + repo_name + "</a>"
# Get time
timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
# Get time using cached config
nc_config = get_nc_config()
timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
timezone = datetime.timezone(offset=timezone_offset)
time = datetime.datetime.now(tz=timezone)
@@ -365,7 +306,7 @@ def index():
setInterval(updateClock, 1000);
}
"""
time += f"startClock({NC_CONFIG['time-zone']});"
time += f"startClock({nc_config['time-zone']});"
time += "</script>"
HNSaddress = getAddress("HNS")
@@ -385,9 +326,9 @@ def index():
repo_description=repo_description,
custom=custom,
sites=SITES,
projects=PROJECTS,
projects=projects,
time=time,
message=NC_CONFIG.get("message", ""),
message=nc_config.get("message", ""),
),
200,
{"Content-Type": "text/html"},
@@ -409,31 +350,25 @@ def donate():
coinList = [file for file in coinList if file[0] != "."]
coinList.sort()
tokenList = []
with open(".well-known/wallets/.tokens") as file:
tokenList = file.read()
tokenList = json.loads(tokenList)
coinNames = {}
with open(".well-known/wallets/.coins") as file:
coinNames = file.read()
coinNames = json.loads(coinNames)
tokenList = get_wallet_tokens()
coinNames = get_coin_names()
coins = ""
default_coins = ["btc", "eth", "hns", "sol", "xrp", "ada", "dot"]
for file in coinList:
if file in coinNames:
coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{coinNames[file]}</a>'
else:
coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{file}</a>'
coin_name = coinNames.get(file, file)
display_style = "" if file.lower() in default_coins else "display:none;"
coins += f'<a class="dropdown-item" style="{display_style}" href="?c={file.lower()}">{coin_name}</a>'
for token in tokenList:
if token["chain"] != "null":
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] + " on " if token["symbol"] != token["name"] else ""}{token["chain"]})</a>'
else:
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] if token["symbol"] != token["name"] else ""})</a>'
chain_display = f" on {token['chain']}" if token["chain"] != "null" else ""
symbol_display = (
f" ({token['symbol']}{chain_display})"
if token["symbol"] != token["name"]
else chain_display
)
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]}{symbol_display}</a>'
crypto = request.args.get("c")
if not crypto:
@@ -460,7 +395,6 @@ def donate():
token = {"name": "Unknown token", "symbol": token, "chain": crypto}
address = ""
domain = ""
cryptoHTML = ""
proof = ""
@@ -470,10 +404,16 @@ def donate():
if os.path.isfile(f".well-known/wallets/{crypto}"):
with open(f".well-known/wallets/{crypto}") as file:
address = file.read()
coin_display = coinNames.get(crypto, crypto)
if not token:
cryptoHTML += f"<br>Donate with {coinNames[crypto] if crypto in coinNames else crypto}:"
cryptoHTML += f"<br>Donate with {coin_display}:"
else:
cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ') ' if token['symbol'] != token['name'] else ''}on {crypto}:"
token_symbol = (
f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
)
cryptoHTML += (
f"<br>Donate with {token['name']}{token_symbol} on {crypto}:"
)
cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'
if proof:
@@ -481,7 +421,13 @@ def donate():
elif token:
if "address" in token:
address = token["address"]
cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ')' if token['symbol'] != token['name'] else ''}{' on ' + crypto if crypto != 'NULL' else ''}:"
token_symbol = (
f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
)
chain_display = f" on {crypto}" if crypto != "NULL" else ""
cryptoHTML += (
f"<br>Donate with {token['name']}{token_symbol}{chain_display}:"
)
cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'
if proof:
cryptoHTML += proof
@@ -490,16 +436,12 @@ def donate():
else:
cryptoHTML += f"<br>Invalid chain: {crypto}<br>"
if os.path.isfile(".well-known/wallets/.domains"):
# Get json of all domains
with open(".well-known/wallets/.domains") as file:
domains = file.read()
domains = json.loads(domains)
domains = get_wallet_domains()
if crypto in domains:
domain = domains[crypto]
cryptoHTML += "<br>Or send to this domain on compatible wallets:<br>"
cryptoHTML += f'<code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-domain" class="address" style="color: rgb(242,90,5);display: block;" data-bs-original-title="Click to copy">{domain}</code>'
if crypto in domains:
domain = domains[crypto]
cryptoHTML += "<br>Or send to this domain on compatible wallets:<br>"
cryptoHTML += f'<code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-domain" class="address" style="color: rgb(242,90,5);display: block;" data-bs-original-title="Click to copy">{domain}</code>'
if address:
cryptoHTML += (
'<br><img src="/address/'

View File

@@ -1,6 +1,6 @@
from flask import Request, render_template, jsonify, make_response
import os
from functools import lru_cache as cache
from functools import lru_cache
import datetime
from typing import Optional, Dict, Union, Tuple
import re
@@ -24,17 +24,10 @@ CRAWLERS = [
"Exabot",
"facebot",
"ia_archiver",
"Twitterbot"
"Twitterbot",
]
CLI_AGENTS = [
"curl",
"hurl",
"xh",
"Posting",
"HTTPie",
"nushell"
]
CLI_AGENTS = ["curl", "hurl", "xh", "Posting", "HTTPie", "nushell"]
def getClientIP(request: Request) -> str:
@@ -56,7 +49,8 @@ def getClientIP(request: Request) -> str:
ip = "unknown"
return ip
@cache
@lru_cache(maxsize=1)
def getGitCommit() -> str:
"""
Get the current git commit hash.
@@ -115,7 +109,8 @@ def isCrawler(request: Request) -> bool:
return any(crawler in user_agent for crawler in CRAWLERS)
return False
@cache
@lru_cache(maxsize=128)
def isDev(host: str) -> bool:
"""
Check if the host indicates a development environment.
@@ -135,7 +130,8 @@ def isDev(host: str) -> bool:
return True
return False
@cache
@lru_cache(maxsize=128)
def getHandshakeScript(host: str) -> str:
"""
Get the handshake script HTML snippet.
@@ -150,7 +146,8 @@ def getHandshakeScript(host: str) -> str:
return ""
return '<script src="https://nathan.woodburn/handshake.js" domain="nathan.woodburn" async></script><script src="https://nathan.woodburn/https.js" async></script>'
@cache
@lru_cache(maxsize=64)
def getAddress(coin: str) -> str:
"""
Get the wallet address for a cryptocurrency.
@@ -169,7 +166,7 @@ def getAddress(coin: str) -> str:
return address
@cache
@lru_cache(maxsize=256)
def getFilePath(name: str, path: str) -> Optional[str]:
"""
Find a file in a directory tree.
@@ -187,7 +184,9 @@ def getFilePath(name: str, path: str) -> Optional[str]:
return None
def json_response(request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404):
def json_response(
request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404
):
"""
Create a JSON response with standard formatting.
@@ -205,17 +204,20 @@ def json_response(request: Request, message: Union[str, Dict] = "404 Not Found",
message["ip"] = getClientIP(request)
return jsonify(message), code
return jsonify({
"status": code,
"message": message,
"ip": getClientIP(request),
}), code
return jsonify(
{
"status": code,
"message": message,
"ip": getClientIP(request),
}
), code
def error_response(
request: Request,
message: str = "404 Not Found",
code: int = 404,
force_json: bool = False
force_json: bool = False,
) -> Union[Tuple[Dict, int], object]:
"""
Create an error response in JSON or HTML format.
@@ -233,10 +235,12 @@ def error_response(
return json_response(request, message, code)
# Check if <error code>.html exists in templates
template_name = f"{code}.html" if os.path.isfile(
f"templates/{code}.html") else "404.html"
response = make_response(render_template(
template_name, code=code, message=message), code)
template_name = (
f"{code}.html" if os.path.isfile(f"templates/{code}.html") else "404.html"
)
response = make_response(
render_template(template_name, code=code, message=message), code
)
# Add message to response headers
response.headers["X-Error-Message"] = message
@@ -260,8 +264,7 @@ def parse_date(date_groups: list[str]) -> str | None:
date_str = " ".join(date_groups).strip()
# Remove ordinal suffixes
date_str = re.sub(r'(\d+)(st|nd|rd|th)', r'\1',
date_str, flags=re.IGNORECASE)
date_str = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", date_str, flags=re.IGNORECASE)
# Parse with dateutil, default day=1 if missing
dt = parse(date_str, default=datetime.datetime(1900, 1, 1))
@@ -275,6 +278,7 @@ def parse_date(date_groups: list[str]) -> str | None:
except (ValueError, TypeError):
return None
def get_tools_data():
with open("data/tools.json", "r") as f:
return json.load(f)