Nathan Woodburn
79621f2d0f
All checks were successful
Build Docker / Build Docker (push) Successful in 29s
655 lines
22 KiB
Python
655 lines
22 KiB
Python
import time
|
|
import datetime as dt
|
|
from .varo_auth import flask_login as varo_auth_flask_login
|
|
from flask import Blueprint, request, session, url_for, make_response
|
|
from flask import render_template, redirect, jsonify, send_from_directory
|
|
from werkzeug.security import gen_salt
|
|
from authlib.integrations.flask_oauth2 import current_token
|
|
from authlib.oauth2 import OAuth2Error
|
|
from .models import db, User, OAuth2Client, AuthTokens
|
|
from .oauth2 import authorization, require_oauth
|
|
import os
|
|
import requests
|
|
import dns.message
|
|
import dns.query
|
|
import dns.rdatatype
|
|
from requests_doh import DNSOverHTTPSSession, add_dns_provider
|
|
from datetime import timedelta
|
|
from eth_account.messages import encode_defunct
|
|
from eth_account import Account
|
|
import json
|
|
import urllib.parse
|
|
|
|
|
|
|
|
bp = Blueprint("home", __name__)
|
|
openSeaAPIKey = os.getenv("OPENSEA_API_KEY")
|
|
|
|
restricted_keys = [
|
|
"id",
|
|
"uid",
|
|
"username",
|
|
"sub",
|
|
"email_verified",
|
|
"roles",
|
|
"role",
|
|
"admin",
|
|
"hemail"
|
|
]
|
|
|
|
if not os.path.exists("website/avatars"):
|
|
os.makedirs("website/avatars")
|
|
|
|
def current_user():
|
|
if "id" in session:
|
|
uid = session["id"]
|
|
return User.query.get(uid)
|
|
return None
|
|
|
|
def urlParse(url):
|
|
if url.startswith("https://"):
|
|
return url
|
|
if 'localhost' in url or '127.0.0.1' in url:
|
|
return url
|
|
return url.replace('http://','https://')
|
|
|
|
def split_by_crlf(s):
|
|
return [v for v in s.splitlines() if v]
|
|
|
|
def get_idns_records(domain:str) -> list:
|
|
query = dns.message.make_query(domain, dns.rdatatype.TXT)
|
|
dns_request = query.to_wire()
|
|
# Send the DNS query over HTTPS
|
|
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
|
# Parse the DNS response
|
|
dns_response = dns.message.from_wire(response.content)
|
|
# Loop over TXT records and look for profile
|
|
idns_records = []
|
|
for record in dns_response.answer:
|
|
if record.rdtype == dns.rdatatype.TXT:
|
|
for txt in record:
|
|
txt_value:str = txt.to_text().strip('"')
|
|
if txt_value.startswith("IDNS1"):
|
|
print(txt_value)
|
|
idns = txt_value.removeprefix("IDNS1 ")
|
|
idns = idns.split(" ")
|
|
for r in idns:
|
|
idns_records.append(r)
|
|
return idns_records
|
|
|
|
def get_user_info(user:User) -> dict:
|
|
picture = f"https://login.hns.au/u/{user.username}/avatar.png"
|
|
|
|
if user.profile_picture:
|
|
picture = user.profile_picture
|
|
|
|
userInfo = {
|
|
"id": user.id,
|
|
"uid": user.id,
|
|
"username": user.username,
|
|
"email": f"{user.username}@login.hns.au",
|
|
"hemail": f"{user.username}@login.hns.au",
|
|
"displayName": user.username + "/",
|
|
"sub": user.id,
|
|
"name": user.username,
|
|
"given_name": user.username,
|
|
"family_name": user.username,
|
|
"nickname": user.username,
|
|
"preferred_username": user.username,
|
|
"profile": f"https://login.hns.au/u/{user.username}",
|
|
"picture": picture,
|
|
"website": f"https://{user.username}",
|
|
"email_verified": True
|
|
}
|
|
|
|
idns_info = get_idns_records(user.username)
|
|
for record in idns_info:
|
|
type = record.split(":")[0]
|
|
content = record.split(":")
|
|
if len(content) > 2:
|
|
content = ":".join(content[1:])
|
|
else:
|
|
content = content[1]
|
|
key = content.split("=")[0].lower()
|
|
value = content.split("=")[1]
|
|
if type == "profile":
|
|
if key == "avatar":
|
|
userInfo["picture"] = value
|
|
elif key == "email":
|
|
userInfo["email"] = value
|
|
userInfo["email_verified"] = False
|
|
elif key == "name":
|
|
userInfo["name"] = value
|
|
userInfo["nickname"] = value
|
|
userInfo["displayName"] = value
|
|
else:
|
|
if key in restricted_keys:
|
|
continue
|
|
userInfo[key] = value
|
|
|
|
return userInfo
|
|
|
|
@bp.route("/", methods=("GET", "POST"))
|
|
def home():
|
|
next_page = request.args.get("next")
|
|
|
|
# Check if session exists
|
|
if "uuid" not in session:
|
|
session["uuid"] = gen_salt(24)
|
|
session.permanent = True
|
|
|
|
uuid = session["uuid"]
|
|
|
|
if request.method == "POST":
|
|
auth = varo_auth_flask_login(request)
|
|
if auth == False:
|
|
return redirect("/?error=login_failed")
|
|
print(auth)
|
|
user = User.query.filter_by(username=auth).first()
|
|
if not user:
|
|
user = User(username=auth)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
session["id"] = user.id
|
|
# Make sure the session is permanent
|
|
session.permanent = True
|
|
# if user is not just to log in, but need to head back to the auth page, then go for it
|
|
if next_page:
|
|
return redirect(next_page)
|
|
return redirect("/")
|
|
user = current_user()
|
|
users = []
|
|
if user:
|
|
clients = OAuth2Client.query.filter_by(user_id=user.id).all()
|
|
|
|
if user.id == 1:
|
|
clients = OAuth2Client.query.all()
|
|
users = User.query.all()
|
|
users = [{"id": user.id, "username": user.username} for user in users]
|
|
|
|
if next_page:
|
|
return redirect(next_page)
|
|
else:
|
|
clients = []
|
|
|
|
# Check if the user has signed in with HNS ID
|
|
hnsid=''
|
|
address=''
|
|
if "address" in session:
|
|
address = session["address"]
|
|
openseaInfo = requests.get(f"https://api.opensea.io/api/v2/chain/optimism/account/{address}/nfts?collection=handshake-slds",
|
|
headers={"Accept": "application/json",
|
|
"x-api-key":openSeaAPIKey})
|
|
if openseaInfo.status_code == 200:
|
|
hnsid = openseaInfo.json()
|
|
|
|
domains = []
|
|
if 'domains' in session:
|
|
domains = session['domains']
|
|
|
|
if next_page:
|
|
# find next in url
|
|
next_page = request.url.split("next=")[1]
|
|
|
|
return render_template("home.html", user=user, clients=clients,
|
|
address=address, hnsid=hnsid, users=users,
|
|
uuid=uuid, next=next_page, domains=domains)
|
|
|
|
@bp.route("/hnsid", methods=["POST"])
|
|
def hnsid():
|
|
# Get address and signature from the request
|
|
address = request.json.get("address")
|
|
signature = request.json.get("signature")
|
|
message = request.json.get("message")
|
|
|
|
# Make sure message is in the correct format
|
|
if not message.startswith("I am signing this message to log in to HNS Login as "):
|
|
print("Invalid message format")
|
|
return jsonify({"success": False})
|
|
if not message.endswith(session["uuid"]):
|
|
print("Invalid message format")
|
|
return jsonify({"success": False})
|
|
|
|
# Verify the signature
|
|
msg = encode_defunct(text=message)
|
|
signer = Account.recover_message(msg, signature=signature).lower()
|
|
if signer != address:
|
|
print("Signature verification failed")
|
|
print(signer, address)
|
|
return jsonify({"success": False})
|
|
|
|
# Save the address in the session
|
|
session["address"] = address
|
|
session.permanent = True
|
|
|
|
return jsonify({"success": True})
|
|
|
|
@bp.route("/hnsid/<domain>")
|
|
def hnsid_domain(domain):
|
|
# Get the address from the session
|
|
address = session.get("address")
|
|
if not address:
|
|
return jsonify({"error": "No address found in session"})
|
|
|
|
# Get domain info from Opensea
|
|
openseaInfo = requests.get(f"https://api.opensea.io/api/v2/chain/optimism/account/{address}/nfts?collection=handshake-slds",
|
|
headers={"Accept": "application/json",
|
|
"x-api-key":openSeaAPIKey})
|
|
if openseaInfo.status_code != 200:
|
|
return jsonify({"error": "Failed to get domain info from Opensea"})
|
|
hnsid = openseaInfo.json()
|
|
for nft in hnsid["nfts"]:
|
|
if nft["name"] == domain:
|
|
# Add domain to the session
|
|
user = User.query.filter_by(username=domain).first()
|
|
if not user:
|
|
# Create user with username and profile picture
|
|
user = User(username=domain, profile_picture=nft["image_url"])
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
session["id"] = user.id
|
|
session.permanent = True
|
|
|
|
# Check if next page is specified
|
|
next_page = request.args.get("next")
|
|
if next_page:
|
|
return redirect(next_page)
|
|
|
|
return redirect("/")
|
|
|
|
return jsonify({"success": False, "error": "Domain not found"})
|
|
|
|
|
|
@bp.route("/txt", methods=["POST"])
|
|
def txtLogin():
|
|
# Get domain from form
|
|
domain = request.form.get("domain").lower().strip().replace("/", "").removesuffix(".")
|
|
# Get uuid
|
|
uuid = session["uuid"]
|
|
|
|
query = dns.message.make_query(domain, dns.rdatatype.TXT)
|
|
dns_request = query.to_wire()
|
|
|
|
# Send the DNS query over HTTPS
|
|
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
|
|
|
# Parse the DNS response
|
|
dns_response = dns.message.from_wire(response.content)
|
|
|
|
# Loop over TXT records and look for profile avatar
|
|
idns_records = []
|
|
for record in dns_response.answer:
|
|
if record.rdtype == dns.rdatatype.TXT:
|
|
for txt in record:
|
|
txt_value:str = txt.to_text().strip('"')
|
|
if txt_value.startswith("IDNS1"):
|
|
print(txt_value)
|
|
idns = txt_value.removeprefix("IDNS1 ")
|
|
idns = idns.split(" ")
|
|
for r in idns:
|
|
idns_records.append(r)
|
|
|
|
for record in idns_records:
|
|
print(record)
|
|
type = record.split(":")[0]
|
|
content = record.split(":")[1]
|
|
key = content.split("=")[0]
|
|
value = content.split("=")[1]
|
|
print(f"Type: {type}, Key: {key}, Value: {value}")
|
|
if type == "auth" and key == "login.hns.au":
|
|
if value == uuid:
|
|
# Add domain to user
|
|
user = User.query.filter_by(username=domain).first()
|
|
if not user:
|
|
# Create user with username and profile picture
|
|
user = User(username=domain)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
session["id"] = user.id
|
|
session.permanent = True
|
|
if "domains" not in session:
|
|
session["domains"] = []
|
|
|
|
if domain not in session["domains"]:
|
|
session["domains"].append(domain)
|
|
print("User logged in with TXT")
|
|
|
|
# Check if next page is specified
|
|
next_page = request.args.get("next")
|
|
print(next_page)
|
|
if next_page and next_page != "None":
|
|
return redirect(next_page)
|
|
return redirect("/")
|
|
|
|
return render_template("error.html",error="The domain wasn't able to be authenticated.",
|
|
message="<br>Double check the TXT record and try again",
|
|
custom="<button onclick='window.location.reload();'>Try again</button>"), 200
|
|
|
|
@bp.route("/txt/<domain>")
|
|
def txtLoginDomain(domain:str):
|
|
# Get uuid
|
|
uuid = session["uuid"]
|
|
|
|
idns_records = get_idns_records(domain.lower().strip().replace("/", "").removesuffix("."))
|
|
|
|
|
|
for record in idns_records:
|
|
print(record)
|
|
type = record.split(":")[0]
|
|
content = record.split(":")[1]
|
|
key = content.split("=")[0]
|
|
value = content.split("=")[1]
|
|
print(f"Type: {type}, Key: {key}, Value: {value}")
|
|
if type == "auth" and key == "login.hns.au":
|
|
if value == uuid:
|
|
# Add domain to user
|
|
user = User.query.filter_by(username=domain).first()
|
|
if not user:
|
|
# Create user with username and profile picture
|
|
user = User(username=domain)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
session["id"] = user.id
|
|
session.permanent = True
|
|
if "domains" not in session:
|
|
session["domains"] = []
|
|
|
|
if domain not in session["domains"]:
|
|
session["domains"].append(domain)
|
|
print("User logged in with TXT")
|
|
|
|
# Check if next page is specified
|
|
next_page = request.args.get("next")
|
|
print(next_page)
|
|
if next_page and next_page != "None":
|
|
return redirect(next_page)
|
|
return redirect("/")
|
|
|
|
return render_template("error.html",error="The domain wasn't able to be authenticated.",
|
|
message="<br>Double check the TXT record and try again",
|
|
custom="<button onclick='window.location.reload();'>Try again</button>"), 200
|
|
|
|
@bp.route("/logout")
|
|
def logout():
|
|
del session["id"]
|
|
next = request.args.get("next")
|
|
if next:
|
|
return redirect(url_for("home.home", next=next))
|
|
|
|
return redirect("/")
|
|
|
|
|
|
@bp.route("/create_client", methods=("GET", "POST"))
|
|
def create_client():
|
|
user = current_user()
|
|
if not user:
|
|
return redirect("/")
|
|
if request.method == "GET":
|
|
return render_template("create_client.html")
|
|
|
|
client_id = gen_salt(24)
|
|
client_id_issued_at = int(time.time())
|
|
client = OAuth2Client(
|
|
client_id=client_id,
|
|
client_id_issued_at=client_id_issued_at,
|
|
user_id=user.id,
|
|
)
|
|
|
|
form = request.form
|
|
client_metadata = {
|
|
"client_name": form["client_name"],
|
|
"client_uri": form["client_uri"],
|
|
"grant_types": split_by_crlf(form["grant_type"]),
|
|
"redirect_uris": split_by_crlf(form["redirect_uri"]),
|
|
"response_types": split_by_crlf(form["response_type"]),
|
|
"scope": " ".join(form.getlist("scope")),
|
|
"token_endpoint_auth_method": form["token_endpoint_auth_method"],
|
|
}
|
|
client.set_client_metadata(client_metadata)
|
|
|
|
if form["token_endpoint_auth_method"] == "none":
|
|
client.client_secret = ""
|
|
else:
|
|
client.client_secret = gen_salt(48)
|
|
|
|
db.session.add(client)
|
|
db.session.commit()
|
|
return redirect("/client/" + client_id)
|
|
|
|
@bp.route("/client/<client_id>")
|
|
def client(client_id):
|
|
user = current_user()
|
|
if not user:
|
|
return redirect("/")
|
|
|
|
client:OAuth2Client = OAuth2Client.query.filter_by(client_id=client_id).first()
|
|
if not client:
|
|
return redirect("/")
|
|
|
|
if client.user_id != user.id and user.id != 1:
|
|
return redirect("/")
|
|
|
|
metadata = client.client_metadata
|
|
# Convert metadata to json
|
|
metadata = json.dumps(metadata, indent=4)
|
|
|
|
return render_template("client.html", client=metadata,id=client_id,
|
|
secret=client.client_secret)
|
|
|
|
|
|
@bp.route("/delete_client")
|
|
def delete_client():
|
|
user = current_user()
|
|
if not user:
|
|
return redirect("/")
|
|
|
|
client_id = request.args.get("client_id")
|
|
client:OAuth2Client = OAuth2Client.query.filter_by(client_id=client_id).first()
|
|
|
|
if not client:
|
|
return redirect("/")
|
|
|
|
if client.user_id != user.id and user.id != 1:
|
|
return redirect("/")
|
|
|
|
db.session.delete(client)
|
|
db.session.commit()
|
|
return redirect("/")
|
|
|
|
|
|
@bp.route("/oauth/authorize", methods=["GET", "POST"])
|
|
def authorize():
|
|
user = current_user()
|
|
# if user log status is not true (Auth server), then to log it in
|
|
if not user:
|
|
return redirect(url_for("home.home", next=urlParse(request.url))) # Force HTTPS
|
|
if request.method == "GET":
|
|
try:
|
|
grant = authorization.get_consent_grant(end_user=user)
|
|
except OAuth2Error as error:
|
|
print(json.dumps({
|
|
"error": error.error,
|
|
"description": error.description,
|
|
"uri": error.uri,
|
|
}, indent=4),flush=True)
|
|
return jsonify({
|
|
"error": error.error,
|
|
"description": error.description,
|
|
"uri": error.uri,
|
|
})
|
|
return render_template("authorize.html", user=user, grant=grant)
|
|
|
|
grant_user = user
|
|
|
|
return authorization.create_authorization_response(grant_user=grant_user)
|
|
|
|
@bp.route("/auth", methods=["GET", "POST"])
|
|
def plainAuth():
|
|
user:User = current_user()
|
|
if not user:
|
|
return redirect(url_for("home.home", next=urlParse(request.url)))
|
|
|
|
# Check for return URL
|
|
return_url = request.args.get("return")
|
|
if not return_url:
|
|
return render_template("error.html",error="No return URL specified")
|
|
|
|
# Get host from return URL
|
|
host = return_url.split("/")[2]
|
|
|
|
|
|
if request.method == "GET":
|
|
# Custom grant
|
|
grant = {
|
|
"client":{
|
|
"client_name": host,
|
|
}
|
|
}
|
|
return render_template("authorize.html", user=user, grant=grant)
|
|
|
|
# Create a hex token for the user
|
|
token = gen_salt(24)
|
|
|
|
expiry = dt.datetime.now() + timedelta(days=7)
|
|
# Save the token in the database
|
|
auth_token = AuthTokens(service=host, user_name=user.username, access_token=token, refresh_token="", expires_at=expiry)
|
|
db.session.add(auth_token)
|
|
db.session.commit()
|
|
|
|
# Remove any stale tokens
|
|
AuthTokens.query.filter(AuthTokens.expires_at < time.time()).delete()
|
|
|
|
|
|
return redirect(return_url+"?username="+user.username+"&token="+token)
|
|
|
|
@bp.route("/auth/user")
|
|
def authUser():
|
|
if "token" not in request.args:
|
|
return jsonify({"error": "No token specified"})
|
|
|
|
token = request.args.get("token")
|
|
# Remove any stale tokens
|
|
AuthTokens.query.filter(AuthTokens.expires_at < time.time()).delete()
|
|
|
|
username = AuthTokens.query.filter_by(access_token=token).first()
|
|
if not username:
|
|
return jsonify({"error": "Invalid token"})
|
|
|
|
user = User.query.filter_by(username=username.user_name).first()
|
|
|
|
return jsonify(get_user_info(user))
|
|
|
|
@bp.route("/oauth/token", methods=["POST"])
|
|
def issue_token():
|
|
try:
|
|
resp = authorization.create_token_response()
|
|
return resp
|
|
except OAuth2Error as error:
|
|
print(json.dumps({
|
|
"error": error.error,
|
|
"description": error.description,
|
|
"uri": error.uri,
|
|
}, indent=4),flush=True)
|
|
return jsonify({
|
|
"error": error.error,
|
|
"description": error.description,
|
|
"uri": error.uri,
|
|
})
|
|
|
|
|
|
@bp.route("/oauth/revoke", methods=["POST"])
|
|
def revoke_token():
|
|
return authorization.create_endpoint_response("revocation")
|
|
|
|
|
|
@bp.route("/discovery")
|
|
def autodiscovery():
|
|
host = request.host
|
|
discovery = {
|
|
"issuer": f"https://{host}/",
|
|
"authorization_endpoint": f"https://{host}/oauth/authorize",
|
|
"token_endpoint": f"https://{host}/oauth/token",
|
|
"userinfo_endpoint": f"https://{host}/api/me",
|
|
"revocation_endpoint": f"https://{host}/oauth/revoke",
|
|
"response_types_supported": ["code"],
|
|
"subject_types_supported": ["public"],
|
|
"id_token_signing_alg_values_supported": ["RS256"],
|
|
"scopes_supported": ["openid", "email", "profile"],
|
|
"token_endpoint_auth_methods_supported": [
|
|
"client_secret_basic",
|
|
"client_secret_post",
|
|
],
|
|
"grant_types_supported": ["authorization_code"],
|
|
}
|
|
|
|
return jsonify(discovery)
|
|
|
|
@bp.route("/api/me")
|
|
@require_oauth(["profile", "openid"])
|
|
def api_me():
|
|
user = current_token.user
|
|
return jsonify(get_user_info(user))
|
|
|
|
|
|
@bp.route("/u/<username>")
|
|
def profile(username):
|
|
username = username.lower().strip().replace("/", "")
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
return jsonify({"error": "User not found"})
|
|
|
|
return jsonify(get_user_info(user))
|
|
|
|
@bp.route("/u/<username>/avatar.png")
|
|
def avatar(username):
|
|
username = username.lower().strip().replace("/", "")
|
|
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
print("User not found")
|
|
return send_from_directory("templates", "favicon.png", mimetype="image/png")
|
|
|
|
# Check if file exists
|
|
if os.path.exists(f"website/avatars/{username}.png"):
|
|
return send_from_directory("avatars", f"{username}.png", mimetype="image/png")
|
|
# If not, download from HNS info
|
|
query = dns.message.make_query(username, dns.rdatatype.TXT)
|
|
dns_request = query.to_wire()
|
|
|
|
# Send the DNS query over HTTPS
|
|
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
|
|
|
# Parse the DNS response
|
|
dns_response = dns.message.from_wire(response.content)
|
|
|
|
# Loop over TXT records and look for profile avatar
|
|
avatar_url=""
|
|
for record in dns_response.answer:
|
|
if record.rdtype == dns.rdatatype.TXT:
|
|
for txt in record:
|
|
txt_value = txt.to_text().strip('"')
|
|
if txt_value.startswith("profile avatar="):
|
|
avatar_url = txt_value.split("profile avatar=")[1]
|
|
break
|
|
|
|
if avatar_url != "":
|
|
# Download the avatar using DNS-over-HTTPS
|
|
add_dns_provider("hns", "https://hnsdoh.com/dns-query")
|
|
session = DNSOverHTTPSSession(provider="hns")
|
|
response = session.get(avatar_url)
|
|
with open(f"website/avatars/{username}.png", "wb") as f:
|
|
f.write(response.content)
|
|
return send_from_directory("avatars", f"{username}.png", mimetype="image/png")
|
|
|
|
return send_from_directory("templates", "favicon.png", mimetype="image/png")
|
|
|
|
@bp.route("/favicon.png")
|
|
def favicon():
|
|
return send_from_directory("templates", "favicon.png", mimetype="image/png")
|
|
|
|
|
|
@bp.errorhandler(404)
|
|
@bp.app_errorhandler(404)
|
|
def page_not_found(e):
|
|
return render_template("error.html",error="404 - Page Not Found"), 404
|