feat: Add TXT login using HIP 13
All checks were successful
Build Docker / Build Docker (push) Successful in 27s

This commit is contained in:
2024-06-15 13:29:25 +10:00
parent f6009fc335
commit e66ae58494
3 changed files with 367 additions and 46 deletions

View File

@@ -1,6 +1,6 @@
import time
from .varo_auth import flask_login as varo_auth_flask_login
from flask import Blueprint, request, session, url_for
from flask import Blueprint, request, session, url_for, make_response
from flask import render_template, redirect, jsonify, send_from_directory
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
@@ -40,6 +40,14 @@ def split_by_crlf(s):
@bp.route("/", methods=("GET", "POST"))
def home():
next_page = request.args.get("next")
# Check if session exists
if "uuid" not in session:
session["uuid"] = gen_salt(24)
session.permanent = True
uuid = session["uuid"]
if request.method == "POST":
auth = varo_auth_flask_login(request)
if auth == False:
@@ -83,9 +91,15 @@ def home():
if openseaInfo.status_code == 200:
hnsid = openseaInfo.json()
domains = []
if 'domains' in session:
domains = session['domains']
return render_template("home.html", user=user, clients=clients, address=address, hnsid=hnsid, users=users)
return render_template("home.html", user=user, clients=clients,
address=address, hnsid=hnsid, users=users,
uuid=uuid, next=next_page, domains=domains)
@bp.route("/hnsid", methods=["POST"])
def hnsid():
@@ -142,6 +156,129 @@ def hnsid_domain(domain):
return jsonify({"success": False, "error": "Domain not found"})
@bp.route("/txt", methods=["POST"])
def txtLogin():
# Get domain from form
domain = request.form.get("domain")
# Get uuid
uuid = session["uuid"]
query = dns.message.make_query(domain, dns.rdatatype.TXT)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile avatar
idns_records = []
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
txt_value:str = txt.to_text().strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
for record in idns_records:
print(record)
type = record.split(":")[0]
content = record.split(":")[1]
key = content.split("=")[0]
value = content.split("=")[1]
print(f"Type: {type}, Key: {key}, Value: {value}")
if type == "auth" and key == "login.hns.au":
if value == uuid:
# Add domain to user
user = User.query.filter_by(username=domain).first()
if not user:
# Create user with username and profile picture
user = User(username=domain)
db.session.add(user)
db.session.commit()
session["id"] = user.id
session.permanent = True
if "domains" not in session:
session["domains"] = []
if domain not in session["domains"]:
session["domains"].append(domain)
print("User logged in with TXT")
# Check if next page is specified
next_page = request.args.get("next")
print(next_page)
if next_page and next_page != "None":
return redirect(next_page)
return redirect("/")
@bp.route("/txt/<domain>")
def txtLoginDomain(domain):
# Get uuid
uuid = session["uuid"]
query = dns.message.make_query(domain, dns.rdatatype.TXT)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile avatar
idns_records = []
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
txt_value:str = txt.to_text().strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
for record in idns_records:
print(record)
type = record.split(":")[0]
content = record.split(":")[1]
key = content.split("=")[0]
value = content.split("=")[1]
print(f"Type: {type}, Key: {key}, Value: {value}")
if type == "auth" and key == "login.hns.au":
if value == uuid:
# Add domain to user
user = User.query.filter_by(username=domain).first()
if not user:
# Create user with username and profile picture
user = User(username=domain)
db.session.add(user)
db.session.commit()
session["id"] = user.id
session.permanent = True
if "domains" not in session:
session["domains"] = []
if domain not in session["domains"]:
session["domains"].append(domain)
print("User logged in with TXT")
# Check if next page is specified
next_page = request.args.get("next")
print(next_page)
if next_page and next_page != "None":
return redirect(next_page)
return redirect("/")
@bp.route("/logout")
def logout():
del session["id"]
@@ -355,11 +492,13 @@ def avatar(username):
return send_from_directory("templates", "favicon.png", mimetype="image/png")
@bp.route("/favicon.png")
def favicon():
return send_from_directory("templates", "favicon.png", mimetype="image/png")
@bp.errorhandler(404)
@bp.app_errorhandler(404)
def page_not_found(e):
print(f'404 error: {e}')
return render_template("404.html"), 404