generated from nathanwoodburn/python-webserver-template
feat: Add initial code drop
All checks were successful
Build Docker / BuildImage (push) Successful in 1m5s
All checks were successful
Build Docker / BuildImage (push) Successful in 1m5s
This commit is contained in:
134
server.py
134
server.py
@@ -9,16 +9,43 @@ from flask import (
|
||||
render_template,
|
||||
send_from_directory,
|
||||
send_file,
|
||||
session,
|
||||
url_for,
|
||||
)
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
from datetime import datetime
|
||||
import dotenv
|
||||
import markdown
|
||||
import markdown.extensions.fenced_code
|
||||
from flask_session import Session
|
||||
from yubico_client import Yubico
|
||||
from functools import wraps
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
dotenv.load_dotenv()
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", os.urandom(24))
|
||||
app.config["SESSION_TYPE"] = "filesystem"
|
||||
app.config["SESSION_PERMANENT"] = False
|
||||
Session(app)
|
||||
|
||||
# Yubikey settings
|
||||
YUBICO_CLIENT_ID = os.getenv("YUBICO_CLIENT_ID")
|
||||
YUBICO_SECRET_KEY = os.getenv("YUBICO_SECRET_KEY")
|
||||
YUBIKEY_ID = os.getenv("YUBIKEY_ID") # The first 12 characters of your YubiKey OTP
|
||||
|
||||
# Authentication function
|
||||
def login_required(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not session.get('authenticated'):
|
||||
return redirect(url_for('login', next=request.url))
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
|
||||
def find(name, path):
|
||||
@@ -74,8 +101,65 @@ def wellknown(path):
|
||||
# region Main routes
|
||||
@app.route("/")
|
||||
def index():
|
||||
return render_template("index.html")
|
||||
return render_template("index.html", authenticated=session.get('authenticated', False))
|
||||
|
||||
# Login and authentication routes
|
||||
@app.route("/login", methods=["GET", "POST"])
|
||||
def login():
|
||||
error = None
|
||||
if request.method == "POST":
|
||||
otp = request.form.get("otp", "")
|
||||
|
||||
# Verify the first 12 characters of the OTP match the expected YubiKey ID
|
||||
if not otp or len(otp) < 12 or otp[:12] != YUBIKEY_ID:
|
||||
error = "Invalid YubiKey OTP"
|
||||
else:
|
||||
try:
|
||||
# Initialize Yubico client
|
||||
client = Yubico(YUBICO_CLIENT_ID, YUBICO_SECRET_KEY)
|
||||
|
||||
# Verify the OTP with Yubico servers
|
||||
verification = client.verify(otp)
|
||||
if verification:
|
||||
session['authenticated'] = True
|
||||
next_url = request.args.get('next', url_for('emergency'))
|
||||
return redirect(next_url)
|
||||
else:
|
||||
error = "YubiKey authentication failed"
|
||||
except Exception as e:
|
||||
error = f"Authentication error: {str(e)}"
|
||||
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
@app.route("/logout")
|
||||
def logout():
|
||||
session.pop('authenticated', None)
|
||||
return redirect(url_for('index'))
|
||||
|
||||
@app.route("/emergency")
|
||||
@login_required
|
||||
def emergency():
|
||||
# Check if emergency.md exists
|
||||
emergency_path = os.path.join(os.path.dirname(__file__), "emergency.md")
|
||||
|
||||
if os.path.exists(emergency_path):
|
||||
with open(emergency_path, 'r') as f:
|
||||
emergency_content = f.read()
|
||||
# Convert markdown to HTML with enhanced extensions
|
||||
emergency_html = markdown.markdown(
|
||||
emergency_content,
|
||||
extensions=[
|
||||
'fenced_code',
|
||||
'codehilite',
|
||||
'tables',
|
||||
'nl2br',
|
||||
'sane_lists' # Ensures proper list rendering
|
||||
]
|
||||
)
|
||||
return render_template("emergency.html", content=emergency_html)
|
||||
else:
|
||||
return render_template("emergency.html",
|
||||
content="<p>No emergency information available.</p>")
|
||||
|
||||
@app.route("/<path:path>")
|
||||
def catch_all(path: str):
|
||||
@@ -102,6 +186,54 @@ def catch_all(path: str):
|
||||
# endregion
|
||||
|
||||
|
||||
# region Webhook
|
||||
@app.route("/webhook/update", methods=["POST"])
|
||||
def webhook_update():
|
||||
# Get the webhook secret from environment
|
||||
webhook_secret = os.getenv("WEBHOOK_SECRET")
|
||||
if not webhook_secret:
|
||||
return jsonify({"error": "Webhook not configured"}), 500
|
||||
|
||||
# Verify X-Webhook-Signature header
|
||||
signature = request.headers.get("X-Webhook-Signature")
|
||||
if not signature:
|
||||
return jsonify({"error": "Missing signature"}), 401
|
||||
|
||||
# Get request body
|
||||
payload = request.get_data()
|
||||
|
||||
# Verify signature
|
||||
expected_signature = hmac.new(
|
||||
webhook_secret.encode(),
|
||||
payload,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(signature, expected_signature):
|
||||
return jsonify({"error": "Invalid signature"}), 401
|
||||
|
||||
# Process the update
|
||||
try:
|
||||
data = request.json
|
||||
if not data or not isinstance(data, dict) or "content" not in data:
|
||||
return jsonify({"error": "Invalid payload format"}), 400
|
||||
|
||||
emergency_content = data["content"]
|
||||
emergency_path = os.path.join(os.path.dirname(__file__), "emergency.md")
|
||||
|
||||
# Write the new content to the file
|
||||
with open(emergency_path, "w") as f:
|
||||
f.write(emergency_content)
|
||||
|
||||
return jsonify({"success": True, "message": "Emergency content updated"}), 200
|
||||
|
||||
except Exception as e:
|
||||
app.logger.error(f"Webhook error: {str(e)}")
|
||||
return jsonify({"error": f"Update failed: {str(e)}"}), 500
|
||||
|
||||
# endregion
|
||||
|
||||
|
||||
# region Error Catching
|
||||
# 404 catch all
|
||||
@app.errorhandler(404)
|
||||
|
||||
Reference in New Issue
Block a user