Nathan Woodburn
859eede9ed
All checks were successful
Build Docker / Build Image (push) Successful in 32s
124 lines
4.8 KiB
Python
124 lines
4.8 KiB
Python
# DNS
|
|
import binascii
|
|
import datetime
|
|
import subprocess
|
|
import tempfile
|
|
import dns.resolver
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
def resolve(HSD_IP, HSD_PORT, domain, token="HNS"):
|
|
resolver = dns.resolver.Resolver()
|
|
resolver.nameservers = [HSD_IP]
|
|
resolver.port = HSD_PORT
|
|
records = []
|
|
try:
|
|
# Query the DNS record
|
|
response = resolver.resolve(domain, "A")
|
|
for record in response:
|
|
records.append(str(record))
|
|
if not records:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
Server_IP = records[0]
|
|
curl_command = ["curl","--connect-to",f"{domain}:443:{Server_IP}:443",f"https://{domain}/.well-known/wallets/{token}","--insecure"]
|
|
curl_process = subprocess.Popen(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
curl_output, _ = curl_process.communicate()
|
|
return curl_output.decode("utf-8")
|
|
|
|
def TLSA_check(HSD_IP,HSD_PORT,domain):
|
|
resolver = dns.resolver.Resolver()
|
|
resolver.nameservers = [HSD_IP]
|
|
resolver.port = HSD_PORT
|
|
try:
|
|
# Query the DNS record
|
|
response = resolver.resolve(domain, "A")
|
|
records = []
|
|
for record in response:
|
|
records.append(str(record))
|
|
|
|
if not records:
|
|
return "No A record found"
|
|
|
|
# Get the first A record
|
|
ip = records[0]
|
|
|
|
# Run the openssl s_client command
|
|
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
|
|
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
|
|
s_client_output, _ = s_client_process.communicate(input=b"\n")
|
|
certificates = []
|
|
current_cert = ""
|
|
for line in s_client_output.split(b"\n"):
|
|
current_cert += line.decode("utf-8") + "\n"
|
|
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
|
|
certificates.append(current_cert)
|
|
current_cert = ""
|
|
# Remove anything before -----BEGIN CERTIFICATE-----
|
|
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
|
|
if not certificates:
|
|
return "No certificates found"
|
|
cert = certificates[0]
|
|
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
|
|
temp_cert_file.write(cert)
|
|
temp_cert_file.seek(0) # Move back to the beginning of the temporary file
|
|
tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",]
|
|
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
|
|
tlsa_output, _ = tlsa_process.communicate()
|
|
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
|
|
|
|
# Get domains
|
|
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
|
|
|
|
domains = []
|
|
for ext in cert_obj.extensions:
|
|
if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
|
san_list = ext.value.get_values_for_type(x509.DNSName)
|
|
domains.extend(san_list)
|
|
|
|
# Extract the common name (CN) from the subject
|
|
common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
|
|
if common_name:
|
|
if common_name[0].value not in domains:
|
|
domains.append(common_name[0].value)
|
|
|
|
if not domains:
|
|
return "Invalid certificate"
|
|
|
|
domain_parts = domain.split(".")
|
|
higher_domain = ""
|
|
for i in range(1,len(domain_parts)):
|
|
higher_domain = domain_parts[i] + "." + higher_domain
|
|
|
|
higher_domain = higher_domain[:-1]
|
|
|
|
if not domain in domains and not "*." + higher_domain in domains:
|
|
return "Invalid certificate - Missing domain"
|
|
|
|
expiry_date = cert_obj.not_valid_after
|
|
# Check if expiry date is past
|
|
if expiry_date < datetime.datetime.now():
|
|
return "Invalid certificate - Expired"
|
|
|
|
try:
|
|
# Check for TLSA record
|
|
response = resolver.resolve("_443._tcp."+domain, "TLSA")
|
|
tlsa_records = []
|
|
for record in response:
|
|
tlsa_records.append(str(record))
|
|
|
|
if not tlsa_records:
|
|
return "No TLSA record found"
|
|
else:
|
|
if tlsa_server != tlsa_records[0]:
|
|
return "Invalid TLSA record"
|
|
except:
|
|
return "Exception in TLSA record check"
|
|
return True
|
|
|
|
# Catch all exceptions
|
|
except Exception as e:
|
|
return "Exception: "+str(e) |