Compare commits: 467faff592...copilot/im

6 commits

| Author | SHA1 | Date |
|---|---|---|
| | e489764ff8 | |
| | 51c4416d4d | |
| | a78d999a61 | |
| | 74afdc1b5b | |
| | 607fdd4d46 | |
| | ca01b96e80 | |
@@ -20,7 +20,7 @@ RUN --mount=type=cache,target=/root/.cache/uv \

# Copy only app source files
COPY blueprints blueprints
COPY main.py server.py curl.py tools.py mail.py ./
COPY main.py server.py curl.py tools.py mail.py cache_helper.py ./
COPY templates templates
COPY data data
COPY pwa pwa
@@ -55,6 +55,7 @@ COPY --from=build --chown=appuser:appgroup /app/server.py /app/
COPY --from=build --chown=appuser:appgroup /app/curl.py /app/
COPY --from=build --chown=appuser:appgroup /app/tools.py /app/
COPY --from=build --chown=appuser:appgroup /app/mail.py /app/
COPY --from=build --chown=appuser:appgroup /app/cache_helper.py /app/

USER appuser
EXPOSE 5000
addCoin.py (31 changed lines)
@@ -1,35 +1,38 @@
import os
import json

if not os.path.exists('.well-known/wallets'):
    os.makedirs('.well-known/wallets')
if not os.path.exists(".well-known/wallets"):
    os.makedirs(".well-known/wallets")


def addCoin(token: str, name: str, address: str):
    with open('.well-known/wallets/'+token.upper(),'w') as f:
    with open(".well-known/wallets/" + token.upper(), "w") as f:
        f.write(address)

    with open('.well-known/wallets/.coins','r') as f:
    with open(".well-known/wallets/.coins", "r") as f:
        coins = json.load(f)

    coins[token.upper()] = f'{name} ({token.upper()})'
    with open('.well-known/wallets/.coins','w') as f:
    coins[token.upper()] = f"{name} ({token.upper()})"
    with open(".well-known/wallets/.coins", "w") as f:
        f.write(json.dumps(coins, indent=4))


def addDomain(token: str, domain: str):
    with open('.well-known/wallets/.domains','r') as f:
    with open(".well-known/wallets/.domains", "r") as f:
        domains = json.load(f)

    domains[token.upper()] = domain
    with open('.well-known/wallets/.domains','w') as f:
    with open(".well-known/wallets/.domains", "w") as f:
        f.write(json.dumps(domains, indent=4))

if __name__ == '__main__':

if __name__ == "__main__":
    # Ask user for token
    token = input('Enter token symbol: ')
    name = input('Enter token name: ')
    address = input('Enter wallet address: ')
    token = input("Enter token symbol: ")
    name = input("Enter token name: ")
    address = input("Enter wallet address: ")
    addCoin(token, name, address)

    if input('Do you want to add a domain? (y/n): ').lower() == 'y':
        domain = input('Enter domain: ')
    if input("Do you want to add a domain? (y/n): ").lower() == "y":
        domain = input("Enter domain: ")
        addDomain(token, domain)
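Note (illustrative, not part of the commits above): a minimal non-interactive use of the two helpers in addCoin.py, with placeholder values; the script itself normally collects these via the input() prompts shown.

```python
# Illustrative sketch only. Assumes addCoin.py is importable and that
# .well-known/wallets/.coins and .domains already exist as JSON files.
from addCoin import addCoin, addDomain

addCoin("HNS", "Handshake", "hs1q...placeholder")  # placeholder wallet address
addDomain("HNS", "example.placeholder")            # placeholder domain
```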
@@ -3,7 +3,7 @@ import os
|
||||
from cloudflare import Cloudflare
|
||||
from tools import json_response
|
||||
|
||||
app = Blueprint('acme', __name__)
|
||||
app = Blueprint("acme", __name__)
|
||||
|
||||
|
||||
@app.route("/hnsdoh-acme", methods=["POST"])
|
||||
@@ -23,7 +23,9 @@ def post():
|
||||
zone = cf.zones.list(name="hnsdoh.com").to_dict()
|
||||
zone_id = zone["result"][0]["id"] # type: ignore
|
||||
existing_records = cf.dns.records.list(
|
||||
zone_id=zone_id, type="TXT", name="_acme-challenge.hnsdoh.com" # type: ignore
|
||||
zone_id=zone_id,
|
||||
type="TXT",
|
||||
name="_acme-challenge.hnsdoh.com", # type: ignore
|
||||
).to_dict()
|
||||
record_id = existing_records["result"][0]["id"] # type: ignore
|
||||
cf.dns.records.delete(dns_record_id=record_id, zone_id=zone_id)
|
||||
|
||||
@@ -8,6 +8,7 @@ from tools import getClientIP, getGitCommit, json_response, parse_date, get_tool
|
||||
from blueprints import sol
|
||||
from dateutil import parser as date_parser
|
||||
from blueprints.spotify import get_spotify_track
|
||||
from cache_helper import get_nc_config, get_git_latest_activity
|
||||
|
||||
# Constants
|
||||
HTTP_OK = 200
|
||||
@@ -17,24 +18,17 @@ HTTP_NOT_FOUND = 404
|
||||
HTTP_UNSUPPORTED_MEDIA = 415
|
||||
HTTP_SERVER_ERROR = 500
|
||||
|
||||
app = Blueprint('api', __name__, url_prefix='/api/v1')
|
||||
app = Blueprint("api", __name__, url_prefix="/api/v1")
|
||||
# Register solana blueprint
|
||||
app.register_blueprint(sol.app)
|
||||
|
||||
# Load configuration
|
||||
NC_CONFIG = requests.get(
|
||||
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
|
||||
).json()
|
||||
|
||||
if 'time-zone' not in NC_CONFIG:
|
||||
NC_CONFIG['time-zone'] = 10
|
||||
|
||||
|
||||
@app.route("/", strict_slashes=False)
|
||||
@app.route("/help")
|
||||
def help():
|
||||
"""Provide API documentation and help."""
|
||||
return jsonify({
|
||||
return jsonify(
|
||||
{
|
||||
"message": "Welcome to Nathan.Woodburn/ API! This is a personal website. For more information, visit https://nathan.woodburn.au",
|
||||
"endpoints": {
|
||||
"/time": "Get the current time",
|
||||
@@ -49,19 +43,22 @@ def help():
|
||||
"/ping": "Just check if the site is up",
|
||||
"/ip": "Get your IP address",
|
||||
"/headers": "Get your request headers",
|
||||
"/help": "Get this help message"
|
||||
"/help": "Get this help message",
|
||||
},
|
||||
"base_url": "/api/v1",
|
||||
"version": getGitCommit(),
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
"status": HTTP_OK,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/status")
|
||||
@app.route("/ping")
|
||||
def status():
|
||||
return json_response(request, "200 OK", HTTP_OK)
|
||||
|
||||
|
||||
@app.route("/version")
|
||||
def version():
|
||||
"""Get the current version of the website."""
|
||||
@@ -71,46 +68,48 @@ def version():
|
||||
@app.route("/time")
|
||||
def time():
|
||||
"""Get the current time in the configured timezone."""
|
||||
timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
|
||||
nc_config = get_nc_config()
|
||||
timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
|
||||
timezone = datetime.timezone(offset=timezone_offset)
|
||||
current_time = datetime.datetime.now(tz=timezone)
|
||||
return jsonify({
|
||||
return jsonify(
|
||||
{
|
||||
"timestring": current_time.strftime("%A, %B %d, %Y %I:%M %p"),
|
||||
"timestamp": current_time.timestamp(),
|
||||
"timezone": NC_CONFIG["time-zone"],
|
||||
"timezone": nc_config["time-zone"],
|
||||
"timeISO": current_time.isoformat(),
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
"status": HTTP_OK,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/timezone")
|
||||
def timezone():
|
||||
"""Get the current timezone setting."""
|
||||
return jsonify({
|
||||
"timezone": NC_CONFIG["time-zone"],
|
||||
nc_config = get_nc_config()
|
||||
return jsonify(
|
||||
{
|
||||
"timezone": nc_config["time-zone"],
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
"status": HTTP_OK,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/message")
|
||||
def message():
|
||||
"""Get the message from the configuration."""
|
||||
return jsonify({
|
||||
"message": NC_CONFIG["message"],
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
nc_config = get_nc_config()
|
||||
return jsonify(
|
||||
{"message": nc_config["message"], "ip": getClientIP(request), "status": HTTP_OK}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/ip")
|
||||
def ip():
|
||||
"""Get the client's IP address."""
|
||||
return jsonify({
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
return jsonify({"ip": getClientIP(request), "status": HTTP_OK})
|
||||
|
||||
|
||||
@app.route("/email", methods=["POST"])
|
||||
@@ -118,7 +117,9 @@ def email_post():
|
||||
"""Send an email via the API (requires API key)."""
|
||||
# Verify json
|
||||
if not request.is_json:
|
||||
return json_response(request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA)
|
||||
return json_response(
|
||||
request, "415 Unsupported Media Type", HTTP_UNSUPPORTED_MEDIA
|
||||
)
|
||||
|
||||
# Check if api key sent
|
||||
data = request.json
|
||||
@@ -138,35 +139,27 @@ def email_post():
|
||||
@app.route("/project")
|
||||
def project():
|
||||
"""Get information about the current git project."""
|
||||
gitinfo = {
|
||||
"website": None,
|
||||
}
|
||||
try:
|
||||
git = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
|
||||
headers={"Authorization": os.getenv("git_token")},
|
||||
)
|
||||
git = git.json()
|
||||
git = git[0]
|
||||
repo_name = git["repo"]["name"]
|
||||
repo_name = repo_name.lower()
|
||||
git = get_git_latest_activity()
|
||||
repo_name = git["repo"]["name"].lower()
|
||||
repo_description = git["repo"]["description"]
|
||||
gitinfo["name"] = repo_name
|
||||
gitinfo["description"] = repo_description
|
||||
gitinfo["url"] = git["repo"]["html_url"]
|
||||
if "website" in git["repo"]:
|
||||
gitinfo["website"] = git["repo"]["website"]
|
||||
except Exception as e:
|
||||
print(f"Error getting git data: {e}")
|
||||
return json_response(request, "500 Internal Server Error", HTTP_SERVER_ERROR)
|
||||
|
||||
return jsonify({
|
||||
gitinfo = {
|
||||
"name": repo_name,
|
||||
"description": repo_description,
|
||||
"url": git["repo"]["html_url"],
|
||||
"website": git["repo"].get("website"),
|
||||
}
|
||||
|
||||
return jsonify(
|
||||
{
|
||||
"repo_name": repo_name,
|
||||
"repo_description": repo_description,
|
||||
"repo": gitinfo,
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
"status": HTTP_OK,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/tools")
|
||||
def tools():
|
||||
@@ -179,6 +172,7 @@ def tools():
|
||||
|
||||
return json_response(request, {"tools": tools}, HTTP_OK)
|
||||
|
||||
|
||||
@app.route("/playing")
|
||||
def playing():
|
||||
"""Get the currently playing Spotify track."""
|
||||
@@ -201,15 +195,11 @@ def headers():
|
||||
# Remove from headers
|
||||
toremove.append(key)
|
||||
|
||||
|
||||
for key in toremove:
|
||||
headers.pop(key)
|
||||
|
||||
return jsonify({
|
||||
"headers": headers,
|
||||
"ip": getClientIP(request),
|
||||
"status": HTTP_OK
|
||||
})
|
||||
return jsonify({"headers": headers, "ip": getClientIP(request), "status": HTTP_OK})
|
||||
|
||||
|
||||
@app.route("/page_date")
|
||||
def page_date():
|
||||
@@ -226,33 +216,33 @@ def page_date():
|
||||
r = requests.get(url, timeout=5)
|
||||
r.raise_for_status()
|
||||
except requests.exceptions.RequestException as e:
|
||||
return json_response(request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST)
|
||||
return json_response(
|
||||
request, f"400 Bad Request 'url' unreachable: {e}", HTTP_BAD_REQUEST
|
||||
)
|
||||
|
||||
page_text = r.text
|
||||
|
||||
# Remove ordinal suffixes globally
|
||||
page_text = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', page_text, flags=re.IGNORECASE)
|
||||
page_text = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", page_text, flags=re.IGNORECASE)
|
||||
# Remove HTML comments
|
||||
page_text = re.sub(r'<!--.*?-->', '', page_text, flags=re.DOTALL)
|
||||
page_text = re.sub(r"<!--.*?-->", "", page_text, flags=re.DOTALL)
|
||||
|
||||
date_patterns = [
|
||||
r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', # YYYY-MM-DD
|
||||
r'(\d{1,2})[/-](\d{1,2})[/-](\d{4})', # DD-MM-YYYY
|
||||
r'(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})', # DD Month YYYY
|
||||
r'(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})', # Month DD, YYYY with optional words
|
||||
r'\b(\d{4})(\d{2})(\d{2})\b', # YYYYMMDD
|
||||
r'(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})', # Month YYYY only
|
||||
r"(\d{4})[/-](\d{1,2})[/-](\d{1,2})", # YYYY-MM-DD
|
||||
r"(\d{1,2})[/-](\d{1,2})[/-](\d{4})", # DD-MM-YYYY
|
||||
r"(?:Last updated:|Updated:|Updated last:)?\s*(\d{1,2})\s+([A-Za-z]{3,9})[, ]?\s*(\d{4})", # DD Month YYYY
|
||||
r"(?:\b\w+\b\s+){0,3}([A-Za-z]{3,9})\s+(\d{1,2}),?\s*(\d{4})", # Month DD, YYYY with optional words
|
||||
r"\b(\d{4})(\d{2})(\d{2})\b", # YYYYMMDD
|
||||
r"(?:Last updated:|Updated:|Last update)?\s*([A-Za-z]{3,9})\s+(\d{4})", # Month YYYY only
|
||||
]
|
||||
|
||||
|
||||
|
||||
# Structured data patterns
|
||||
json_date_patterns = {
|
||||
r'"datePublished"\s*:\s*"([^"]+)"': "published",
|
||||
r'"dateModified"\s*:\s*"([^"]+)"': "modified",
|
||||
r'<meta\s+(?:[^>]*?)property\s*=\s*"article:published_time"\s+content\s*=\s*"([^"]+)"': "published",
|
||||
r'<meta\s+(?:[^>]*?)property\s*=\s*"article:modified_time"\s+content\s*=\s*"([^"]+)"': "modified",
|
||||
r'<time\s+datetime\s*=\s*"([^"]+)"': "published"
|
||||
r'<time\s+datetime\s*=\s*"([^"]+)"': "published",
|
||||
}
|
||||
|
||||
found_dates = []
|
||||
@@ -270,7 +260,7 @@ def page_date():
|
||||
for match in re.findall(pattern, page_text):
|
||||
try:
|
||||
dt = date_parser.isoparse(match)
|
||||
formatted_date = dt.strftime('%Y-%m-%d')
|
||||
formatted_date = dt.strftime("%Y-%m-%d")
|
||||
found_dates.append([[formatted_date], -1, date_type])
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
@@ -279,7 +269,9 @@ def page_date():
|
||||
return json_response(request, "Date not found on page", HTTP_BAD_REQUEST)
|
||||
|
||||
today = datetime.date.today()
|
||||
tolerance_date = today + datetime.timedelta(days=1) # Allow for slight future dates (e.g., time zones)
|
||||
tolerance_date = today + datetime.timedelta(
|
||||
days=1
|
||||
) # Allow for slight future dates (e.g., time zones)
|
||||
# When processing dates
|
||||
processed_dates = []
|
||||
for date_groups, pattern_format, date_type in found_dates:
|
||||
@@ -300,18 +292,32 @@ def page_date():
|
||||
date_obj = {"date": dt.strftime("%Y-%m-%d"), "type": date_type}
|
||||
if verbose:
|
||||
if pattern_format == -1:
|
||||
date_obj.update({"source": "metadata", "pattern_used": pattern_format, "raw": date_groups[0]})
|
||||
date_obj.update(
|
||||
{
|
||||
"source": "metadata",
|
||||
"pattern_used": pattern_format,
|
||||
"raw": date_groups[0],
|
||||
}
|
||||
)
|
||||
else:
|
||||
date_obj.update({"source": "content", "pattern_used": pattern_format, "raw": " ".join(date_groups)})
|
||||
date_obj.update(
|
||||
{
|
||||
"source": "content",
|
||||
"pattern_used": pattern_format,
|
||||
"raw": " ".join(date_groups),
|
||||
}
|
||||
)
|
||||
processed_dates.append(date_obj)
|
||||
|
||||
if not processed_dates:
|
||||
if verbose:
|
||||
return jsonify({
|
||||
return jsonify(
|
||||
{
|
||||
"message": "No valid dates found on page",
|
||||
"found_dates": found_dates,
|
||||
"processed_dates": processed_dates
|
||||
}), HTTP_BAD_REQUEST
|
||||
"processed_dates": processed_dates,
|
||||
}
|
||||
), HTTP_BAD_REQUEST
|
||||
return json_response(request, "No valid dates found on page", HTTP_BAD_REQUEST)
|
||||
# Sort dates and return latest
|
||||
processed_dates.sort(key=lambda x: x["date"])
|
||||
|
||||
@@ -3,63 +3,83 @@ from flask import Blueprint, render_template, request, jsonify
|
||||
import markdown
|
||||
from bs4 import BeautifulSoup
|
||||
import re
|
||||
from functools import lru_cache
|
||||
from tools import isCLI, getClientIP, getHandshakeScript
|
||||
|
||||
app = Blueprint('blog', __name__, url_prefix='/blog')
|
||||
app = Blueprint("blog", __name__, url_prefix="/blog")
|
||||
|
||||
|
||||
@lru_cache(maxsize=32)
|
||||
def list_page_files():
|
||||
blog_pages = os.listdir("data/blog")
|
||||
# Sort pages by modified time, newest first
|
||||
blog_pages.sort(
|
||||
key=lambda x: os.path.getmtime(os.path.join("data/blog", x)), reverse=True)
|
||||
key=lambda x: os.path.getmtime(os.path.join("data/blog", x)), reverse=True
|
||||
)
|
||||
|
||||
# Remove .md extension
|
||||
blog_pages = [page.removesuffix(".md")
|
||||
for page in blog_pages if page.endswith(".md")]
|
||||
blog_pages = [
|
||||
page.removesuffix(".md") for page in blog_pages if page.endswith(".md")
|
||||
]
|
||||
|
||||
return blog_pages
|
||||
|
||||
|
||||
def render_page(date, handshake_scripts=None):
|
||||
# Convert md to html
|
||||
@lru_cache(maxsize=64)
|
||||
def get_blog_content(date):
|
||||
"""Get and cache blog content."""
|
||||
if not os.path.exists(f"data/blog/{date}.md"):
|
||||
return render_template("404.html"), 404
|
||||
return None
|
||||
|
||||
with open(f"data/blog/{date}.md", "r") as f:
|
||||
content = f.read()
|
||||
return f.read()
|
||||
|
||||
|
||||
@lru_cache(maxsize=64)
|
||||
def render_markdown_to_html(content):
|
||||
"""Convert markdown to HTML with caching."""
|
||||
html = markdown.markdown(
|
||||
content, extensions=["sane_lists", "codehilite", "fenced_code"]
|
||||
)
|
||||
# Add target="_blank" to all links
|
||||
html = html.replace('<a href="', '<a target="_blank" href="')
|
||||
html = html.replace("<h4", "<h4 style='margin-bottom:0px;'")
|
||||
html = fix_numbered_lists(html)
|
||||
return html
|
||||
|
||||
|
||||
def render_page(date, handshake_scripts=None):
|
||||
# Get cached content
|
||||
content = get_blog_content(date)
|
||||
if content is None:
|
||||
return render_template("404.html"), 404
|
||||
|
||||
# Get the title from the file name
|
||||
title = date.removesuffix(".md").replace("_", " ")
|
||||
# Convert the md to html
|
||||
content = markdown.markdown(
|
||||
content, extensions=['sane_lists', 'codehilite', 'fenced_code'])
|
||||
# Add target="_blank" to all links
|
||||
content = content.replace('<a href="', '<a target="_blank" href="')
|
||||
|
||||
content = content.replace("<h4", "<h4 style='margin-bottom:0px;'")
|
||||
content = fix_numbered_lists(content)
|
||||
# Convert the md to html (cached)
|
||||
html_content = render_markdown_to_html(content)
|
||||
|
||||
return render_template(
|
||||
"blog/template.html",
|
||||
title=title,
|
||||
content=content,
|
||||
content=html_content,
|
||||
handshake_scripts=handshake_scripts,
|
||||
)
|
||||
|
||||
|
||||
def fix_numbered_lists(html):
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
# Find the <p> tag containing numbered steps
|
||||
paragraphs = soup.find_all('p')
|
||||
paragraphs = soup.find_all("p")
|
||||
for p in paragraphs:
|
||||
content = p.decode_contents() # type: ignore
|
||||
|
||||
# Check for likely numbered step structure
|
||||
if re.search(r'1\.\s', content):
|
||||
if re.search(r"1\.\s", content):
|
||||
# Split into pre-list and numbered steps
|
||||
# Match: <br>, optional whitespace, then a number and dot
|
||||
parts = re.split(r'(?:<br\s*/?>)?\s*(\d+)\.\s', content)
|
||||
parts = re.split(r"(?:<br\s*/?>)?\s*(\d+)\.\s", content)
|
||||
|
||||
# Result: [pre-text, '1', step1, '2', step2, ..., '10', step10]
|
||||
pre_text = parts[0].strip()
|
||||
@@ -70,8 +90,7 @@ def fix_numbered_lists(html):
|
||||
for i in range(0, len(steps), 2):
|
||||
if i + 1 < len(steps):
|
||||
step_html = steps[i + 1].strip()
|
||||
ol_items.append(
|
||||
f"<li style='list-style: auto;'>{step_html}</li>")
|
||||
ol_items.append(f"<li style='list-style: auto;'>{step_html}</li>")
|
||||
|
||||
# Build the final list HTML
|
||||
ol_html = "<ol>\n" + "\n".join(ol_items) + "\n</ol>"
|
||||
@@ -80,7 +99,7 @@ def fix_numbered_lists(html):
|
||||
new_html = f"{pre_text}<br />\n{ol_html}" if pre_text else ol_html
|
||||
|
||||
# Replace old <p> with parsed version
|
||||
new_fragment = BeautifulSoup(new_html, 'html.parser')
|
||||
new_fragment = BeautifulSoup(new_html, "html.parser")
|
||||
p.replace_with(new_fragment)
|
||||
break # Only process the first matching <p>
|
||||
|
||||
@@ -117,16 +136,23 @@ def index():
|
||||
blog_pages = list_page_files()
|
||||
# Create a html list of pages
|
||||
blog_pages = [
|
||||
{"name": page.replace("_", " "), "url": f"/blog/{page}", "download": f"/blog/{page}.md"} for page in blog_pages
|
||||
{
|
||||
"name": page.replace("_", " "),
|
||||
"url": f"/blog/{page}",
|
||||
"download": f"/blog/{page}.md",
|
||||
}
|
||||
for page in blog_pages
|
||||
]
|
||||
|
||||
# Render the template
|
||||
return jsonify({
|
||||
return jsonify(
|
||||
{
|
||||
"status": 200,
|
||||
"message": "Check out my various blog postsa",
|
||||
"ip": getClientIP(request),
|
||||
"blogs": blog_pages
|
||||
}), 200
|
||||
"blogs": blog_pages,
|
||||
}
|
||||
), 200
|
||||
|
||||
|
||||
@app.route("/<path:path>")
|
||||
@@ -134,31 +160,30 @@ def path(path):
|
||||
if not isCLI(request):
|
||||
return render_page(path, handshake_scripts=getHandshakeScript(request.host))
|
||||
|
||||
# Convert md to html
|
||||
if not os.path.exists(f"data/blog/{path}.md"):
|
||||
# Get cached content
|
||||
content = get_blog_content(path)
|
||||
if content is None:
|
||||
return render_template("404.html"), 404
|
||||
|
||||
with open(f"data/blog/{path}.md", "r") as f:
|
||||
content = f.read()
|
||||
# Get the title from the file name
|
||||
title = path.replace("_", " ")
|
||||
return jsonify({
|
||||
return jsonify(
|
||||
{
|
||||
"status": 200,
|
||||
"message": f"Blog post: {title}",
|
||||
"ip": getClientIP(request),
|
||||
"title": title,
|
||||
"content": content,
|
||||
"download": f"/blog/{path}.md"
|
||||
}), 200
|
||||
"download": f"/blog/{path}.md",
|
||||
}
|
||||
), 200
|
||||
|
||||
|
||||
@app.route("/<path:path>.md")
|
||||
def path_md(path):
|
||||
if not os.path.exists(f"data/blog/{path}.md"):
|
||||
content = get_blog_content(path)
|
||||
if content is None:
|
||||
return render_template("404.html"), 404
|
||||
|
||||
with open(f"data/blog/{path}.md", "r") as f:
|
||||
content = f.read()
|
||||
|
||||
# Return the raw markdown file
|
||||
return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return content, 200, {"Content-Type": "text/plain; charset=utf-8"}
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
from flask import Blueprint, render_template, make_response, request, jsonify
|
||||
import datetime
|
||||
import os
|
||||
from functools import lru_cache
|
||||
from tools import getHandshakeScript, error_response, isCLI
|
||||
from curl import get_header, MAX_WIDTH
|
||||
from bs4 import BeautifulSoup
|
||||
import re
|
||||
|
||||
# Create blueprint
|
||||
app = Blueprint('now', __name__, url_prefix='/now')
|
||||
app = Blueprint("now", __name__, url_prefix="/now")
|
||||
|
||||
|
||||
@lru_cache(maxsize=16)
|
||||
def list_page_files():
|
||||
now_pages = os.listdir("templates/now")
|
||||
now_pages = [
|
||||
@@ -19,12 +21,14 @@ def list_page_files():
|
||||
return now_pages
|
||||
|
||||
|
||||
@lru_cache(maxsize=16)
|
||||
def list_dates():
|
||||
now_pages = list_page_files()
|
||||
now_dates = [page.split(".")[0] for page in now_pages]
|
||||
return now_dates
|
||||
|
||||
|
||||
@lru_cache(maxsize=8)
|
||||
def get_latest_date(formatted=False):
|
||||
if formatted:
|
||||
date = list_dates()[0]
|
||||
@@ -51,7 +55,10 @@ def render(date, handshake_scripts=None):
|
||||
|
||||
date_formatted = datetime.datetime.strptime(date, "%y_%m_%d")
|
||||
date_formatted = date_formatted.strftime("%A, %B %d, %Y")
|
||||
return render_template(f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts)
|
||||
return render_template(
|
||||
f"now/{date}.html", DATE=date_formatted, handshake_scripts=handshake_scripts
|
||||
)
|
||||
|
||||
|
||||
def render_curl(date=None):
|
||||
# If the date is not available, render the latest page
|
||||
@@ -71,7 +78,7 @@ def render_curl(date=None):
|
||||
# Load HTML
|
||||
with open(f"templates/now/{date}.html", "r", encoding="utf-8") as f:
|
||||
raw_html = f.read().replace("{{ date }}", date_formatted)
|
||||
soup = BeautifulSoup(raw_html, 'html.parser')
|
||||
soup = BeautifulSoup(raw_html, "html.parser")
|
||||
|
||||
posts = []
|
||||
|
||||
@@ -103,7 +110,7 @@ def render_curl(date=None):
|
||||
for line in text.splitlines():
|
||||
while len(line) > MAX_WIDTH:
|
||||
# Find last space within max_width
|
||||
split_at = line.rfind(' ', 0, MAX_WIDTH)
|
||||
split_at = line.rfind(" ", 0, MAX_WIDTH)
|
||||
if split_at == -1:
|
||||
split_at = MAX_WIDTH
|
||||
wrapped_lines.append(line[:split_at].rstrip())
|
||||
@@ -124,8 +131,9 @@ def render_curl(date=None):
|
||||
for post in posts:
|
||||
response += f"[1m{post['header']}[0m\n\n{post['content']}\n\n"
|
||||
|
||||
return render_template("now.ascii", date=date_formatted, content=response, header=get_header())
|
||||
|
||||
return render_template(
|
||||
"now.ascii", date=date_formatted, content=response, header=get_header()
|
||||
)
|
||||
|
||||
|
||||
@app.route("/", strict_slashes=False)
|
||||
@@ -153,8 +161,9 @@ def old():
|
||||
date_fmt = datetime.datetime.strptime(date, "%y_%m_%d")
|
||||
date_fmt = date_fmt.strftime("%A, %B %d, %Y")
|
||||
response += f"{date_fmt} - /now/{link}\n"
|
||||
return render_template("now.ascii", date="Old Now Pages", content=response, header=get_header())
|
||||
|
||||
return render_template(
|
||||
"now.ascii", date="Old Now Pages", content=response, header=get_header()
|
||||
)
|
||||
|
||||
html = '<ul class="list-group">'
|
||||
html += f'<a style="text-decoration:none;" href="/now"><li class="list-group-item" style="background-color:#000000;color:#ffffff;">{get_latest_date(True)}</li></a>'
|
||||
@@ -167,7 +176,9 @@ def old():
|
||||
|
||||
html += "</ul>"
|
||||
return render_template(
|
||||
"now/old.html", handshake_scripts=getHandshakeScript(request.host), now_pages=html
|
||||
"now/old.html",
|
||||
handshake_scripts=getHandshakeScript(request.host),
|
||||
now_pages=html,
|
||||
)
|
||||
|
||||
|
||||
@@ -185,7 +196,7 @@ def rss():
|
||||
link = page.strip(".html")
|
||||
date = datetime.datetime.strptime(link, "%y_%m_%d")
|
||||
date = date.strftime("%A, %B %d, %Y")
|
||||
rss += f'<item><title>What\'s Happening {date}</title><link>{host}/now/{link}</link><description>Latest updates for {date}</description><guid>{host}/now/{link}</guid></item>'
|
||||
rss += f"<item><title>What's Happening {date}</title><link>{host}/now/{link}</link><description>Latest updates for {date}</description><guid>{host}/now/{link}</guid></item>"
|
||||
rss += "</channel></rss>"
|
||||
return make_response(rss, 200, {"Content-Type": "application/rss+xml"})
|
||||
|
||||
@@ -196,6 +207,17 @@ def json():
|
||||
host = "https://" + request.host
|
||||
if ":" in request.host:
|
||||
host = "http://" + request.host
|
||||
now_pages = [{"url": host+"/now/"+page.strip(".html"), "date": datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime(
|
||||
"%A, %B %d, %Y"), "title": "What's Happening "+datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime("%A, %B %d, %Y")} for page in now_pages]
|
||||
now_pages = [
|
||||
{
|
||||
"url": host + "/now/" + page.strip(".html"),
|
||||
"date": datetime.datetime.strptime(
|
||||
page.strip(".html"), "%y_%m_%d"
|
||||
).strftime("%A, %B %d, %Y"),
|
||||
"title": "What's Happening "
|
||||
+ datetime.datetime.strptime(page.strip(".html"), "%y_%m_%d").strftime(
|
||||
"%A, %B %d, %Y"
|
||||
),
|
||||
}
|
||||
for page in now_pages
|
||||
]
|
||||
return jsonify(now_pages)
|
||||
|
||||
@@ -2,7 +2,8 @@ from flask import Blueprint, make_response, request
|
||||
from tools import error_response
|
||||
import requests
|
||||
|
||||
app = Blueprint('podcast', __name__)
|
||||
app = Blueprint("podcast", __name__)
|
||||
|
||||
|
||||
@app.route("/ID1")
|
||||
def index():
|
||||
|
||||
@@ -9,12 +9,12 @@ import binascii
|
||||
import base64
|
||||
import os
|
||||
|
||||
app = Blueprint('sol', __name__)
|
||||
app = Blueprint("sol", __name__)
|
||||
|
||||
SOLANA_HEADERS = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Action-Version": "2.4.2",
|
||||
"X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp"
|
||||
"X-Blockchain-Ids": "solana:5eykt4UsFv8P8NJdTREpY1vzqKqZKvdp",
|
||||
}
|
||||
|
||||
SOLANA_ADDRESS = None
|
||||
@@ -23,15 +23,19 @@ if os.path.isfile(".well-known/wallets/SOL"):
|
||||
address = file.read()
|
||||
SOLANA_ADDRESS = Pubkey.from_string(address.strip())
|
||||
|
||||
|
||||
def create_transaction(sender_address: str, amount: float) -> str:
|
||||
if SOLANA_ADDRESS is None:
|
||||
raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.")
|
||||
raise ValueError(
|
||||
"SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address."
|
||||
)
|
||||
# Create transaction
|
||||
sender = Pubkey.from_string(sender_address)
|
||||
transfer_ix = transfer(
|
||||
TransferParams(
|
||||
from_pubkey=sender, to_pubkey=SOLANA_ADDRESS, lamports=int(
|
||||
amount * 1000000000)
|
||||
from_pubkey=sender,
|
||||
to_pubkey=SOLANA_ADDRESS,
|
||||
lamports=int(amount * 1000000000),
|
||||
)
|
||||
)
|
||||
solana_client = Client("https://api.mainnet-beta.solana.com")
|
||||
@@ -50,11 +54,15 @@ def create_transaction(sender_address: str, amount: float) -> str:
|
||||
base64_string = base64.b64encode(raw_bytes).decode("utf-8")
|
||||
return base64_string
|
||||
|
||||
|
||||
def get_solana_address() -> str:
|
||||
if SOLANA_ADDRESS is None:
|
||||
raise ValueError("SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address.")
|
||||
raise ValueError(
|
||||
"SOLANA_ADDRESS is not set. Please ensure the .well-known/wallets/SOL file exists and contains a valid address."
|
||||
)
|
||||
return str(SOLANA_ADDRESS)
|
||||
|
||||
|
||||
@app.route("/donate", methods=["GET", "OPTIONS"])
|
||||
def sol_donate():
|
||||
data = {
|
||||
@@ -103,7 +111,6 @@ def sol_donate_amount(amount):
|
||||
|
||||
@app.route("/donate/<amount>", methods=["POST"])
|
||||
def sol_donate_post(amount):
|
||||
|
||||
if not request.json:
|
||||
return jsonify({"message": "Error: No JSON data provided"}), 400, SOLANA_HEADERS
|
||||
|
||||
@@ -122,4 +129,8 @@ def sol_donate_post(amount):
|
||||
return jsonify({"message": "Error: Amount too small"}), 400, SOLANA_HEADERS
|
||||
|
||||
transaction = create_transaction(sender, amount)
|
||||
return jsonify({"message": "Success", "transaction": transaction}), 200, SOLANA_HEADERS
|
||||
return (
|
||||
jsonify({"message": "Success", "transaction": transaction}),
|
||||
200,
|
||||
SOLANA_HEADERS,
|
||||
)
|
||||
|
||||
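Note (illustrative, not part of the commits above): create_transaction() in the sol blueprint converts the requested SOL amount to lamports with int(amount * 1000000000); a small sketch of that conversion, under the assumption that 1 SOL equals 1,000,000,000 lamports:

```python
# Illustrative sketch of the SOL -> lamports conversion used in create_transaction().
LAMPORTS_PER_SOL = 1_000_000_000


def sol_to_lamports(amount_sol: float) -> int:
    return int(amount_sol * LAMPORTS_PER_SOL)


assert sol_to_lamports(0.5) == 500_000_000  # 0.5 SOL
```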
@@ -5,7 +5,7 @@ import requests
|
||||
import time
|
||||
import base64
|
||||
|
||||
app = Blueprint('spotify', __name__, url_prefix='/spotify')
|
||||
app = Blueprint("spotify", __name__, url_prefix="/spotify")
|
||||
|
||||
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
|
||||
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
|
||||
@@ -21,6 +21,7 @@ ACCESS_TOKEN = None
|
||||
REFRESH_TOKEN = os.getenv("SPOTIFY_REFRESH_TOKEN")
|
||||
TOKEN_EXPIRES = 0
|
||||
|
||||
|
||||
def refresh_access_token():
|
||||
"""Refresh Spotify access token when expired."""
|
||||
global ACCESS_TOKEN, TOKEN_EXPIRES
|
||||
@@ -52,6 +53,7 @@ def refresh_access_token():
|
||||
TOKEN_EXPIRES = time.time() + token_info.get("expires_in", 3600)
|
||||
return ACCESS_TOKEN
|
||||
|
||||
|
||||
@app.route("/login")
|
||||
def login():
|
||||
auth_query = (
|
||||
@@ -60,6 +62,7 @@ def login():
|
||||
)
|
||||
return redirect(auth_query)
|
||||
|
||||
|
||||
@app.route("/callback")
|
||||
def callback():
|
||||
code = request.args.get("code")
|
||||
@@ -76,12 +79,14 @@ def callback():
|
||||
response = requests.post(SPOTIFY_TOKEN_URL, data=data)
|
||||
token_info = response.json()
|
||||
if "access_token" not in token_info:
|
||||
return json_response(request, {"error": "Failed to obtain token", "details": token_info}, 400)
|
||||
return json_response(
|
||||
request, {"error": "Failed to obtain token", "details": token_info}, 400
|
||||
)
|
||||
|
||||
access_token = token_info["access_token"]
|
||||
me = requests.get(
|
||||
"https://api.spotify.com/v1/me",
|
||||
headers={"Authorization": f"Bearer {access_token}"}
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
).json()
|
||||
|
||||
if me.get("id") != ALLOWED_SPOTIFY_USER_ID:
|
||||
@@ -93,6 +98,7 @@ def callback():
|
||||
print("Refresh Token:", REFRESH_TOKEN)
|
||||
return redirect(url_for("spotify.currently_playing"))
|
||||
|
||||
|
||||
@app.route("/", strict_slashes=False)
|
||||
@app.route("/playing")
|
||||
def currently_playing():
|
||||
@@ -100,6 +106,7 @@ def currently_playing():
|
||||
track = get_spotify_track()
|
||||
return json_response(request, {"spotify": track}, 200)
|
||||
|
||||
|
||||
def get_spotify_track():
|
||||
"""Internal function to get current playing track without HTTP context."""
|
||||
token = refresh_access_token()
|
||||
@@ -125,6 +132,6 @@ def get_spotify_track():
|
||||
"album_art": data["item"]["album"]["images"][0]["url"],
|
||||
"is_playing": data["is_playing"],
|
||||
"progress_ms": data.get("progress_ms", 0),
|
||||
"duration_ms": data["item"].get("duration_ms",1)
|
||||
"duration_ms": data["item"].get("duration_ms", 1),
|
||||
}
|
||||
return track
|
||||
@@ -1,7 +1,7 @@
|
||||
from flask import Blueprint, request
|
||||
from tools import json_response
|
||||
|
||||
app = Blueprint('template', __name__)
|
||||
app = Blueprint("template", __name__)
|
||||
|
||||
|
||||
@app.route("/", strict_slashes=False)
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
from flask import Blueprint, make_response, request, jsonify, send_from_directory, redirect
|
||||
from flask import (
|
||||
Blueprint,
|
||||
make_response,
|
||||
request,
|
||||
jsonify,
|
||||
send_from_directory,
|
||||
redirect,
|
||||
)
|
||||
from tools import error_response
|
||||
import os
|
||||
|
||||
app = Blueprint('well-known', __name__, url_prefix='/.well-known')
|
||||
app = Blueprint("well-known", __name__, url_prefix="/.well-known")
|
||||
|
||||
|
||||
@app.route("/<path:path>")
|
||||
@@ -12,7 +19,7 @@ def index(path):
|
||||
|
||||
@app.route("/wallets/<path:path>")
|
||||
def wallets(path):
|
||||
if path[0] == "." and 'proof' not in path:
|
||||
if path[0] == "." and "proof" not in path:
|
||||
return send_from_directory(
|
||||
".well-known/wallets", path, mimetype="application/json"
|
||||
)
|
||||
|
||||
cache_helper.py (new file, 264 lines)
@@ -0,0 +1,264 @@
|
||||
"""
|
||||
Cache helper module for expensive API calls and configuration.
|
||||
Provides centralized caching with TTL for external API calls.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
from functools import lru_cache
|
||||
|
||||
|
||||
# Cache storage for NC_CONFIG with timestamp
|
||||
_nc_config_cache = {"data": None, "timestamp": 0}
|
||||
_nc_config_ttl = 3600 # 1 hour cache
|
||||
|
||||
|
||||
def get_nc_config():
|
||||
"""
|
||||
Get NC_CONFIG with caching (1 hour TTL).
|
||||
Falls back to default config on error.
|
||||
|
||||
Returns:
|
||||
dict: Configuration dictionary
|
||||
"""
|
||||
global _nc_config_cache
|
||||
current_time = datetime.datetime.now().timestamp()
|
||||
|
||||
# Check if cache is valid
|
||||
if (
|
||||
_nc_config_cache["data"]
|
||||
and (current_time - _nc_config_cache["timestamp"]) < _nc_config_ttl
|
||||
):
|
||||
return _nc_config_cache["data"]
|
||||
|
||||
# Fetch new config
|
||||
try:
|
||||
config = requests.get(
|
||||
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json",
|
||||
timeout=5,
|
||||
).json()
|
||||
_nc_config_cache = {"data": config, "timestamp": current_time}
|
||||
return config
|
||||
except Exception as e:
|
||||
print(f"Error fetching NC_CONFIG: {e}")
|
||||
# Return cached data if available, otherwise default
|
||||
if _nc_config_cache["data"]:
|
||||
return _nc_config_cache["data"]
|
||||
return {"time-zone": 10, "message": ""}
|
||||
|
||||
|
||||
# Cache storage for git data
|
||||
_git_data_cache = {"data": None, "timestamp": 0}
|
||||
_git_data_ttl = 300 # 5 minutes cache
|
||||
|
||||
|
||||
def get_git_latest_activity():
|
||||
"""
|
||||
Get latest git activity with caching (5 minute TTL).
|
||||
|
||||
Returns:
|
||||
dict: Git activity data or default values
|
||||
"""
|
||||
global _git_data_cache
|
||||
current_time = datetime.datetime.now().timestamp()
|
||||
|
||||
# Check if cache is valid
|
||||
if (
|
||||
_git_data_cache["data"]
|
||||
and (current_time - _git_data_cache["timestamp"]) < _git_data_ttl
|
||||
):
|
||||
return _git_data_cache["data"]
|
||||
|
||||
# Fetch new data
|
||||
try:
|
||||
git = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
|
||||
headers={
|
||||
"Authorization": os.getenv("GIT_AUTH") or os.getenv("git_token") or ""
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
git_data = git.json()
|
||||
if git_data and len(git_data) > 0:
|
||||
result = git_data[0]
|
||||
_git_data_cache = {"data": result, "timestamp": current_time}
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"Error fetching git data: {e}")
|
||||
|
||||
# Return cached or default
|
||||
if _git_data_cache["data"]:
|
||||
return _git_data_cache["data"]
|
||||
|
||||
return {
|
||||
"repo": {
|
||||
"html_url": "https://nathan.woodburn.au",
|
||||
"name": "nathanwoodburn.github.io",
|
||||
"description": "Personal website",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# Cache storage for projects
|
||||
_projects_cache = {"data": None, "timestamp": 0}
|
||||
_projects_ttl = 7200 # 2 hours cache
|
||||
|
||||
|
||||
def get_projects(limit=3):
|
||||
"""
|
||||
Get projects list with caching (2 hour TTL).
|
||||
|
||||
Args:
|
||||
limit (int): Number of projects to return
|
||||
|
||||
Returns:
|
||||
list: List of project dictionaries
|
||||
"""
|
||||
global _projects_cache
|
||||
current_time = datetime.datetime.now().timestamp()
|
||||
|
||||
# Check if cache is valid
|
||||
if (
|
||||
_projects_cache["data"]
|
||||
and (current_time - _projects_cache["timestamp"]) < _projects_ttl
|
||||
):
|
||||
return _projects_cache["data"][:limit]
|
||||
|
||||
# Fetch new data
|
||||
try:
|
||||
projects = []
|
||||
projectsreq = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos", timeout=5
|
||||
)
|
||||
projects = projectsreq.json()
|
||||
|
||||
# Check for pagination
|
||||
pageNum = 2
|
||||
while 'rel="next"' in projectsreq.headers.get("link", ""):
|
||||
projectsreq = requests.get(
|
||||
f"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page={pageNum}",
|
||||
timeout=5,
|
||||
)
|
||||
projects += projectsreq.json()
|
||||
pageNum += 1
|
||||
# Safety limit
|
||||
if pageNum > 10:
|
||||
break
|
||||
|
||||
# Process projects
|
||||
for project in projects:
|
||||
if project.get("avatar_url") in ("https://git.woodburn.au/", ""):
|
||||
project["avatar_url"] = "/favicon.png"
|
||||
project["name"] = project["name"].replace("_", " ").replace("-", " ")
|
||||
|
||||
# Sort by last updated
|
||||
projects_sorted = sorted(
|
||||
projects, key=lambda x: x.get("updated_at", ""), reverse=True
|
||||
)
|
||||
|
||||
# Remove duplicates by name
|
||||
seen_names = set()
|
||||
unique_projects = []
|
||||
for project in projects_sorted:
|
||||
if project["name"] not in seen_names:
|
||||
unique_projects.append(project)
|
||||
seen_names.add(project["name"])
|
||||
|
||||
_projects_cache = {"data": unique_projects, "timestamp": current_time}
|
||||
return unique_projects[:limit]
|
||||
except Exception as e:
|
||||
print(f"Error fetching projects: {e}")
|
||||
if _projects_cache["data"]:
|
||||
return _projects_cache["data"][:limit]
|
||||
return []
|
||||
|
||||
|
||||
# Cache storage for uptime status
|
||||
_uptime_cache = {"data": None, "timestamp": 0}
|
||||
_uptime_ttl = 300 # 5 minutes cache
|
||||
|
||||
|
||||
def get_uptime_status():
|
||||
"""
|
||||
Get uptime status with caching (5 minute TTL).
|
||||
|
||||
Returns:
|
||||
bool: True if services are up, False otherwise
|
||||
"""
|
||||
global _uptime_cache
|
||||
current_time = datetime.datetime.now().timestamp()
|
||||
|
||||
# Check if cache is valid
|
||||
if (
|
||||
_uptime_cache["data"] is not None
|
||||
and (current_time - _uptime_cache["timestamp"]) < _uptime_ttl
|
||||
):
|
||||
return _uptime_cache["data"]
|
||||
|
||||
# Fetch new data
|
||||
try:
|
||||
uptime = requests.get(
|
||||
"https://uptime.woodburn.au/api/status-page/main/badge", timeout=5
|
||||
)
|
||||
content = uptime.content.decode("utf-8").lower()
|
||||
status = "maintenance" in content or uptime.content.count(b"Up") > 1
|
||||
_uptime_cache = {"data": status, "timestamp": current_time}
|
||||
return status
|
||||
except Exception as e:
|
||||
print(f"Error fetching uptime: {e}")
|
||||
# Return cached or default (assume up)
|
||||
if _uptime_cache["data"] is not None:
|
||||
return _uptime_cache["data"]
|
||||
return True
|
||||
|
||||
|
||||
# Cached wallet data loaders
|
||||
@lru_cache(maxsize=1)
|
||||
def get_wallet_tokens():
|
||||
"""
|
||||
Get wallet tokens with caching.
|
||||
|
||||
Returns:
|
||||
list: List of token dictionaries
|
||||
"""
|
||||
try:
|
||||
with open(".well-known/wallets/.tokens") as file:
|
||||
return json.load(file)
|
||||
except Exception as e:
|
||||
print(f"Error loading tokens: {e}")
|
||||
return []
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_coin_names():
|
||||
"""
|
||||
Get coin names with caching.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary of coin names
|
||||
"""
|
||||
try:
|
||||
with open(".well-known/wallets/.coins") as file:
|
||||
return json.load(file)
|
||||
except Exception as e:
|
||||
print(f"Error loading coin names: {e}")
|
||||
return {}
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_wallet_domains():
|
||||
"""
|
||||
Get wallet domains with caching.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary of wallet domains
|
||||
"""
|
||||
try:
|
||||
if os.path.isfile(".well-known/wallets/.domains"):
|
||||
with open(".well-known/wallets/.domains") as file:
|
||||
return json.load(file)
|
||||
except Exception as e:
|
||||
print(f"Error loading domains: {e}")
|
||||
return {}
|
||||
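Note (illustrative, not part of the commits above): a minimal sketch of the calling pattern the blueprints adopt with the new cache_helper module, reading config and git data through the cached helpers at request time instead of via module-level requests.get() calls; the route paths mirror the /api/v1 blueprint in this change set.

```python
# Illustrative sketch only: per-request reads through the cached helpers.
from flask import Flask, jsonify

from cache_helper import get_git_latest_activity, get_nc_config

app = Flask(__name__)


@app.route("/api/v1/timezone")
def timezone():
    nc_config = get_nc_config()  # cached up to 1 hour, falls back to defaults on error
    return jsonify({"timezone": nc_config["time-zone"]})


@app.route("/api/v1/project")
def project():
    git = get_git_latest_activity()  # cached for 5 minutes
    return jsonify({"repo": git["repo"]["name"]})
```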
cleanSite.py (21 changed lines)
@@ -1,36 +1,37 @@
import os


def cleanSite(path: str):
    # Check if the file is sitemap.xml
    if path.endswith('sitemap.xml'):
    if path.endswith("sitemap.xml"):
        # Open the file
        with open(path, 'r') as f:
        with open(path, "r") as f:
            # Read the content
            content = f.read()
        # Replace all .html with empty string
        content = content.replace('.html', '')
        content = content.replace(".html", "")
        # Write the content back to the file
        with open(path, 'w') as f:
        with open(path, "w") as f:
            f.write(content)
        # Skip the file
        return

    # If the file is not an html file, skip it
    if not path.endswith('.html'):
    if not path.endswith(".html"):
        if os.path.isdir(path):
            for file in os.listdir(path):
                cleanSite(path + '/' + file)
                cleanSite(path + "/" + file)

        return

    # Open the file
    with open(path, 'r') as f:
    with open(path, "r") as f:
        # Read and remove all .html
        content = f.read().replace('.html"', '"')
    # Write the cleaned content back to the file
    with open(path, 'w') as f:
    with open(path, "w") as f:
        f.write(content)


for file in os.listdir('templates'):
    cleanSite('templates/' + file)
for file in os.listdir("templates"):
    cleanSite("templates/" + file)
curl.py (130 changed lines)
@@ -2,12 +2,13 @@ from flask import render_template
|
||||
from tools import getAddress, get_tools_data, getClientIP
|
||||
import os
|
||||
from functools import lru_cache
|
||||
import requests
|
||||
from blueprints.spotify import get_spotify_track
|
||||
from cache_helper import get_git_latest_activity, get_projects as get_projects_cached
|
||||
|
||||
|
||||
MAX_WIDTH = 80
|
||||
|
||||
|
||||
def clean_path(path: str):
|
||||
path = path.strip("/ ").lower()
|
||||
# Strip any .html extension
|
||||
@@ -19,66 +20,35 @@ def clean_path(path:str):
|
||||
path = "index"
|
||||
return path
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_header():
|
||||
with open("templates/header.ascii", "r") as f:
|
||||
return f.read()
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
|
||||
@lru_cache(maxsize=16)
|
||||
def get_current_project():
|
||||
git = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
|
||||
headers={"Authorization": os.getenv("GIT_AUTH") if os.getenv("GIT_AUTH") else os.getenv("git_token")},
|
||||
)
|
||||
git = git.json()
|
||||
git = git[0]
|
||||
repo_name = git["repo"]["name"]
|
||||
repo_name = repo_name.lower()
|
||||
git = get_git_latest_activity()
|
||||
repo_name = git["repo"]["name"].lower()
|
||||
repo_description = git["repo"]["description"]
|
||||
if not repo_description:
|
||||
return f"[1;36m{repo_name}[0m"
|
||||
return f"[1;36m{repo_name}[0m - [1m{repo_description}[0m"
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
@lru_cache(maxsize=16)
|
||||
def get_projects():
|
||||
projectsreq = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
|
||||
)
|
||||
|
||||
projects = projectsreq.json()
|
||||
|
||||
# Check for next page
|
||||
pageNum = 1
|
||||
while 'rel="next"' in projectsreq.headers["link"]:
|
||||
projectsreq = requests.get(
|
||||
"https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
|
||||
+ str(pageNum)
|
||||
)
|
||||
projects += projectsreq.json()
|
||||
pageNum += 1
|
||||
|
||||
# Sort by last updated
|
||||
projectsList = sorted(
|
||||
projects, key=lambda x: x["updated_at"], reverse=True)
|
||||
projects_data = get_projects_cached(limit=5)
|
||||
projects = ""
|
||||
projectNum = 0
|
||||
includedNames = []
|
||||
while len(includedNames) < 5 and projectNum < len(projectsList):
|
||||
# Avoid duplicates
|
||||
if projectsList[projectNum]["name"] in includedNames:
|
||||
projectNum += 1
|
||||
continue
|
||||
includedNames.append(projectsList[projectNum]["name"])
|
||||
project = projectsList[projectNum]
|
||||
projects += f"""[1m{project['name']}[0m - {project['description'] if project['description'] else 'No description'}
|
||||
{project['html_url']}
|
||||
for project in projects_data:
|
||||
projects += f"""[1m{project["name"]}[0m - {project["description"] if project["description"] else "No description"}
|
||||
{project["html_url"]}
|
||||
|
||||
"""
|
||||
projectNum += 1
|
||||
|
||||
return projects
|
||||
|
||||
|
||||
def curl_response(request):
|
||||
# Check if <path>.ascii exists
|
||||
path = clean_path(request.path)
|
||||
@@ -86,39 +56,81 @@ def curl_response(request):
|
||||
# Handle special cases
|
||||
if path == "index":
|
||||
# Get current project
|
||||
return render_template("index.ascii",repo=get_current_project(), ip=getClientIP(request), spotify=get_spotify_track()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template(
|
||||
"index.ascii",
|
||||
repo=get_current_project(),
|
||||
ip=getClientIP(request),
|
||||
spotify=get_spotify_track(),
|
||||
),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
if path == "projects":
|
||||
# Get projects
|
||||
return render_template("projects.ascii",header=get_header(),projects=get_projects()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template(
|
||||
"projects.ascii", header=get_header(), projects=get_projects()
|
||||
),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
if path == "donate":
|
||||
# Get donation info
|
||||
return render_template("donate.ascii",header=get_header(),
|
||||
HNS=getAddress("HNS"), BTC=getAddress("BTC"),
|
||||
SOL=getAddress("SOL"), ETH=getAddress("ETH")
|
||||
), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template(
|
||||
"donate.ascii",
|
||||
header=get_header(),
|
||||
HNS=getAddress("HNS"),
|
||||
BTC=getAddress("BTC"),
|
||||
SOL=getAddress("SOL"),
|
||||
ETH=getAddress("ETH"),
|
||||
),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
if path == "donate/more":
|
||||
coinList = os.listdir(".well-known/wallets")
|
||||
coinList = [file for file in coinList if file[0] != "."]
|
||||
coinList.sort()
|
||||
return render_template("donate_more.ascii",header=get_header(),
|
||||
coins=coinList
|
||||
), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template("donate_more.ascii", header=get_header(), coins=coinList),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
# For other donation pages, fall back to ascii if it exists
|
||||
if path.startswith("donate/"):
|
||||
coin = path.split("/")[1]
|
||||
address = getAddress(coin)
|
||||
if address != "":
|
||||
return render_template("donate_coin.ascii",header=get_header(),coin=coin.upper(),address=address), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template(
|
||||
"donate_coin.ascii",
|
||||
header=get_header(),
|
||||
coin=coin.upper(),
|
||||
address=address,
|
||||
),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
if path == "tools":
|
||||
tools = get_tools_data()
|
||||
return render_template("tools.ascii",header=get_header(),tools=tools), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template("tools.ascii", header=get_header(), tools=tools),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
if os.path.exists(f"templates/{path}.ascii"):
|
||||
return render_template(f"{path}.ascii",header=get_header()), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template(f"{path}.ascii", header=get_header()),
|
||||
200,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
# Fallback to html if it exists
|
||||
if os.path.exists(f"templates/{path}.html"):
|
||||
@@ -127,6 +139,10 @@ def curl_response(request):
|
||||
# Return curl error page
|
||||
error = {
|
||||
"code": 404,
|
||||
"message": "The requested resource was not found on this server."
|
||||
"message": "The requested resource was not found on this server.",
|
||||
}
|
||||
return render_template("error.ascii",header=get_header(),error=error), 404, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
return (
|
||||
render_template("error.ascii", header=get_header(), error=error),
|
||||
404,
|
||||
{"Content-Type": "text/plain; charset=utf-8"},
|
||||
)
|
||||
|
||||
mail.py (57 changed lines)
@@ -21,6 +21,7 @@ import os
# "body":"G'\''day\nThis is a test email from my website api\n\nRegards,\nNathan.Woodburn/"
# }'


def validateSender(email):
    domains = os.getenv("EMAIL_DOMAINS")
    if not domains:
@@ -33,37 +34,29 @@ def validateSender(email):

    return False


def sendEmail(data):
    fromEmail = "noreply@woodburn.au"
    if "from" in data:
        fromEmail = data["from"]

    if not validateSender(fromEmail):
        return jsonify({
            "status": 400,
            "message": "Bad request 'from' email invalid"
        })

        return jsonify({"status": 400, "message": "Bad request 'from' email invalid"})

    if "to" not in data:
        return jsonify({
            "status": 400,
            "message": "Bad request 'to' json data missing"
        })
        return jsonify({"status": 400, "message": "Bad request 'to' json data missing"})
    to = data["to"]

    if "subject" not in data:
        return jsonify({
            "status": 400,
            "message": "Bad request 'subject' json data missing"
        })
        return jsonify(
            {"status": 400, "message": "Bad request 'subject' json data missing"}
        )
    subject = data["subject"]

    if "body" not in data:
        return jsonify({
            "status": 400,
            "message": "Bad request 'body' json data missing"
        })
        return jsonify(
            {"status": 400, "message": "Bad request 'body' json data missing"}
        )
    body = data["body"]

    if not re.match(r"[^@]+@[^@]+\.[^@]+", to):
@@ -76,15 +69,15 @@ def sendEmail(data):
        raise ValueError("Body cannot be empty.")

    fromName = "Nathan Woodburn"
    if 'sender' in data:
        fromName = data['sender']
    if "sender" in data:
        fromName = data["sender"]

    # Create the email message
    msg = MIMEMultipart()
    msg['From'] = formataddr((fromName, fromEmail))
    msg['To'] = to
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    msg["From"] = formataddr((fromName, fromEmail))
    msg["To"] = to
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    # Sending the email
    try:
@@ -92,24 +85,12 @@ def sendEmail(data):
        user = os.getenv("EMAIL_USER")
        password = os.getenv("EMAIL_PASS")
        if host is None or user is None or password is None:
            return jsonify({
                "status": 500,
                "error": "Email server not configured"
            })
            return jsonify({"status": 500, "error": "Email server not configured"})

        with smtplib.SMTP_SSL(host, 465) as server:
            server.login(user, password)
            server.sendmail(fromEmail, to, msg.as_string())
            print("Email sent successfully.")
            return jsonify({
                "status": 200,
                "message": "Send email successfully"
            })
            return jsonify({"status": 200, "message": "Send email successfully"})
    except Exception as e:
        return jsonify({
            "status": 500,
            "error": "Sending email failed",
            "exception":e
        })


        return jsonify({"status": 500, "error": "Sending email failed", "exception": e})
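Note (illustrative, not part of the commits above): the example request quoted in the comments at the top of mail.py posts a JSON body with the fields sendEmail() validates; the same payload as a Python dict, with a placeholder recipient.

```python
# Illustrative payload for sendEmail(): from/to/subject/body plus optional sender.
payload = {
    "from": "noreply@woodburn.au",
    "sender": "Nathan Woodburn",
    "to": "recipient@example.com",  # placeholder
    "subject": "Test email",
    "body": "G'day\nThis is a test email from my website api\n\nRegards,\nNathan.Woodburn/",
}
```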
main.py (22 changed lines)
@@ -17,9 +17,10 @@ class GunicornApp(BaseApplication):
    def load(self):
        return self.application

if __name__ == '__main__':
    workers = os.getenv('WORKERS')
    threads = os.getenv('THREADS')

if __name__ == "__main__":
    workers = os.getenv("WORKERS")
    threads = os.getenv("THREADS")
    if workers is None:
        workers = 1
    if threads is None:
@@ -27,10 +28,17 @@ if __name__ == '__main__':
    workers = int(workers)
    threads = int(threads)
    options = {
        'bind': '0.0.0.0:5000',
        'workers': workers,
        'threads': threads,
        "bind": "0.0.0.0:5000",
        "workers": workers,
        "threads": threads,
    }
    gunicorn_app = GunicornApp(app, options)
    print('Starting server with ' + str(workers) + ' workers and ' + str(threads) + ' threads', flush=True)
    print(
        "Starting server with "
        + str(workers)
        + " workers and "
        + str(threads)
        + " threads",
        flush=True,
    )
    gunicorn_app.run()
server.py (162 changed lines)
@@ -33,6 +33,15 @@ from tools import (
|
||||
get_tools_data,
|
||||
)
|
||||
from curl import curl_response
|
||||
from cache_helper import (
|
||||
get_nc_config,
|
||||
get_git_latest_activity,
|
||||
get_projects,
|
||||
get_uptime_status,
|
||||
get_wallet_tokens,
|
||||
get_coin_names,
|
||||
get_wallet_domains,
|
||||
)
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
@@ -70,13 +79,6 @@ if os.path.isfile("data/sites.json"):
|
||||
# Remove any sites that are not enabled
|
||||
SITES = [site for site in SITES if "enabled" not in site or site["enabled"]]
|
||||
|
||||
PROJECTS = []
|
||||
PROJECTS_UPDATED = 0
|
||||
|
||||
NC_CONFIG = requests.get(
|
||||
"https://cloud.woodburn.au/s/4ToXgFe3TnnFcN7/download/website-conf.json"
|
||||
).json()
|
||||
|
||||
# endregion
|
||||
|
||||
# region Assets routes
|
||||
@@ -226,9 +228,6 @@ def api_legacy(function):
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
global PROJECTS
|
||||
global PROJECTS_UPDATED
|
||||
|
||||
# Check if host if podcast.woodburn.au
|
||||
if "podcast.woodburn.au" in request.host:
|
||||
return render_template("podcast.html")
|
||||
@@ -259,81 +258,22 @@ def index():
        resp.set_cookie("loaded", "true", max_age=604800)
        return resp

    try:
        git = requests.get(
            "https://git.woodburn.au/api/v1/users/nathanwoodburn/activities/feeds?only-performed-by=true&limit=1",
            headers={"Authorization": os.getenv("GIT_AUTH")},
        )
        git = git.json()
        git = git[0]
        repo_name = git["repo"]["name"]
        repo_name = repo_name.lower()
        # Use cached git data
        git = get_git_latest_activity()
        repo_name = git["repo"]["name"].lower()
        repo_description = git["repo"]["description"]
    except Exception as e:
        repo_name = "nathanwoodburn.github.io"
        repo_description = "Personal website"
        git = {
            "repo": {
                "html_url": "https://nathan.woodburn.au",
                "name": "nathanwoodburn.github.io",
                "description": "Personal website",
            }
        }
        print(f"Error getting git data: {e}")

    # Get only repo names for the newest updates
    if (
        PROJECTS == []
        or PROJECTS_UPDATED
        < (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp()
    ):
        projectsreq = requests.get(
            "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
        )

        PROJECTS = projectsreq.json()

        # Check for next page
        pageNum = 1
        while 'rel="next"' in projectsreq.headers["link"]:
            projectsreq = requests.get(
                "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos?page="
                + str(pageNum)
            )
            PROJECTS += projectsreq.json()
            pageNum += 1

        for project in PROJECTS:
            if (
                project["avatar_url"] == "https://git.woodburn.au/"
                or project["avatar_url"] == ""
            ):
                project["avatar_url"] = "/favicon.png"
            project["name"] = project["name"].replace("_", " ").replace("-", " ")
        # Sort by last updated
        projectsList = sorted(PROJECTS, key=lambda x: x["updated_at"], reverse=True)
        PROJECTS = []
        projectNames = []
        projectNum = 0
        while len(PROJECTS) < 3:
            if projectsList[projectNum]["name"] not in projectNames:
                PROJECTS.append(projectsList[projectNum])
                projectNames.append(projectsList[projectNum]["name"])
            projectNum += 1
        PROJECTS_UPDATED = datetime.datetime.now().timestamp()
    # Use cached projects data
    projects = get_projects(limit=3)

    # Use cached uptime status
    uptime = get_uptime_status()
    custom = ""
    # Check for downtime
    uptime = requests.get("https://uptime.woodburn.au/api/status-page/main/badge")
    if "maintenance" in uptime.content.decode("utf-8").lower():
        uptime = True
    else:
        uptime = uptime.content.count(b"Up") > 1

    if uptime:
        custom += "<style>#downtime{display:none !important;}</style>"
    else:
        custom += "<style>#downtime{opacity:1;}</style>"

    # Special names
    if repo_name == "nathanwoodburn.github.io":
        repo_name = "Nathan.Woodburn/"
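get_projects() and get_uptime_status() also live in the unshown cache_helper.py. A plausible sketch of both, mirroring the pagination, de-duplication, and badge parsing that the hunk above removes from index(), follows; the signatures and internals are assumptions, and the real helpers presumably also cache their results (the removed code refreshed projects at most every two hours), which is omitted here for brevity.

# Hedged sketch only: not the project's cache_helper.py
import requests


def get_projects(limit=3):
    # Fetch every repo, following Gitea's rel="next" pagination
    url = "https://git.woodburn.au/api/v1/users/nathanwoodburn/repos"
    req = requests.get(url)
    repos = req.json()
    page = 2
    while 'rel="next"' in req.headers.get("link", ""):
        req = requests.get(url, params={"page": page})
        repos += req.json()
        page += 1

    # Same cosmetic fixes as the removed inline code
    for repo in repos:
        if repo["avatar_url"] in ("https://git.woodburn.au/", ""):
            repo["avatar_url"] = "/favicon.png"
        repo["name"] = repo["name"].replace("_", " ").replace("-", " ")

    # Newest first, de-duplicated by display name, capped at `limit`
    repos.sort(key=lambda r: r["updated_at"], reverse=True)
    unique, seen = [], set()
    for repo in repos:
        if repo["name"] not in seen:
            unique.append(repo)
            seen.add(repo["name"])
        if len(unique) >= limit:
            break
    return unique


def get_uptime_status():
    # True when the status page badge reports everything up (or in maintenance)
    badge = requests.get("https://uptime.woodburn.au/api/status-page/main/badge")
    if "maintenance" in badge.content.decode("utf-8").lower():
        return True
    return badge.content.count(b"Up") > 1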
@@ -341,8 +281,9 @@ def index():
    html_url = git["repo"]["html_url"]
    repo = '<a href="' + html_url + '" target="_blank">' + repo_name + "</a>"

    # Get time
    timezone_offset = datetime.timedelta(hours=NC_CONFIG["time-zone"])
    # Get time using cached config
    nc_config = get_nc_config()
    timezone_offset = datetime.timedelta(hours=nc_config["time-zone"])
    timezone = datetime.timezone(offset=timezone_offset)
    time = datetime.datetime.now(tz=timezone)

@@ -365,7 +306,7 @@ def index():
        setInterval(updateClock, 1000);
    }
    """
    time += f"startClock({NC_CONFIG['time-zone']});"
    time += f"startClock({nc_config['time-zone']});"
    time += "</script>"

    HNSaddress = getAddress("HNS")
@@ -385,9 +326,9 @@ def index():
            repo_description=repo_description,
            custom=custom,
            sites=SITES,
            projects=PROJECTS,
            projects=projects,
            time=time,
            message=NC_CONFIG.get("message", ""),
            message=nc_config.get("message", ""),
        ),
        200,
        {"Content-Type": "text/html"},
@@ -409,31 +350,25 @@ def donate():
    coinList = [file for file in coinList if file[0] != "."]
    coinList.sort()

    tokenList = []

    with open(".well-known/wallets/.tokens") as file:
        tokenList = file.read()
        tokenList = json.loads(tokenList)

    coinNames = {}
    with open(".well-known/wallets/.coins") as file:
        coinNames = file.read()
        coinNames = json.loads(coinNames)
    tokenList = get_wallet_tokens()
    coinNames = get_coin_names()

    coins = ""
    default_coins = ["btc", "eth", "hns", "sol", "xrp", "ada", "dot"]

    for file in coinList:
        if file in coinNames:
            coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{coinNames[file]}</a>'
        else:
            coins += f'<a class="dropdown-item" style="{"display:none;" if file.lower() not in default_coins else ""}" href="?c={file.lower()}">{file}</a>'
        coin_name = coinNames.get(file, file)
        display_style = "" if file.lower() in default_coins else "display:none;"
        coins += f'<a class="dropdown-item" style="{display_style}" href="?c={file.lower()}">{coin_name}</a>'

    for token in tokenList:
        if token["chain"] != "null":
            coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] + " on " if token["symbol"] != token["name"] else ""}{token["chain"]})</a>'
        else:
            coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]} ({token["symbol"] if token["symbol"] != token["name"] else ""})</a>'
        chain_display = f" on {token['chain']}" if token["chain"] != "null" else ""
        symbol_display = (
            f" ({token['symbol']}{chain_display})"
            if token["symbol"] != token["name"]
            else chain_display
        )
        coins += f'<a class="dropdown-item" style="display:none;" href="?t={token["symbol"].lower()}&c={token["chain"].lower()}">{token["name"]}{symbol_display}</a>'

    crypto = request.args.get("c")
    if not crypto:
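get_wallet_tokens(), get_coin_names(), and get_wallet_domains() are likewise defined in the unshown cache_helper.py. Judging only from the inline file reads they replace in donate(), a minimal sketch could look like the following; the real versions presumably add caching, and nothing here beyond the file paths and the key names visible in the diff is taken from the project.

# Hedged sketch only: cache_helper.py is not shown in this compare view.
import json


def get_wallet_tokens():
    # .tokens holds a JSON list of entries with "name", "symbol", "chain" (and sometimes "address")
    with open(".well-known/wallets/.tokens") as f:
        return json.load(f)


def get_coin_names():
    # .coins maps a ticker to its display name
    with open(".well-known/wallets/.coins") as f:
        return json.load(f)


def get_wallet_domains():
    # .domains maps a ticker to a wallet-resolvable domain
    with open(".well-known/wallets/.domains") as f:
        return json.load(f)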
@@ -460,7 +395,6 @@ def donate():
            token = {"name": "Unknown token", "symbol": token, "chain": crypto}

    address = ""
    domain = ""
    cryptoHTML = ""

    proof = ""
@@ -470,10 +404,16 @@ def donate():
    if os.path.isfile(f".well-known/wallets/{crypto}"):
        with open(f".well-known/wallets/{crypto}") as file:
            address = file.read()
        coin_display = coinNames.get(crypto, crypto)
        if not token:
            cryptoHTML += f"<br>Donate with {coinNames[crypto] if crypto in coinNames else crypto}:"
            cryptoHTML += f"<br>Donate with {coin_display}:"
        else:
            cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ') ' if token['symbol'] != token['name'] else ''}on {crypto}:"
            token_symbol = (
                f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
            )
            cryptoHTML += (
                f"<br>Donate with {token['name']}{token_symbol} on {crypto}:"
            )
        cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'

        if proof:
@@ -481,7 +421,13 @@ def donate():
    elif token:
        if "address" in token:
            address = token["address"]
            cryptoHTML += f"<br>Donate with {token['name']} {'(' + token['symbol'] + ')' if token['symbol'] != token['name'] else ''}{' on ' + crypto if crypto != 'NULL' else ''}:"
            token_symbol = (
                f" ({token['symbol']})" if token["symbol"] != token["name"] else ""
            )
            chain_display = f" on {crypto}" if crypto != "NULL" else ""
            cryptoHTML += (
                f"<br>Donate with {token['name']}{token_symbol}{chain_display}:"
            )
            cryptoHTML += f'<br><code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-address" class="address" style="color: rgb(242,90,5);display: inline-block;" data-bs-original-title="Click to copy">{address}</code>'
            if proof:
                cryptoHTML += proof
@@ -490,16 +436,12 @@ def donate():
    else:
        cryptoHTML += f"<br>Invalid chain: {crypto}<br>"

    if os.path.isfile(".well-known/wallets/.domains"):
        # Get json of all domains
        with open(".well-known/wallets/.domains") as file:
            domains = file.read()
            domains = json.loads(domains)

    domains = get_wallet_domains()
    if crypto in domains:
        domain = domains[crypto]
        cryptoHTML += "<br>Or send to this domain on compatible wallets:<br>"
        cryptoHTML += f'<code data-bs-toggle="tooltip" data-bss-tooltip="" id="crypto-domain" class="address" style="color: rgb(242,90,5);display: block;" data-bs-original-title="Click to copy">{domain}</code>'

    if address:
        cryptoHTML += (
            '<br><img src="/address/'
54
tools.py
54
tools.py
@@ -1,6 +1,6 @@
from flask import Request, render_template, jsonify, make_response
import os
from functools import lru_cache as cache
from functools import lru_cache
import datetime
from typing import Optional, Dict, Union, Tuple
import re
@@ -24,17 +24,10 @@ CRAWLERS = [
    "Exabot",
    "facebot",
    "ia_archiver",
    "Twitterbot"
    "Twitterbot",
]

CLI_AGENTS = [
    "curl",
    "hurl",
    "xh",
    "Posting",
    "HTTPie",
    "nushell"
]
CLI_AGENTS = ["curl", "hurl", "xh", "Posting", "HTTPie", "nushell"]


def getClientIP(request: Request) -> str:
@@ -56,7 +49,8 @@ def getClientIP(request: Request) -> str:
        ip = "unknown"
    return ip

@cache

@lru_cache(maxsize=1)
def getGitCommit() -> str:
    """
    Get the current git commit hash.
@@ -115,7 +109,8 @@ def isCrawler(request: Request) -> bool:
        return any(crawler in user_agent for crawler in CRAWLERS)
    return False

@cache

@lru_cache(maxsize=128)
def isDev(host: str) -> bool:
    """
    Check if the host indicates a development environment.
@@ -135,7 +130,8 @@ def isDev(host: str) -> bool:
        return True
    return False

@cache

@lru_cache(maxsize=128)
def getHandshakeScript(host: str) -> str:
    """
    Get the handshake script HTML snippet.
@@ -150,7 +146,8 @@ def getHandshakeScript(host: str) -> str:
        return ""
    return '<script src="https://nathan.woodburn/handshake.js" domain="nathan.woodburn" async></script><script src="https://nathan.woodburn/https.js" async></script>'

@cache

@lru_cache(maxsize=64)
def getAddress(coin: str) -> str:
    """
    Get the wallet address for a cryptocurrency.
@@ -169,7 +166,7 @@ def getAddress(coin: str) -> str:
    return address


@cache
@lru_cache(maxsize=256)
def getFilePath(name: str, path: str) -> Optional[str]:
    """
    Find a file in a directory tree.
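On the decorator change running through tools.py above: functools.lru_cache used as a bare decorator (the old @cache alias) falls back to its default maxsize of 128, whereas the new @lru_cache(maxsize=N) calls make the per-function cache size a deliberate choice (1 for the git commit hash, 64 for addresses, 256 for file-path lookups, and so on). A small illustration of the eviction behaviour, unrelated to the project's code:

# Illustration only; not project code
from functools import lru_cache


@lru_cache(maxsize=2)
def square(n: int) -> int:
    print(f"computing {n}")
    return n * n


square(2)   # computes
square(2)   # served from cache
square(3)   # computes
square(4)   # computes, evicts the least-recently-used entry (2)
square(2)   # computes again after eviction
print(square.cache_info())  # hits/misses/maxsize/currsize statistics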
@@ -187,7 +184,9 @@ def getFilePath(name: str, path: str) -> Optional[str]:
    return None


def json_response(request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404):
def json_response(
    request: Request, message: Union[str, Dict] = "404 Not Found", code: int = 404
):
    """
    Create a JSON response with standard formatting.

@@ -205,17 +204,20 @@ def json_response(request: Request, message: Union[str, Dict] = "404 Not Found",
            message["ip"] = getClientIP(request)
        return jsonify(message), code

    return jsonify({
    return jsonify(
        {
            "status": code,
            "message": message,
            "ip": getClientIP(request),
    }), code
        }
    ), code


def error_response(
    request: Request,
    message: str = "404 Not Found",
    code: int = 404,
    force_json: bool = False
    force_json: bool = False,
) -> Union[Tuple[Dict, int], object]:
    """
    Create an error response in JSON or HTML format.

@@ -233,10 +235,12 @@ def error_response(
        return json_response(request, message, code)

    # Check if <error code>.html exists in templates
    template_name = f"{code}.html" if os.path.isfile(
        f"templates/{code}.html") else "404.html"
    response = make_response(render_template(
        template_name, code=code, message=message), code)
    template_name = (
        f"{code}.html" if os.path.isfile(f"templates/{code}.html") else "404.html"
    )
    response = make_response(
        render_template(template_name, code=code, message=message), code
    )

    # Add message to response headers
    response.headers["X-Error-Message"] = message
||||
date_str = " ".join(date_groups).strip()
|
||||
|
||||
# Remove ordinal suffixes
|
||||
date_str = re.sub(r'(\d+)(st|nd|rd|th)', r'\1',
|
||||
date_str, flags=re.IGNORECASE)
|
||||
date_str = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", date_str, flags=re.IGNORECASE)
|
||||
|
||||
# Parse with dateutil, default day=1 if missing
|
||||
dt = parse(date_str, default=datetime.datetime(1900, 1, 1))
|
||||
@@ -275,6 +278,7 @@ def parse_date(date_groups: list[str]) -> str | None:
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def get_tools_data():
|
||||
with open("data/tools.json", "r") as f:
|
||||
return json.load(f)
|
||||
|
||||
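The parse_date hunks above only reflow the ordinal-suffix cleanup ("21st" becomes "21") before the string is handed to dateutil's parse with a day-1 default. A hedged sketch of the same idea in isolation follows; parse_date's full body and return format are not visible here, so this is not the project's function.

# Illustration of the ordinal-suffix cleanup plus dateutil parsing; not the project's parse_date
import datetime
import re

from dateutil.parser import parse


def clean_and_parse(date_str: str) -> datetime.datetime:
    # "21st March 2024" -> "21 March 2024"
    date_str = re.sub(r"(\d+)(st|nd|rd|th)", r"\1", date_str, flags=re.IGNORECASE)
    # Missing day or month fall back to the default's values
    return parse(date_str, default=datetime.datetime(1900, 1, 1))


print(clean_and_parse("21st March 2024"))   # 2024-03-21 00:00:00
print(clean_and_parse("March 2024"))        # 2024-03-01 00:00:00 (day taken from the default)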