feat: Add onchain TXT record login using FIRE HSD
All checks were successful
Build Docker / Build Docker (push) Successful in 34s
All checks were successful
Build Docker / Build Docker (push) Successful in 34s
This commit is contained in:
@@ -56,24 +56,49 @@ def split_by_crlf(s):
|
||||
return [v for v in s.splitlines() if v]
|
||||
|
||||
def get_idns_records(domain:str) -> list:
|
||||
query = dns.message.make_query(domain, dns.rdatatype.TXT)
|
||||
dns_request = query.to_wire()
|
||||
# Send the DNS query over HTTPS
|
||||
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
||||
# Parse the DNS response
|
||||
dns_response = dns.message.from_wire(response.content)
|
||||
# Loop over TXT records and look for profile
|
||||
idns_records = []
|
||||
for record in dns_response.answer:
|
||||
if record.rdtype == dns.rdatatype.TXT:
|
||||
for txt in record:
|
||||
txt_value:str = txt.to_text().strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
try:
|
||||
query = dns.message.make_query(domain, dns.rdatatype.TXT)
|
||||
dns_request = query.to_wire()
|
||||
# Send the DNS query over HTTPS
|
||||
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
||||
# Parse the DNS response
|
||||
dns_response = dns.message.from_wire(response.content)
|
||||
# Loop over TXT records and look for profile
|
||||
for record in dns_response.answer:
|
||||
if record.rdtype == dns.rdatatype.TXT:
|
||||
for txt in record:
|
||||
txt_value:str = txt.to_text().strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
except Exception as e:
|
||||
print(f"Error fetching DNS records: {e}")
|
||||
|
||||
# Get onchain records
|
||||
try:
|
||||
onchain_response = requests.get(f"https://hsd.hns.au/api/v1/nameresource/{domain}")
|
||||
if onchain_response.status_code == 200:
|
||||
onchain_data = onchain_response.json()
|
||||
if "records" in onchain_data:
|
||||
for record in onchain_data["records"]:
|
||||
if record["type"] == "TXT":
|
||||
txt_values = record["txt"]
|
||||
for txt_value in txt_values:
|
||||
txt_value = txt_value.strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error fetching onchain records: {e}")
|
||||
|
||||
return idns_records
|
||||
|
||||
def get_user_info(user:User) -> dict:
|
||||
@@ -266,7 +291,7 @@ def hnsid_domain(domain):
|
||||
|
||||
@bp.route("/txt", methods=["POST"])
|
||||
def txtLogin():
|
||||
|
||||
idns_records = []
|
||||
try:
|
||||
# Get domain from form
|
||||
domain = request.form.get("domain").lower().strip().replace("/", "").removesuffix(".")
|
||||
@@ -278,23 +303,43 @@ def txtLogin():
|
||||
response = requests.post('https://au.hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
|
||||
# Parse the DNS response
|
||||
dns_response = dns.message.from_wire(response.content)
|
||||
|
||||
# Loop over TXT records and look for profile
|
||||
for record in dns_response.answer:
|
||||
if record.rdtype == dns.rdatatype.TXT:
|
||||
for txt in record:
|
||||
txt_value:str = txt.to_text().strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
except Exception as e:
|
||||
print(f"Error fetching DNS records: {e}")
|
||||
return render_template("error.html",error="The domain wasn't able to be authenticated.",
|
||||
message="<br>Double check the TXT record and try again.",
|
||||
custom="<button onclick='window.location.reload();'>Try again</button>"), 200
|
||||
# Loop over TXT records and look for profile avatar
|
||||
idns_records = []
|
||||
for record in dns_response.answer:
|
||||
if record.rdtype == dns.rdatatype.TXT:
|
||||
for txt in record:
|
||||
txt_value:str = txt.to_text().strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
|
||||
try:
|
||||
# Get onchain records
|
||||
onchain_response = requests.get(f"https://hsd.hns.au/api/v1/nameresource/{domain}")
|
||||
if onchain_response.status_code == 200:
|
||||
onchain_data = onchain_response.json()
|
||||
if "records" in onchain_data:
|
||||
for record in onchain_data["records"]:
|
||||
if record["type"] == "TXT":
|
||||
txt_values = record["txt"]
|
||||
for txt_value in txt_values:
|
||||
txt_value = txt_value.strip('"')
|
||||
if txt_value.startswith("IDNS1"):
|
||||
print(txt_value)
|
||||
idns = txt_value.removeprefix("IDNS1 ")
|
||||
idns = idns.split(" ")
|
||||
for r in idns:
|
||||
idns_records.append(r)
|
||||
except Exception as e:
|
||||
print(f"Error fetching onchain records: {e}")
|
||||
|
||||
for record in idns_records:
|
||||
print(record)
|
||||
|
||||
Reference in New Issue
Block a user