Compare commits

..

2 Commits

Author SHA1 Message Date
2f099b7c07 feat: Add onchain TXT record login using FIRE HSD
All checks were successful
Build Docker / Build Docker (push) Successful in 34s
2025-11-20 10:39:55 +11:00
03fe2b552c fix: Catch DNS from wireformat error
All checks were successful
Build Docker / Build Docker (push) Successful in 1m39s
2025-11-20 10:15:52 +11:00
6 changed files with 1743 additions and 48 deletions

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

6
main.py Normal file
View File

@@ -0,0 +1,6 @@
def main():
print("Hello from hns-login!")
if __name__ == "__main__":
main()

22
pyproject.toml Normal file
View File

@@ -0,0 +1,22 @@
[project]
name = "hns-login"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"authlib>=1.6.5",
"dnspython>=2.6.1",
"eth-account>=0.13.7",
"flask>=3.1.2",
"flask-sqlalchemy>=3.1.1",
"python-dotenv>=1.2.1",
"requests>=2.32.3",
"requests-doh>=1.0.0",
"web3>=7.14.0",
]
[dependency-groups]
dev = [
"ruff>=0.14.5",
]

1619
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
import time
import datetime as dt
from .varo_auth import flask_login as varo_auth_flask_login
from flask import Blueprint, request, session, url_for, make_response
from flask import Blueprint, request, session, url_for
from flask import render_template, redirect, jsonify, send_from_directory
from werkzeug.security import gen_salt
from authlib.integrations.flask_oauth2 import current_token
@@ -18,7 +18,6 @@ from datetime import timedelta
from eth_account.messages import encode_defunct
from eth_account import Account
import json
import urllib.parse
@@ -57,14 +56,15 @@ def split_by_crlf(s):
return [v for v in s.splitlines() if v]
def get_idns_records(domain:str) -> list:
idns_records = []
try:
query = dns.message.make_query(domain, dns.rdatatype.TXT)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile
idns_records = []
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
@@ -75,6 +75,30 @@ def get_idns_records(domain:str) -> list:
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
except Exception as e:
print(f"Error fetching DNS records: {e}")
# Get onchain records
try:
onchain_response = requests.get(f"https://hsd.hns.au/api/v1/nameresource/{domain}")
if onchain_response.status_code == 200:
onchain_data = onchain_response.json()
if "records" in onchain_data:
for record in onchain_data["records"]:
if record["type"] == "TXT":
txt_values = record["txt"]
for txt_value in txt_values:
txt_value = txt_value.strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
except Exception as e:
print(f"Error fetching onchain records: {e}")
return idns_records
def get_user_info(user:User) -> dict:
@@ -145,7 +169,7 @@ def home():
if request.method == "POST":
auth = varo_auth_flask_login(request)
if auth == False:
if not auth:
return redirect("/?error=login_failed")
print(auth)
user = User.query.filter_by(username=auth).first()
@@ -267,22 +291,20 @@ def hnsid_domain(domain):
@bp.route("/txt", methods=["POST"])
def txtLogin():
idns_records = []
try:
# Get domain from form
domain = request.form.get("domain").lower().strip().replace("/", "").removesuffix(".")
# Get uuid
uuid = session["uuid"]
query = dns.message.make_query(domain, dns.rdatatype.TXT)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile avatar
idns_records = []
# Loop over TXT records and look for profile
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
@@ -293,6 +315,31 @@ def txtLogin():
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
except Exception as e:
print(f"Error fetching DNS records: {e}")
return render_template("error.html",error="The domain wasn't able to be authenticated.",
message="<br>Double check the TXT record and try again.",
custom="<button onclick='window.location.reload();'>Try again</button>"), 200
try:
# Get onchain records
onchain_response = requests.get(f"https://hsd.hns.au/api/v1/nameresource/{domain}")
if onchain_response.status_code == 200:
onchain_data = onchain_response.json()
if "records" in onchain_data:
for record in onchain_data["records"]:
if record["type"] == "TXT":
txt_values = record["txt"]
for txt_value in txt_values:
txt_value = txt_value.strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
except Exception as e:
print(f"Error fetching onchain records: {e}")
for record in idns_records:
print(record)
@@ -623,7 +670,7 @@ def avatar(username):
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
@@ -640,7 +687,7 @@ def avatar(username):
if avatar_url != "":
# Download the avatar using DNS-over-HTTPS
add_dns_provider("hns", "https://hnsdoh.com/dns-query")
add_dns_provider("hns", "https://au.hnsdoh.com/dns-query")
session = DNSOverHTTPSSession(provider="hns")
response = session.get(avatar_url)
with open(f"website/avatars/{username}.png", "wb") as f:

View File

@@ -12,7 +12,7 @@ def flask_login(request):
def login(request):
r = requests.get(f'https://auth.shakestation.io/verify/{request}')
r = r.json()
if r['success'] == False:
if not r['success']:
return False
if 'data' in r: