# hns-login/website/routes.py
# (web-view header: 250 lines, 7.6 KiB, Python)

2024-02-09 23:04:20 +11:00
import time
from .varo_auth import flask_login as varo_auth_flask_login
from flask import Blueprint, request, session, url_for
from flask import render_template, redirect, jsonify, send_from_directory
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from .models import db, User, OAuth2Client
from .oauth2 import authorization, require_oauth
2024-06-13 16:53:25 +10:00
import os
import requests
import dns.message
import dns.query
import dns.rdatatype
from requests_doh import DNSOverHTTPSSession, add_dns_provider
2024-02-09 23:04:20 +11:00
# Blueprint named "home"; every route in this module is registered on it,
# which is why endpoints are addressed as "home.<function>" in url_for().
bp = Blueprint("home", __name__)
2024-02-09 23:04:20 +11:00
2024-06-13 16:53:25 +10:00
# Make sure the avatar cache directory exists before any request needs it.
# exist_ok=True replaces the separate exists() check, so two processes
# importing this module at the same time cannot race between the check and
# the creation.
os.makedirs("website/avatars", exist_ok=True)
2024-02-09 23:04:20 +11:00
def current_user():
    """Return the ``User`` tied to the current session, or ``None``.

    The login view stores the user's primary key under ``session["id"]``;
    when that key is absent nobody is logged in.
    """
    if "id" not in session:
        return None
    return User.query.get(session["id"])
def split_by_crlf(s):
    """Split a multi-line form value into a list of non-blank entries.

    Each line is stripped of surrounding whitespace and lines that end up
    empty are dropped, so a textarea containing stray spaces or blank lines
    does not produce entries (e.g. redirect URIs) that can never match.

    Args:
        s: Text with one entry per line (any newline convention).

    Returns:
        The stripped, non-empty lines in their original order.
    """
    stripped_lines = (line.strip() for line in s.splitlines())
    return [entry for entry in stripped_lines if entry]
@bp.route("/", methods=("GET", "POST"))
def home():
    """Render the landing page and process login submissions.

    POST: authenticate the request with ``varo_auth_flask_login``. On
    failure redirect to ``/?error=login_failed``; on success look up (or
    create) the ``User`` with that username, store its id in the session and
    redirect to ``next`` when given, otherwise to ``/``.

    GET: a logged-in user carrying a ``next`` query parameter is redirected
    there; otherwise ``home.html`` is rendered with the user (or ``None``)
    and the OAuth2 clients that user owns.
    """
    # NOTE(review): ``next`` comes straight from the query string and is
    # redirected to without validation (open redirect). It is left as-is
    # because callers may legitimately pass absolute URLs; consider
    # restricting it to an allow-list of hosts.
    next_page = request.args.get("next")

    if request.method == "POST":
        auth = varo_auth_flask_login(request)
        # Reject every falsy result (False, None, ""), not only the literal
        # False, so an empty username can never become an account.
        if not auth:
            return redirect("/?error=login_failed")

        user = User.query.filter_by(username=auth).first()
        if not user:
            # First login for this name: create the account on the fly.
            user = User(username=auth)
            db.session.add(user)
            db.session.commit()
        session["id"] = user.id

        # Head back to where the login was requested from (e.g. the OAuth
        # authorize page) when the caller asked for it.
        return redirect(next_page or "/")

    user = current_user()
    if user and next_page:
        # Already logged in: no need to load the client list just to redirect.
        return redirect(next_page)

    clients = OAuth2Client.query.filter_by(user_id=user.id).all() if user else []
    return render_template("home.html", user=user, clients=clients)
2024-02-09 23:04:20 +11:00
@bp.route("/logout")
def logout():
    """Clear the login from the session and redirect.

    With a ``next`` query parameter the user is sent to the home page with
    that ``next`` value preserved; otherwise to ``/``.
    """
    # pop() rather than del: hitting /logout while not logged in must not
    # raise KeyError and turn into a 500 response.
    session.pop("id", None)

    # Named next_page to avoid shadowing the builtin next().
    next_page = request.args.get("next")
    if next_page:
        return redirect(url_for("home.home", next=next_page))
    return redirect("/")
