Add caching layer for expensive API calls and file operations

Co-authored-by: Nathanwoodburn <62039630+Nathanwoodburn@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-11-21 11:30:23 +00:00
parent ca01b96e80
commit 607fdd4d46
5 changed files with 327 additions and 197 deletions

View File

@@ -8,6 +8,7 @@ from tools import getClientIP, getGitCommit, json_response, parse_date, get_tool
from blueprints import sol
from dateutil import parser as date_parser
from blueprints.spotify import get_spotify_track
from cache_helper import get_nc_config, get_git_latest_activity
# Constants
HTTP_OK = 200
@@ -21,14 +22,6 @@ app = Blueprint('api', __name__, url_prefix='/api/v1')
# Register solana blueprint
app.register_blueprint(sol.app)
# Load configuration
NC_CONFIG = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
).json()
if 'time-zone' not in NC_CONFIG:
NC_CONFIG['time-zone'] = 10
@app.route("/", strict_slashes=False)
@app.route("/help")
@@ -71,13 +64,14 @@ def version():
@app.route("/time")
def time():
"""Get the current time in the configured timezone."""
timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
nc_config = get_nc_config()
timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
timezone = datetime.timezone(offset=timezone_offset)
current_time = datetime.datetime.now(tz=timezone)
return jsonify({
"timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"),
"timestamp": current_time.timestamp(),
"timezone": NC_CONFIG["time-zone"],
"timezone": nc_config["time-zone"],
"timeISO": current_time.isoformat(),
"ip": getClientIP(request),
"status": HTTP_OK
@@ -87,8 +81,9 @@ def time():
@app.route("/timezone")
def timezone():
"""Get the current timezone setting."""
nc_config = get_nc_config()
return jsonify({
"timezone": NC_CONFIG["time-zone"],
"timezone": nc_config["time-zone"],
"ip": getClientIP(request),
"status": HTTP_OK
})
@@ -97,8 +92,9 @@ def timezone():
@app.route("/message")
def message():
"""Get the message from the configuration."""
nc_config = get_nc_config()
return jsonify({
"message": NC_CONFIG["message"],
"message": nc_config["message"],
"ip": getClientIP(request),
"status": HTTP_OK
})
@@ -138,27 +134,16 @@ def email_post():
@app.route("/project")
def project():
"""Get information about the current git project."""
gitinfo = {
"website": None,
}
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("git_token")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
gitinfo["name"] = repo_name
gitinfo["description"] = repo_description
gitinfo["url"] = git["repo"]["html_url"]
if "website" in git["repo"]:
gitinfo["website"] = git["repo"]["website"]
except Exception as e:
print(f"Error getting git data: {e}")
return json_response(request, "500 Internal Server Error", HTTP_SERVER_ERROR)
gitinfo = {
"name": repo_name,
"description": repo_description,
"url": git["repo"]["html_url"],
"website": git["repo"].get("website"),
}
return jsonify({
"repo_name": repo_name,

250
cache_helper.py Normal file
View File

@@ -0,0 +1,250 @@
"""
Cache helper module for expensive API calls and configuration.
Provides centralized caching with TTL for external API calls.
"""
import datetime
import os
import json
import requests
from functools import lru_cache
# Cache storage for NC_CONFIG with timestamp
_nc_config_cache = {"data": None, "timestamp": 0}
_nc_config_ttl = 3600 # 1 hour cache
def get_nc_config():
"""
Get NC_CONFIG with caching (1 hour TTL).
Falls back to default config on error.
Returns:
dict: Configuration dictionary
"""
global _nc_config_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if _nc_config_cache["data"] and (current_time - _nc_config_cache["timestamp"]) < _nc_config_ttl:
return _nc_config_cache["data"]
# Fetch new config
try:
config = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json",
timeout=5
).json()
_nc_config_cache = {"data": config, "timestamp": current_time}
return config
except Exception as e:
print(f"Error fetching NC_CONFIG: {e}")
# Return cached data if available, otherwise default
if _nc_config_cache["data"]:
return _nc_config_cache["data"]
return {"time-zone": 10, "message": ""}
# Cache storage for git data
_git_data_cache = {"data": None, "timestamp": 0}
_git_data_ttl = 300 # 5 minutes cache
def get_git_latest_activity():
"""
Get latest git activity with caching (5 minute TTL).
Returns:
dict: Git activity data or default values
"""
global _git_data_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if _git_data_cache["data"] and (current_time - _git_data_cache["timestamp"]) < _git_data_ttl:
return _git_data_cache["data"]
# Fetch new data
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("GIT_AUTH") or os.getenv("git_token") or ""},
timeout=5
)
git_data = git.json()
if git_data and len(git_data) > 0:
result = git_data[0]
_git_data_cache = {"data": result, "timestamp": current_time}
return result
except Exception as e:
print(f"Error fetching git data: {e}")
# Return cached or default
if _git_data_cache["data"]:
return _git_data_cache["data"]
return {
"repo": {
"html_url": "https://nathan.woodburn.au",
"name": "nathanwoodburn.github.io",
"description": "Personal website",
}
}
# Cache storage for projects
_projects_cache = {"data": None, "timestamp": 0}
_projects_ttl = 7200 # 2 hours cache
def get_projects(limit=3):
"""
Get projects list with caching (2 hour TTL).
Args:
limit (int): Number of projects to return
Returns:
list: List of project dictionaries
"""
global _projects_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if _projects_cache["data"] and (current_time - _projects_cache["timestamp"]) < _projects_ttl:
return _projects_cache["data"][:limit]
# Fetch new data
try:
projects = []
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos",
timeout=5
)
projects = projectsreq.json()
# Check for pagination
pageNum = 2
while 'rel="next"' in projectsreq.headers.get("link", ""):
projectsreq = requests.get(
f"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page={pageNum}",
timeout=5
)
projects += projectsreq.json()
pageNum += 1
# Safety limit
if pageNum > 10:
break
# Process projects
for project in projects:
if project.get("avatar_url") in ("https://git.woodburn.au/", ""):
project["avatar_url"] = "/favicon.png"
project["name"] = project["name"].replace("_", " ").replace("-", " ")
# Sort by last updated
projects_sorted = sorted(projects, key=lambda x: x.get("updated_at", ""), reverse=True)
# Remove duplicates by name
seen_names = set()
unique_projects = []
for project in projects_sorted:
if project["name"] not in seen_names:
unique_projects.append(project)
seen_names.add(project["name"])
_projects_cache = {"data": unique_projects, "timestamp": current_time}
return unique_projects[:limit]
except Exception as e:
print(f"Error fetching projects: {e}")
if _projects_cache["data"]:
return _projects_cache["data"][:limit]
return []
# Cache storage for uptime status
_uptime_cache = {"data": None, "timestamp": 0}
_uptime_ttl = 300 # 5 minutes cache
def get_uptime_status():
"""
Get uptime status with caching (5 minute TTL).
Returns:
bool: True if services are up, False otherwise
"""
global _uptime_cache
current_time = datetime.datetime.now().timestamp()
# Check if cache is valid
if _uptime_cache["data"] is not None and (current_time - _uptime_cache["timestamp"]) < _uptime_ttl:
return _uptime_cache["data"]
# Fetch new data
try:
uptime = requests.get(
"https://uptime.woodburn.au/api/status-page/main/badge",
timeout=5
)
content = uptime.content.decode("utf-8").lower()
status = "maintenance" in content or uptime.content.count(b"Up") > 1
_uptime_cache = {"data": status, "timestamp": current_time}
return status
except Exception as e:
print(f"Error fetching uptime: {e}")
# Return cached or default (assume up)
if _uptime_cache["data"] is not None:
return _uptime_cache["data"]
return True
# Cached wallet data loaders
@lru_cache(maxsize=1)
def get_wallet_tokens():
"""
Get wallet tokens with caching.
Returns:
list: List of token dictionaries
"""
try:
with open(".well-known/wallets/.tokens") as file:
return json.load(file)
except Exception as e:
print(f"Error loading tokens: {e}")
return []
@lru_cache(maxsize=1)
def get_coin_names():
"""
Get coin names with caching.
Returns:
dict: Dictionary of coin names
"""
try:
with open(".well-known/wallets/.coins") as file:
return json.load(file)
except Exception as e:
print(f"Error loading coin names: {e}")
return {}
@lru_cache(maxsize=1)
def get_wallet_domains():
"""
Get wallet domains with caching.
Returns:
dict: Dictionary of wallet domains
"""
try:
if os.path.isfile(".well-known/wallets/.domains"):
with open(".well-known/wallets/.domains") as file:
return json.load(file)
except Exception as e:
print(f"Error loading domains: {e}")
return {}

