generated from nathanwoodburn/python-webserver-template
feat: Add webserver routes and account logic
All checks were successful
Build Docker / BuildImage (push) Successful in 2m7s
All checks were successful
Build Docker / BuildImage (push) Successful in 2m7s
This commit is contained in:
113
server.py
113
server.py
@@ -18,6 +18,7 @@ import dotenv
|
||||
import threading
|
||||
import time
|
||||
import domains
|
||||
from alerts import NOTIFICATION_TYPES
|
||||
|
||||
dotenv.load_dotenv()
|
||||
|
||||
@@ -94,6 +95,118 @@ def index():
|
||||
return render_template("index.html")
|
||||
|
||||
|
||||
@app.route("/account")
|
||||
def account():
|
||||
# Check if the user is logged in
|
||||
# Check for token cookie
|
||||
token = request.cookies.get("token")
|
||||
if not token:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
|
||||
user_data = requests.get(f"https://login.hns.au/auth/user?token={token}")
|
||||
if user_data.status_code != 200:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
user_data = user_data.json()
|
||||
|
||||
notifications = domains.get_account_notifications(user_data["username"])
|
||||
if not notifications:
|
||||
return render_template("account.html", user=user_data, domains=[], NOTIFICATION_TYPES=NOTIFICATION_TYPES)
|
||||
|
||||
|
||||
return render_template("account.html", user=user_data, notifications=notifications, NOTIFICATION_TYPES=NOTIFICATION_TYPES)
|
||||
|
||||
@app.route("/login")
|
||||
def login():
|
||||
# Check if token parameter is present
|
||||
token = request.args.get("token")
|
||||
if not token:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
# Set token cookie
|
||||
response = make_response(redirect(f"{request.host_url}account"))
|
||||
response.set_cookie("token", token, httponly=True, secure=True)
|
||||
return response
|
||||
|
||||
@app.route("/logout")
|
||||
def logout():
|
||||
# Clear the token cookie
|
||||
response = make_response(redirect(f"{request.host_url}"))
|
||||
response.set_cookie("token", "", expires=0, httponly=True, secure=True)
|
||||
return response
|
||||
|
||||
|
||||
@app.route("/notification/<notificationtype>", methods=["POST"])
|
||||
def addNotification(notificationtype: str):
|
||||
"""
|
||||
Add a notification for a domain.
|
||||
"""
|
||||
|
||||
token = request.cookies.get("token")
|
||||
if not token:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
|
||||
user_data = requests.get(f"https://login.hns.au/auth/user?token={token}")
|
||||
if user_data.status_code != 200:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
user_data = user_data.json()
|
||||
username = user_data.get("username", None)
|
||||
if not username:
|
||||
return jsonify({"error": "Invalid user data"}), 400
|
||||
|
||||
notificationType = None
|
||||
for notification in NOTIFICATION_TYPES:
|
||||
if notification['type'] == notificationtype:
|
||||
notificationType = notification
|
||||
break
|
||||
else:
|
||||
return jsonify({"error": "Invalid notification type"}), 400
|
||||
|
||||
# Get form data
|
||||
data = request.form.to_dict()
|
||||
if not data or 'domain' not in data or 'blocks' not in data:
|
||||
return jsonify({"error": "Invalid request data"}), 400
|
||||
|
||||
domain = data['domain']
|
||||
|
||||
for field in notificationType['fields']:
|
||||
if field['name'] not in data and field.get('required', False):
|
||||
return jsonify({"error": f"Missing field: {field['name']}"}), 400
|
||||
|
||||
|
||||
notification = data
|
||||
|
||||
notification['type'] = notificationtype
|
||||
notification['id'] = os.urandom(16).hex() # Generate a random ID for the notification
|
||||
notification['user_name'] = username
|
||||
# Delete domain duplicate from data
|
||||
notification.pop('domain', None)
|
||||
|
||||
domains.add_notification(domain, notification)
|
||||
return redirect(f"{request.host_url}account")
|
||||
|
||||
@app.route("/notification/delete/<notification_id>")
|
||||
def delete_notification(notification_id: str):
|
||||
"""
|
||||
Delete a notification by its ID.
|
||||
"""
|
||||
token = request.cookies.get("token")
|
||||
if not token:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
|
||||
user_data = requests.get(f"https://login.hns.au/auth/user?token={token}")
|
||||
if user_data.status_code != 200:
|
||||
return redirect(f"https://login.hns.au/auth?return={request.host_url}login")
|
||||
user_data = user_data.json()
|
||||
|
||||
domains.delete_notification(notification_id, user_data['username'])
|
||||
return redirect(f"{request.host_url}account")
|
||||
|
||||
|
||||
@app.route("/account/<domain>")
|
||||
def account_domain(domain: str):
|
||||
# TODO - Implement account domain logic
|
||||
return redirect(f"/account")
|
||||
|
||||
|
||||
@app.route("/<path:path>")
|
||||
def catch_all(path: str):
|
||||
if os.path.isfile("templates/" + path):
|
||||
|
||||
Reference in New Issue
Block a user