2024-02-09 23:04:20 +11:00
@bp.route("/create_client", methods=("GET", "POST"))
def create_client():
    """Show the client-registration form (GET) or register a client (POST).

    Anonymous visitors are redirected to ``/``. On POST a new
    ``OAuth2Client`` owned by the current user is built from the submitted
    form, stored, and the user is redirected to ``/``.
    """
    owner = current_user()
    if not owner:
        return redirect("/")
    if request.method == "GET":
        return render_template("create_client.html")

    new_client = OAuth2Client(
        client_id=gen_salt(24),
        client_id_issued_at=int(time.time()),
        user_id=owner.id,
    )

    fields = request.form
    metadata = {
        "client_name": fields["client_name"],
        "client_uri": fields["client_uri"],
        "grant_types": split_by_crlf(fields["grant_type"]),
        "redirect_uris": split_by_crlf(fields["redirect_uri"]),
        "response_types": split_by_crlf(fields["response_type"]),
        "scope": fields["scope"],
        "token_endpoint_auth_method": fields["token_endpoint_auth_method"],
    }
    new_client.set_client_metadata(metadata)

    # Clients registered with auth method "none" get an empty secret; every
    # other client receives a freshly generated one.
    is_public = fields["token_endpoint_auth_method"] == "none"
    new_client.client_secret = "" if is_public else gen_salt(48)

    db.session.add(new_client)
    db.session.commit()
    return redirect("/")
2024-02-09 23:04:20 +11:00
@bp.route("/delete_client")
def delete_client():
    """Delete the OAuth2 client named by the ``client_id`` query parameter.

    Only the user whose id is 1 may delete clients; anyone else (including
    anonymous visitors) is redirected to ``/`` without any change. The
    response is always a redirect to ``/``.
    """
    # NOTE(review): this is a state-changing GET route with a hard-coded
    # admin id; consider POST + CSRF protection and a role flag.
    requester = current_user()
    if not requester or requester.id != 1:
        return redirect("/")

    wanted_id = request.args.get("client_id")
    target = OAuth2Client.query.filter_by(client_id=wanted_id).first()
    if target:
        db.session.delete(target)
        db.session.commit()
    return redirect("/")
2024-02-13 22:46:46 +11:00
2024-02-09 23:04:20 +11:00
@bp.route("/oauth/authorize", methods=["GET", "POST"])
def authorize():
    """OAuth2 authorization endpoint.

    Anonymous visitors are sent to the login page with ``next`` set to the
    current URL so they come back here afterwards. A GET renders the consent
    page (``authorize.html``), or returns the OAuth2 error code when the
    request is invalid. Any other method issues the authorization response
    on behalf of the logged-in user.
    """
    user = current_user()
    if not user:
        # Not logged in on the auth server yet: log in first, then return.
        return redirect(url_for("home.home", next=request.url))

    if request.method != "GET":
        return authorization.create_authorization_response(grant_user=user)

    try:
        grant = authorization.get_consent_grant(end_user=user)
    except OAuth2Error as error:
        return error.error
    return render_template("authorize.html", user=user, grant=grant)
@bp.route("/oauth/token", methods=["POST"])
def issue_token():
    """OAuth2 token endpoint: delegate to the authorization server."""
    token_response = authorization.create_token_response()
    return token_response
@bp.route("/oauth/revoke", methods=["POST"])
def revoke_token():
    """OAuth2 revocation endpoint: delegate to the "revocation" endpoint."""
    revocation_response = authorization.create_endpoint_response("revocation")
    return revocation_response
2024-02-09 23:04:20 +11:00
@bp.route("/api/me")
@require_oauth(["profile", "openid"])
def api_me():
    """Return the user-info document for the owner of the current token.

    Access is guarded by ``require_oauth(["profile", "openid"])``. All name
    fields are derived from the username; the e-mail, profile and picture
    values are built on the ``login.hns.au`` host.
    """
    account = current_token.user
    uid = account.id
    name = account.username

    return jsonify(
        {
            "id": uid,
            "uid": uid,
            "username": name,
            "email": f"{name}@login.hns.au",
            "displayName": name + "/",
            "sub": uid,
            "name": name,
            "given_name": name,
            "family_name": name,
            "nickname": name,
            "preferred_username": name,
            "profile": f"https://login.hns.au/u/{name}",
            "picture": f"https://login.hns.au/u/{name}/avatar.png",
            "website": f"https://{name}",
            "email_verified": True,
        }
    )
