Files
FireExplorer/tools.py
Nathan Woodburn f70454c9a4
All checks were successful
Check Code Quality / RuffCheck (push) Successful in 1m5s
Build Docker / BuildImage (push) Successful in 1m10s
fix: Add openssl to dockerfile
2025-11-21 00:29:42 +11:00

226 lines
6.9 KiB
Python

import dns.resolver
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import tempfile
import subprocess
import binascii
import datetime
import dns.asyncresolver
import dns.message
import dns.query
import dns.rdatatype
import httpx
from requests_doh import DNSOverHTTPSSession, add_dns_provider
import urllib3
from cryptography.x509.oid import ExtensionOID
urllib3.disable_warnings(
urllib3.exceptions.InsecureRequestWarning
) # Disable insecure request warnings (since we are manually verifying the certificate)
def hip2(domain: str) -> str | None:
domain_check = False
try:
# Get the IP
ip = resolve_with_doh(domain)
# Run the openssl s_client command
s_client_command = [
"openssl",
"s_client",
"-showcerts",
"-connect",
f"{ip}:443",
"-servername",
domain,
]
s_client_process = subprocess.Popen(
s_client_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
)
s_client_output, _ = s_client_process.communicate(input=b"\n")
certificates = []
current_cert = ""
for line in s_client_output.split(b"\n"):
current_cert += line.decode("utf-8") + "\n"
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
certificates.append(current_cert)
current_cert = ""
# Remove anything before -----BEGIN CERTIFICATE-----
certificates = [
cert[cert.find("-----BEGIN CERTIFICATE-----") :] for cert in certificates
]
if certificates:
cert = certificates[0]
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
temp_cert_file.write(cert)
temp_cert_file.seek(
0
) # Move back to the beginning of the temporary file
tlsa_command = [
"openssl",
"x509",
"-in",
temp_cert_file.name,
"-pubkey",
"-noout",
"|",
"openssl",
"pkey",
"-pubin",
"-outform",
"der",
"|",
"openssl",
"dgst",
"-sha256",
"-binary",
]
tlsa_process = subprocess.Popen(
" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE
)
tlsa_output, _ = tlsa_process.communicate()
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
# Get domains
cert_obj = x509.load_pem_x509_certificate(
cert.encode("utf-8"), default_backend()
)
domains = []
for ext in cert_obj.extensions:
if ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
san_list = ext.value.get_values_for_type(x509.DNSName)
domains.extend(san_list)
# Extract the common name (CN) from the subject
common_name = cert_obj.subject.get_attributes_for_oid(
x509.NameOID.COMMON_NAME
)
if common_name:
if common_name[0].value not in domains:
domains.append(common_name[0].value)
if domains:
if domain in domains:
domain_check = True
else:
# Check if matching wildcard domain exists
for d in domains:
if d.startswith("*"):
if domain.split(".")[1:] == d.split(".")[1:]:
domain_check = True
break
expiry_date = cert_obj.not_valid_after_utc
# Check if expiry date is past
if expiry_date < datetime.datetime.now(datetime.timezone.utc):
return
else:
return
try:
# Check for TLSA record
tlsa = resolve_TLSA_with_doh(domain)
if not tlsa:
return
else:
if tlsa_server == str(tlsa):
if domain_check:
# Get the Hip2 addresss from /.well-known/wallets/HNS
add_dns_provider("HNSDoH", "https://au.hnsdoh.com/dns-query")
session = DNSOverHTTPSSession("HNSDoH")
r = session.get(
f"https://{domain}/.well-known/wallets/HNS", verify=False
)
return r.text
else:
return
else:
return
except Exception as e:
print(f"Hip2: TLSA lookup/verification failed with error: {e}", flush=True)
return
# Catch all exceptions
except Exception as e:
print(f"Hip2: Lookup failed with error: {e}", flush=True)
return
def wallet_txt(domain: str, doh_url="https://au.hnsdoh.com/dns-query") -> str | None:
with httpx.Client() as client:
q = dns.message.make_query(domain, dns.rdatatype.from_text("TYPE262"))
r = dns.query.https(q, doh_url, session=client)
if not r.answer:
return
wallet_record = None
for ans in r.answer:
raw = ans[0].to_wire() # type: ignore
try:
data = raw[1:].decode("utf-8", errors="ignore")
except UnicodeDecodeError:
return f"Unknown WALLET record format: {raw.hex()}"
if data.startswith("HNS:"):
wallet_record = data[4:]
break
elif data.startswith("HNS "):
wallet_record = data[4:]
break
elif data.startswith('"HNS" '):
wallet_record = data[6:].strip('"')
break
return wallet_record
def resolve_with_doh(query_name, doh_url="https://au.hnsdoh.com/dns-query"):
with httpx.Client() as client:
q = dns.message.make_query(query_name, dns.rdatatype.A)
r = dns.query.https(q, doh_url, session=client)
ip = r.answer[0][0].address # type: ignore
return ip
def resolve_TLSA_with_doh(query_name, doh_url="https://au.hnsdoh.com/dns-query"):
query_name = "_443._tcp." + query_name
with httpx.Client() as client:
q = dns.message.make_query(query_name, dns.rdatatype.TLSA)
r = dns.query.https(q, doh_url, session=client)
tlsa = r.answer[0][0]
return tlsa
def emoji_to_punycode(emoji):
try:
return emoji.encode("idna").decode("ascii")
except Exception:
return emoji
def punycode_to_emoji(punycode):
try:
return punycode.encode("ascii").decode("idna")
except Exception:
return punycode