generated from nathanwoodburn/python-webserver-template
411 lines
15 KiB
Python
411 lines
15 KiB
Python
import dns.resolver
|
|
import subprocess
|
|
import tempfile
|
|
import binascii
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
import datetime
|
|
from dns import resolver, dnssec, name, exception
|
|
import time
|
|
|
|
resolver = dns.resolver.Resolver()
|
|
resolver.nameservers = ["194.50.5.28","194.50.5.27","194.50.5.26"]
|
|
resolver.port = 53
|
|
|
|
|
|
def check_ssl(domain: str):
|
|
domain_check = False
|
|
returns = {"success": False,"valid":False}
|
|
try:
|
|
# Query the DNS record
|
|
response = resolver.resolve(domain, "A")
|
|
records = []
|
|
for record in response:
|
|
records.append(str(record))
|
|
|
|
if not records:
|
|
return {"success": False, "message": "No A record found for " + domain}
|
|
|
|
returns["ip"] = records[0]
|
|
if len(records) > 1:
|
|
returns["other_ips"] = records[1:]
|
|
|
|
result, details = validate_dnssec(domain, trace=True)
|
|
returns["dnssec"] = details
|
|
returns["dnssec"]["valid"] = result
|
|
|
|
|
|
# Get the first A record
|
|
ip = records[0]
|
|
|
|
# Run the openssl s_client command
|
|
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
|
|
|
|
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
|
|
s_client_output, _ = s_client_process.communicate(input=b"\n")
|
|
|
|
certificates = []
|
|
current_cert = ""
|
|
for line in s_client_output.split(b"\n"):
|
|
current_cert += line.decode("utf-8") + "\n"
|
|
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
|
|
certificates.append(current_cert)
|
|
current_cert = ""
|
|
|
|
# Remove anything before -----BEGIN CERTIFICATE-----
|
|
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
|
|
|
|
if not certificates:
|
|
returns["message"] = "No certificate found on remote webserver"
|
|
return returns
|
|
|
|
cert = certificates[0]
|
|
returns["cert"] = {"cert": cert}
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
|
|
temp_cert_file.write(cert)
|
|
temp_cert_file.seek(0)
|
|
|
|
tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",]
|
|
|
|
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
|
|
tlsa_output, _ = tlsa_process.communicate()
|
|
|
|
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
|
|
|
|
|
|
returns["tlsa"] = {
|
|
"server": tlsa_server,
|
|
"nameserver": "",
|
|
"match": False
|
|
}
|
|
|
|
|
|
# Get domains
|
|
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
|
|
|
|
domains = []
|
|
for ext in cert_obj.extensions:
|
|
if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
|
san_list = ext.value.get_values_for_type(x509.DNSName)
|
|
domains.extend(san_list)
|
|
|
|
# Extract the common name (CN) from the subject
|
|
common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
|
|
if common_name:
|
|
if common_name[0].value not in domains:
|
|
domains.append(common_name[0].value)
|
|
|
|
|
|
if domains:
|
|
cert_domains = []
|
|
for cn in domains:
|
|
cert_domains.append(cn)
|
|
if cn == domain:
|
|
domain_check = True
|
|
elif cn.startswith("*."):
|
|
if domain.endswith(cn[1:]):
|
|
domain_check = True
|
|
|
|
returns["cert"]["domains"] = cert_domains
|
|
|
|
expiry_date = cert_obj.not_valid_after_utc
|
|
# Check if expiry date is past
|
|
if expiry_date < datetime.datetime.now(datetime.timezone.utc):
|
|
returns["cert"]["expired"] = True
|
|
returns["cert"]["valid"] = False
|
|
|
|
else:
|
|
returns["cert"]["expired"] = False
|
|
returns["cert"]["valid"] = True if domain_check else False
|
|
|
|
returns["cert"]["expiry_date"] = expiry_date.strftime("%d %B %Y %H:%M:%S")
|
|
|
|
|
|
|
|
|
|
try:
|
|
# Check for TLSA record
|
|
response = resolver.resolve("_443._tcp."+domain, "TLSA")
|
|
tlsa_records = []
|
|
for record in response:
|
|
tlsa_records.append(str(record))
|
|
|
|
if not tlsa_records:
|
|
returns["message"] = "No TLSA record found on DNS"
|
|
return returns
|
|
|
|
returns["tlsa"]["nameserver"] = tlsa_records[0]
|
|
if tlsa_server == tlsa_records[0]:
|
|
returns["tlsa"]["match"] = True
|
|
|
|
|
|
except:
|
|
returns["message"] = "No TLSA record found on DNS"
|
|
return returns
|
|
|
|
# Check if valid
|
|
if returns["cert"]["valid"] and returns["tlsa"]["match"]:
|
|
returns["valid"] = True
|
|
|
|
returns["success"] = True
|
|
return returns
|
|
|
|
# Catch all exceptions
|
|
except Exception as e:
|
|
returns["message"] = f"An error occurred: {e}"
|
|
return returns
|
|
|
|
|
|
def validate_dnssec(domain, trace=False):
|
|
"""
|
|
Validate DNSSEC for a given domain by following the chain of trust.
|
|
|
|
Args:
|
|
domain (str): The domain to validate
|
|
trace (bool): Enable detailed tracing of validation steps
|
|
|
|
Returns:
|
|
bool: True if DNSSEC validation succeeds, False otherwise
|
|
dict: Details about the validation process
|
|
"""
|
|
|
|
domain = dns.name.from_text(domain)
|
|
if not domain.is_absolute():
|
|
domain = domain.concatenate(dns.name.root)
|
|
|
|
# Start with root servers
|
|
trusted_keys = get_root_trust_anchors()
|
|
|
|
result = {
|
|
'domain': str(domain),
|
|
'validation_time': time.time(),
|
|
'validated': False,
|
|
'validation_chain': [],
|
|
'errors': []
|
|
}
|
|
|
|
try:
|
|
# Walk the DNS hierarchy to validate the chain
|
|
current = dns.name.root
|
|
while current != domain and current.is_subdomain(domain):
|
|
# Determine the next label in the chain
|
|
next_part = domain.relativize(current).split()[0]
|
|
next_domain = dns.name.Name([next_part]) + current
|
|
|
|
|
|
# Get DNSKEY records for the current zone
|
|
dnskey_response = query_dns(current, dns.rdatatype.DNSKEY)
|
|
dnskey_rrset = extract_rrset_from_response(dnskey_response, current, dns.rdatatype.DNSKEY)
|
|
|
|
if not dnskey_rrset:
|
|
error = f"No DNSKEY found for {current}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# Validate current zone's DNSKEY against trusted keys
|
|
if not validate_rrset(dnskey_rrset, trusted_keys):
|
|
error = f"Failed to validate DNSKEY for {current}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# Get DS records for the next domain
|
|
ds_response = query_dns(current, dns.rdatatype.DS, next_domain)
|
|
ds_rrset = extract_rrset_from_response(ds_response, next_domain, dns.rdatatype.DS)
|
|
|
|
if not ds_rrset:
|
|
error = f"No DS records found for {next_domain}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# Validate DS records with the trusted keys from the parent zone
|
|
if not validate_rrset(ds_rrset, trusted_keys):
|
|
error = f"Failed to validate DS records for {next_domain}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# Now get DNSKEYs for the next domain level
|
|
next_dnskey_response = query_dns(next_domain, dns.rdatatype.DNSKEY)
|
|
next_dnskey_rrset = extract_rrset_from_response(next_dnskey_response, next_domain, dns.rdatatype.DNSKEY)
|
|
|
|
if not next_dnskey_rrset:
|
|
error = f"No DNSKEY found for {next_domain}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# Verify the DNSKEY matches the DS record hash
|
|
if not match_ds_to_dnskey(ds_rrset, next_dnskey_rrset):
|
|
error = f"DS record does not match DNSKEY for {next_domain}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
# The next domain's DNSKEYs become our trusted keys
|
|
trusted_keys = next_dnskey_rrset
|
|
current = next_domain
|
|
|
|
result['validation_chain'].append({
|
|
'zone': str(current),
|
|
'validated': True,
|
|
'timestamp': time.time()
|
|
})
|
|
|
|
# Finally, validate the actual records we're interested in (A, AAAA, etc)
|
|
for rdtype in [dns.rdatatype.A, dns.rdatatype.TLSA]:
|
|
if rdtype == dns.rdatatype.TLSA:
|
|
record_response = query_dns(f"_443._tcp.{domain}", rdtype)
|
|
else:
|
|
record_response = query_dns(domain, rdtype)
|
|
if record_response.answer:
|
|
record_rrset = extract_rrset_from_response(record_response, domain, rdtype)
|
|
if record_rrset and not validate_rrset(record_rrset, trusted_keys):
|
|
error = f"Failed to validate {dns.rdatatype.to_text(rdtype)} records for {domain}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
result['validated'] = True
|
|
return True, result
|
|
|
|
except Exception as e:
|
|
error = f"Validation error: {str(e)}"
|
|
result['errors'].append(error)
|
|
return False, result
|
|
|
|
|
|
def get_root_trust_anchors():
|
|
"""
|
|
Get the root trust anchors (usually the root DNSKEY)
|
|
In practice, this would be configured or obtained from a trusted source.
|
|
"""
|
|
try:
|
|
# For production use, you should use properly validated root anchors
|
|
# This is a simplified example
|
|
response = resolver.query('.', dns.rdatatype.DNSKEY)
|
|
return response.rrset
|
|
except Exception as e:
|
|
# Fallback to hardcoded root KSK if needed
|
|
# This should be updated when the root KSK changes
|
|
from dns.rdtypes.ANY.DNSKEY import DNSKEY
|
|
from dns.rdata import from_text
|
|
|
|
# Root KSK-2017 (hardcoded as fallback only - you should obtain this securely)
|
|
root_ksk = ". IN DNSKEY 257 3 8 AwEAAaz/tAm8yTn4Mfeh5eyI96WSVexTBAvkMgJzkKTOiW1vkIbzxeF3+/4RgWOq7HrxRixHlFlExOLAJr5emLvN7SWXgnLh4+B5xQlNVz8Og8kvArMtNROxVQuCaSnIDdD5LKyWbRd2n9WGe2R8PzgCmr3EgVLrjyBxWezF0jLHwVN8efS3rCj/EWgvIWgb9tarpVUDK/b58Da+sqqls3eNbuv7pr+eoZG+SrDK6nWeL3c6H5Apxz7LjVc1uTIdsIXxuOLYA4/ilBmSVIzuDWfdRUfhHdY6+cn8HFRm+2hM8AnXGXws9555KrUB5qihylGa8subX2Nn6UwNR1AkUTV74bU="
|
|
|
|
# Create a DNS rrset
|
|
dnskey_rdata = from_text(dns.rdataclass.IN, dns.rdatatype.DNSKEY, root_ksk)
|
|
rrset = dns.rrset.RRset(dns.name.root, dns.rdataclass.IN, dns.rdatatype.DNSKEY)
|
|
rrset.add(dnskey_rdata)
|
|
return rrset
|
|
|
|
|
|
def query_dns(domain, rdtype, qname=None):
|
|
"""
|
|
Query DNS for specific record types
|
|
|
|
Args:
|
|
domain (dns.name.Name): Domain to query from (name server authority)
|
|
rdtype (int): DNS record type
|
|
qname (dns.name.Name, optional): Specific name to query. If None, uses domain
|
|
|
|
Returns:
|
|
dns.message.Message: DNS response
|
|
"""
|
|
if qname is None:
|
|
qname = domain
|
|
|
|
try:
|
|
# For more control, you could use dns.query directly to a specific server
|
|
response = resolver.query(qname, rdtype, raise_on_no_answer=False)
|
|
return response.response
|
|
except dns.resolver.NXDOMAIN:
|
|
# Create an empty response with NXDOMAIN code
|
|
response = dns.message.make_response(dns.message.make_query(qname, rdtype))
|
|
response.set_rcode(dns.rcode.NXDOMAIN)
|
|
return response
|
|
except Exception as e:
|
|
# Return empty response
|
|
response = dns.message.make_response(dns.message.make_query(qname, rdtype))
|
|
response.set_rcode(dns.rcode.SERVFAIL)
|
|
return response
|
|
|
|
|
|
def extract_rrset_from_response(response, name, rdtype):
|
|
"""
|
|
Extract the RRset from a DNS response
|
|
|
|
Args:
|
|
response (dns.message.Message): DNS response
|
|
name (dns.name.Name): Name of the RRset
|
|
rdtype (int): Record type
|
|
|
|
Returns:
|
|
dns.rrset.RRset or None: The requested RRset or None if not found
|
|
"""
|
|
if response.rcode() != dns.rcode.NOERROR:
|
|
return None
|
|
|
|
for section in [response.answer, response.authority, response.additional]:
|
|
for rrset in section:
|
|
if rrset.name == name and rrset.rdtype == rdtype:
|
|
return rrset
|
|
|
|
return None
|
|
|
|
|
|
def validate_rrset(rrset, keys):
|
|
"""
|
|
Validate a DNS RRset using DNSSEC
|
|
|
|
Args:
|
|
rrset (dns.rrset.RRset): The RRset to validate
|
|
keys (dns.rrset.RRset): The DNSKEY rrset to validate against
|
|
|
|
Returns:
|
|
bool: True if the RRset validates, False otherwise
|
|
"""
|
|
try:
|
|
# Get the RRSIG for this RRset
|
|
for rrsig in [r for r in rrset.rrsigs]:
|
|
# Find the corresponding key
|
|
key = None
|
|
for dnskey in keys:
|
|
if dnskey.key_tag() == rrsig.key_tag:
|
|
key = dnskey
|
|
break
|
|
|
|
if key:
|
|
# Perform the validation
|
|
dns.dnssec.validate_rrsig(rrset, rrsig, {(keys.name, keys.rdtype): keys})
|
|
return True
|
|
return False
|
|
except dns.dnssec.ValidationFailure as e:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
|
|
def match_ds_to_dnskey(ds_rrset, dnskey_rrset):
|
|
"""
|
|
Verify that a DS record matches a DNSKEY
|
|
|
|
Args:
|
|
ds_rrset (dns.rrset.RRset): DS record set
|
|
dnskey_rrset (dns.rrset.RRset): DNSKEY record set
|
|
|
|
Returns:
|
|
bool: True if any DS matches any DNSKEY, False otherwise
|
|
"""
|
|
for ds in ds_rrset:
|
|
for dnskey in dnskey_rrset:
|
|
# Calculate the DS from the DNSKEY
|
|
calculated_ds = dns.dnssec.make_ds(dnskey_rrset.name, dnskey, ds.digest_type)
|
|
|
|
# Compare the calculated DS to the actual DS
|
|
if (ds.key_tag == calculated_ds.key_tag and
|
|
ds.algorithm == calculated_ds.algorithm and
|
|
ds.digest_type == calculated_ds.digest_type and
|
|
ds.digest == calculated_ds.digest):
|
|
return True
|
|
|
|
return False
|
|
|