@bp.route("/discovery")
def autodiscovery():
    """Return the discovery document describing this server's endpoints.

    Every URL is built from the ``Host`` of the incoming request with an
    ``https`` scheme.
    """
    base = f"https://{request.host}"
    document = {
        "issuer": f"{base}/",
        "authorization_endpoint": f"{base}/oauth/authorize",
        "token_endpoint": f"{base}/oauth/token",
        "userinfo_endpoint": f"{base}/api/me",
        "revocation_endpoint": f"{base}/oauth/revoke",
        "response_types_supported": ["code"],
        "subject_types_supported": ["public"],
        "id_token_signing_alg_values_supported": ["RS256"],
        "scopes_supported": ["openid", "email", "profile"],
        "token_endpoint_auth_methods_supported": [
            "client_secret_basic",
            "client_secret_post",
        ],
        "grant_types_supported": ["authorization_code"],
    }
    return jsonify(document)
2024-06-13 16:53:25 +10:00
@bp.route("/u/<username>")
def profile(username):
    """Return the public profile (name and id) of *username* as JSON.

    Responds with HTTP 404 and a JSON error body when no such user exists,
    instead of failing with an AttributeError on ``None``.
    """
    user = User.query.filter_by(username=username).first()
    if user is None:
        return jsonify({"error": "User not found"}), 404
    return jsonify({"name": user.username, "id": user.id})
def _lookup_avatar_url(username, timeout=10):
    """Return the avatar URL published in *username*'s TXT records, or ``""``.

    Sends a TXT query for *username* to the hnsdoh.com DNS-over-HTTPS
    endpoint and returns the text following ``profile avatar=`` from the
    first matching record.

    Raises:
        requests.RequestException: the DoH request failed or returned an
            HTTP error status.
        dns.exception.DNSException: the name or the DNS reply is malformed.
    """
    query = dns.message.make_query(username, dns.rdatatype.TXT)
    response = requests.post(
        "https://hnsdoh.com/dns-query",
        data=query.to_wire(),
        headers={"Content-Type": "application/dns-message"},
        timeout=timeout,  # never let a stalled resolver hang the worker
    )
    response.raise_for_status()
    dns_response = dns.message.from_wire(response.content)

    prefix = "profile avatar="
    for record in dns_response.answer:
        if record.rdtype != dns.rdatatype.TXT:
            continue
        for txt in record:
            txt_value = txt.to_text().strip('"')
            if txt_value.startswith(prefix):
                # Slice instead of split() so a value that itself contains
                # the marker is not truncated.
                return txt_value[len(prefix):]
    return ""


def _download_avatar(avatar_url, destination, timeout=10):
    """Fetch *avatar_url* via DNS-over-HTTPS and store it at *destination*.

    Returns True when the image was downloaded and written, False when the
    download failed (network error, invalid URL or HTTP error status). The
    file is only written after a successful response, so an error page is
    never cached as the avatar.
    """
    # NOTE(review): avatar_url comes from a DNS record the profile owner
    # controls, so this is a server-side fetch of an untrusted URL (SSRF
    # risk); consider restricting schemes/hosts and response size.
    add_dns_provider("hns", "https://hnsdoh.com/dns-query")
    doh_session = DNSOverHTTPSSession(provider="hns")
    try:
        response = doh_session.get(avatar_url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException:
        return False
    with open(destination, "wb") as f:
        f.write(response.content)
    return True


@bp.route("/u/<username>/avatar.png")
def avatar(username):
    """Serve the avatar of *username* as a PNG.

    A previously downloaded avatar is served from ``website/avatars``.
    Otherwise the avatar URL is looked up in the name's TXT records,
    downloaded and cached. When no avatar is published, or the lookup or
    download fails, the default ``favicon.png`` is served instead.
    """
    filename = f"{username}.png"
    cache_path = f"website/avatars/{filename}"

    if os.path.exists(cache_path):
        return send_from_directory("avatars", filename, mimetype="image/png")

    try:
        avatar_url = _lookup_avatar_url(username)
    except (requests.RequestException, dns.exception.DNSException):
        # Best effort: an unreachable resolver or a malformed reply simply
        # means no custom avatar is available right now.
        avatar_url = ""

    if avatar_url and _download_avatar(avatar_url, cache_path):
        return send_from_directory("avatars", filename, mimetype="image/png")
    return send_from_directory("templates", "favicon.png", mimetype="image/png")
2024-02-09 23:04:20 +11:00
@bp.route("/favicon.png")
def favicon():
    """Serve the site icon stored as ``favicon.png`` in ``templates``."""
    icon_response = send_from_directory(
        "templates", "favicon.png", mimetype="image/png"
    )
    return icon_response