55
curl.py
View File

@@ -2,8 +2,8 @@ from flask import render_template
from tools import getAddress, get_tools_data, getClientIP
import os
from functools import lru_cache
import requests
from blueprints.spotify import get_spotify_track
from cache_helper import get_git_latest_activity, get_projects as get_projects_cached
MAX_WIDTH = 80
@@ -24,61 +24,26 @@ def get_header():
with open("templates/header.ascii", "r") as f:
return f.read()
@lru_cache(maxsize=1)
@lru_cache(maxsize=16)
def get_current_project():
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("GIT_AUTH") if os.getenv("GIT_AUTH") else os.getenv("git_token")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
if not repo_description:
return f"{repo_name}"
return f"{repo_name} - {repo_description}"
return f"[1;36m{repo_name}[0m"
return f"[1;36m{repo_name}[0m - [1m{repo_description}[0m"
@lru_cache(maxsize=1)
@lru_cache(maxsize=16)
def get_projects():
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
)
projects = projectsreq.json()
# Check for next page
pageNum = 1
while 'rel="next"' in projectsreq.headers["link"]:
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
+ str(pageNum)
)
projects += projectsreq.json()
pageNum += 1
# Sort by last updated
projectsList = sorted(
projects, key=lambda x: x["updated_at"], reverse=True)
projects_data = get_projects_cached(limit=5)
projects = ""
projectNum = 0
includedNames = []
while len(includedNames) < 5 and projectNum < len(projectsList):
# Avoid duplicates
if projectsList[projectNum]["name"] in includedNames:
projectNum += 1
continue
includedNames.append(projectsList[projectNum]["name"])
project = projectsList[projectNum]
projects += f"""{project['name']} - {project['description'] if project['description'] else 'No description'}
for project in projects_data:
projects += f"""[1m{project['name']}[0m - {project['description'] if project['description'] else 'No description'}
{project['html_url']}
"""
projectNum += 1
return projects
def curl_response(request):
# Check if <path>.ascii exists
path = clean_path(request.path)

150
server.py
View File

@@ -33,6 +33,15 @@ from tools import (
get_tools_data,
)
from curl import curl_response
from cache_helper import (
get_nc_config,
get_git_latest_activity,
get_projects,
get_uptime_status,
get_wallet_tokens,
get_coin_names,
get_wallet_domains,
)
app = Flask(__name__)
CORS(app)
@@ -70,13 +79,6 @@ if os.path.isfile("data/sites.json"):
# Remove any sites that are not enabled
SITES = [site for site in SITES if "enabled" not in site or site["enabled"]]
PROJECTS = []
PROJECTS_UPDATED = 0
NC_CONFIG = requests.get(
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
).json()
# endregion
# region Assets routes
@@ -226,9 +228,6 @@ def api_legacy(function):
@app.route("/")
def index():
global PROJECTS
global PROJECTS_UPDATED
# Check if host if podcast.woodburn.au
if "podcast.woodburn.au" in request.host:
return render_template("podcast.html")
@@ -259,81 +258,22 @@ def index():
resp.set_cookie("loaded", "true", max_age=604800)
return resp
try:
git = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
headers={"Authorization": os.getenv("GIT_AUTH")},
)
git = git.json()
git = git[0]
repo_name = git["repo"]["name"]
repo_name = repo_name.lower()
# Use cached git data
git = get_git_latest_activity()
repo_name = git["repo"]["name"].lower()
repo_description = git["repo"]["description"]
except Exception as e:
repo_name = "nathanwoodburn.github.io"
repo_description = "Personal website"
git = {
"repo": {
"html_url": "https://nathan.woodburn.au",
"name": "nathanwoodburn.github.io",
"description": "Personal website",
}
}
print(f"Error getting git data: {e}")
# Get only repo names for the newest updates
if (
PROJECTS == []
or PROJECTS_UPDATED
< (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp()
):
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
)
PROJECTS = projectsreq.json()
# Check for next page
pageNum = 1
while 'rel="next"' in projectsreq.headers["link"]:
projectsreq = requests.get(
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
+ str(pageNum)
)
PROJECTS += projectsreq.json()
pageNum += 1
for project in PROJECTS:
if (
project["avatar_url"] == "https://git.woodburn.au/"
or project["avatar_url"] == ""
):
project["avatar_url"] = "/favicon.png"
project["name"] = project["name"].replace("_", " ").replace("-", " ")
# Sort by last updated
projectsList = sorted(PROJECTS, key=lambda x: x["updated_at"], reverse=True)
PROJECTS = []
projectNames = []
projectNum = 0
while len(PROJECTS) < 3:
if projectsList[projectNum]["name"] not in projectNames:
PROJECTS.append(projectsList[projectNum])
projectNames.append(projectsList[projectNum]["name"])
projectNum += 1
PROJECTS_UPDATED = datetime.datetime.now().timestamp()
# Use cached projects data
projects = get_projects(limit=3)
# Use cached uptime status
uptime = get_uptime_status()
custom = ""
# Check for downtime
uptime = requests.get("https://uptime.woodburn.au/api/status-page/main/badge")
if "maintenance" in uptime.content.decode("utf-8").lower():
uptime = True
else:
uptime = uptime.content.count(b"Up") > 1
if uptime:
custom += "<style>#downtime{display:none !important;}</style>"
else:
custom += "<style>#downtime{opacity:1;}</style>"
# Special names
if repo_name == "nathanwoodburn.github.io":
repo_name = "Nathan.Woodburn/"
@@ -341,8 +281,9 @@ def index():
html_url = git["repo"]["html_url"]
repo = '<a href="' + html_url + '" target="_blank">' + repo_name + "</a>"
# Get time
timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
# Get time using cached config
nc_config = get_nc_config()
timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
timezone = datetime.timezone(offset=timezone_offset)
time = datetime.datetime.now(tz=timezone)
@@ -365,7 +306,7 @@ def index():
setInterval(updateClock, 1000);
}
"""
time += f"startClock({NC_CONFIG['time-zone']});"
time += f"startClock({nc_config['time-zone']});"
time += "</script>"
HNSaddress = getAddress("HNS")
@@ -385,9 +326,9 @@ def index():
repo_description=repo_description,
custom=custom,
sites=SITES,
projects=PROJECTS,
projects=projects,
time=time,
message=NC_CONFIG.get("message", ""),
message=nc_config.get("message", ""),
),
200,
{"Content-Type": "text/html"},
@@ -409,31 +350,21 @@ def donate():
coinList = [file for file in coinList if file[0] != "."]
coinList.sort()
tokenList = []
with open(".well-known/wallets/.tokens") as file:
tokenList = file.read()
tokenList = json.loads(tokenList)
coinNames = {}
with open(".well-known/wallets/.coins") as file:
coinNames = file.read()
coinNames = json.loads(coinNames)
tokenList = get_wallet_tokens()
coinNames = get_coin_names()
coins = ""
default_coins = ["btc", "eth", "hns", "sol", "xrp", "ada", "dot"]
for file in coinList:
if file in coinNames:
coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{coinNames[file]}</a>'
else:
coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{file}</a>'
coin_name = coinNames.get(file, file)
display_style = "" if file.lower() in default_coins else "display:none;"
coins += f'<a class="dropdown-item" style="{display_style}" href="?c={file.lower()}">{coin_name}</a>'
for token in tokenList:
if token["chain"] != "null":
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] + " on " if token["symbol"] != token["name"] else ""}{token["chain"]})</a>'
else:
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] if token["symbol"] != token["name"] else ""})</a>'
chain_display = f" on {token['chain']}" if token["chain"] != "null" else ""
symbol_display = f" ({token['symbol']}{chain_display})" if token["symbol"] != token["name"] else chain_display
coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]}{symbol_display}</a>'
crypto = request.args.get("c")
if not crypto:
@@ -460,7 +391,6 @@ def donate():
token = {"name": "Unknown token", "symbol": token, "chain": crypto}
address = ""
domain = ""
cryptoHTML = ""
proof = ""
@@ -470,10 +400,12 @@ def donate():
if os.path.isfile(f".well-known/wallets/{crypto}"):
with open(f".well-known/wallets/{crypto}") as file:
address = file.read()
coin_display = coinNames.get(crypto, crypto)
if not token:
cryptoHTML += f"<br>Donate with {coinNames[crypto] if crypto in coinNames else crypto}:"
cryptoHTML += f"<br>Donate with {coin_display}:"
else:
cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ') ' if token['symbol'] != token['name'] else ''}on {crypto}:"
token_symbol = f" ({token['symbol']})" if token['symbol'] != token['name'] else ""
cryptoHTML += f"<br>Donate with {token['name']}{token_symbol} on {crypto}:"
cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'
if proof:
@@ -481,7 +413,9 @@ def donate():
elif token:
if "address" in token:
address = token["address"]
cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ')' if token['symbol'] != token['name'] else ''}{' on ' + crypto if crypto != 'NULL' else ''}:"
token_symbol = f" ({token['symbol']})" if token['symbol'] != token['name'] else ""
chain_display = f" on {crypto}" if crypto != 'NULL' else ""
cryptoHTML += f"<br>Donate with {token['name']}{token_symbol}{chain_display}:"
cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'
if proof:
cryptoHTML += proof
@@ -490,16 +424,12 @@ def donate():
else:
cryptoHTML += f"<br>Invalid chain: {crypto}<br>"
if os.path.isfile(".well-known/wallets/.domains"):
# Get json of all domains
with open(".well-known/wallets/.domains") as file:
domains = file.read()
domains = json.loads(domains)
domains = get_wallet_domains()
if crypto in domains:
domain = domains[crypto]
cryptoHTML += "<br>Or send to this domain on compatible wallets:<br>"
cryptoHTML += f'<code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-domain" class="address" style="color: rgb(242,90,5);display: block;" data-bs-original-title="Click to copy">{domain}</code>'
if address:
cryptoHTML += (
'<br><img src="/address/'

View File

@@ -1,6 +1,6 @@
from flask import Request, render_template, jsonify, make_response
import os
from functools import lru_cache as cache
from functools import lru_cache
import datetime
from typing import Optional, Dict, Union, Tuple
import re
@@ -56,7 +56,7 @@ def getClientIP(request: Request) -> str:
ip = "unknown"
return ip
@cache
@lru_cache(maxsize=1)
def getGitCommit() -> str:
"""
Get the current git commit hash.
@@ -115,7 +115,7 @@ def isCrawler(request: Request) -> bool:
return any(crawler in user_agent for crawler in CRAWLERS)
return False
@cache
@lru_cache(maxsize=128)
def isDev(host: str) -> bool:
"""
Check if the host indicates a development environment.
@@ -135,7 +135,7 @@ def isDev(host: str) -> bool:
return True
return False
@cache
@lru_cache(maxsize=128)
def getHandshakeScript(host: str) -> str:
"""
Get the handshake script HTML snippet.
@@ -150,7 +150,7 @@ def getHandshakeScript(host: str) -> str:
return ""
return '<script src="https://nathan.woodburn/handshake.js" domain="nathan.woodburn" async></script><script src="https://nathan.woodburn/https.js" async></script>'
@cache
@lru_cache(maxsize=64)
def getAddress(coin: str) -> str:
"""
Get the wallet address for a cryptocurrency.
@@ -169,7 +169,7 @@ def getAddress(coin: str) -> str:
return address
@cache
@lru_cache(maxsize=256)
def getFilePath(name: str, path: str) -> Optional[str]:
"""
Find a file in a directory tree.