"""Check a host's TLS certificate against its DANE TLSA record and DNSSEC chain.

``check_ssl`` is the entry point.  It resolves the host, fetches the served
certificate with ``openssl s_client``, derives the ``3 1 1`` TLSA value from the
certificate's public key, compares it with the TLSA records published at
``_443._tcp.<domain>`` and reports the outcome of ``validate_dnssec``.
"""

import binascii
import datetime
import hashlib
import re
import subprocess
import tempfile  # noqa: F401  (unused here now; kept so the module namespace is unchanged)
import time

import dns.dnssec
import dns.exception
import dns.flags
import dns.message
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.resolver
import dns.rrset
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from dns import resolver, dnssec, name, exception  # noqa: F401  (kept from the original file)

# NOTE: this rebinds the name ``resolver`` from the dns.resolver module to a
# configured Resolver instance; every function below uses the instance.
resolver = dns.resolver.Resolver()
resolver.nameservers = ["194.50.5.28", "194.50.5.27", "194.50.5.26"]
resolver.port = 53

# Upper bound, in seconds, for the ``openssl s_client`` handshake.
OPENSSL_TIMEOUT = 30

# EDNS UDP payload size advertised on DNSSEC queries.
_EDNS_PAYLOAD = 1232

_PEM_CERTIFICATE = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----", re.DOTALL
)

# Root KSK-2017 in DNSKEY rdata form (flags, protocol, algorithm, key).
# Fallback only - it must be updated when the root KSK changes.
_ROOT_KSK_2017 = (
    "257 3 8 "
    "AwEAAaz/tAm8yTn4Mfeh5eyI96WSVexTBAvkMgJzkKTOiW1vkIbzxeF3+/4RgWOq7HrxRixHlFlExOLAJr5emLvN7SWXgnLh4+B5xQlNVz8Og8kvArMtNROxVQuCaSnIDdD5LKyWbRd2n9WGe2R8PzgCmr3EgVLrjyBxWezF0jLHwVN8efS3rCj/EWgvIWgb9tarpVUDK/b58Da+sqqls3eNbuv7pr+eoZG+SrDK6nWeL3c6H5Apxz7LjVc1uTIdsIXxuOLYA4/ilBmSVIzuDWfdRUfhHdY6+cn8HFRm+2hM8AnXGXws9555KrUB5qihylGa8subX2Nn6UwNR1AkUTV74bU="
)


def check_ssl(domain: str):
    """Check the certificate served for *domain* against DNS.

    Args:
        domain: Host name to check; port 443 on its first A record is used.

    Returns:
        dict: Always contains ``success`` and ``valid``.  Depending on how far
        the check got it also contains ``ip``, ``other_ips``, ``dnssec``,
        ``cert`` (``cert``, ``domains``, ``domain``, ``expired``, ``valid``,
        ``expiry_date``), ``tlsa`` (``server``, ``nameserver``, ``match``) and
        ``message``.  ``valid`` is True only when the certificate covers the
        domain, has not expired and matches a published TLSA record.  Any
        unexpected failure is reported through ``message`` rather than raised.
    """
    returns = {"success": False, "valid": False}
    try:
        records = [str(record) for record in resolver.resolve(domain, "A")]
        if not records:
            returns["message"] = "No A record found for " + domain
            return returns
        ip = records[0]
        returns["ip"] = ip
        if len(records) > 1:
            returns["other_ips"] = records[1:]

        dnssec_valid, dnssec_details = validate_dnssec(domain, trace=True)
        returns["dnssec"] = dnssec_details
        returns["dnssec"]["valid"] = dnssec_valid

        certificates = _fetch_certificates(ip, domain)
        if not certificates:
            returns["message"] = "No certificate found on remote webserver"
            return returns
        # s_client prints the chain leaf-first; only the leaf is inspected.
        cert = certificates[0]
        returns["cert"] = {"cert": cert}

        cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
        spki_digest = _spki_sha256(cert_obj)
        returns["tlsa"] = {
            "server": "3 1 1 " + binascii.hexlify(spki_digest).decode("utf-8"),
            "nameserver": "",
            "match": False,
        }

        cert_domains = _certificate_names(cert_obj)
        domain_check = any(_hostname_matches(entry, domain) for entry in cert_domains)
        if cert_domains:
            returns["cert"]["domains"] = cert_domains
        returns["cert"]["domain"] = domain_check

        expiry_date = cert_obj.not_valid_after_utc
        expired = expiry_date < datetime.datetime.now(datetime.timezone.utc)
        returns["cert"]["expired"] = expired
        returns["cert"]["valid"] = domain_check and not expired
        returns["cert"]["expiry_date"] = expiry_date.strftime("%d %B %Y %H:%M:%S")

        try:
            tlsa_rdatas = list(resolver.resolve("_443._tcp." + domain, "TLSA"))
        except dns.exception.DNSException:
            returns["message"] = "No TLSA record found on DNS"
            return returns
        if not tlsa_rdatas:
            returns["message"] = "No TLSA record found on DNS"
            return returns

        # A TLSA RRset may hold several records (e.g. during a key rollover);
        # the certificate is acceptable if any one of them matches.
        matching = [rdata for rdata in tlsa_rdatas if _tlsa_matches(rdata, spki_digest)]
        returns["tlsa"]["nameserver"] = str((matching or tlsa_rdatas)[0])
        returns["tlsa"]["match"] = bool(matching)

        returns["valid"] = returns["cert"]["valid"] and returns["tlsa"]["match"]
        returns["success"] = True
        return returns
    except Exception as e:
        # Top-level boundary: callers get a result dict, never an exception.
        returns["message"] = f"An error occurred: {e}"
        return returns


def _fetch_certificates(ip, domain):
    """Return the PEM certificates ``openssl s_client`` prints for ip:443 (SNI = domain)."""
    completed = subprocess.run(
        ["openssl", "s_client", "-showcerts", "-connect", f"{ip}:443", "-servername", domain],
        input=b"\n",  # closes the session once the handshake output is printed
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=OPENSSL_TIMEOUT,
    )
    # Subject/issuer lines may contain non-UTF-8 bytes; the PEM body is ASCII.
    output = completed.stdout.decode("utf-8", errors="replace")
    return [match.group(0) + "\n" for match in _PEM_CERTIFICATE.finditer(output)]


def _spki_sha256(cert_obj):
    """Return the SHA-256 digest of the certificate's DER SubjectPublicKeyInfo."""
    spki = cert_obj.public_key().public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return hashlib.sha256(spki).digest()


def _certificate_names(cert_obj):
    """Return the certificate's SAN DNS names followed by its CN if not already listed."""
    names = []
    for ext in cert_obj.extensions:
        if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
            names.extend(ext.value.get_values_for_type(x509.DNSName))
    common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
    if common_name and common_name[0].value not in names:
        names.append(common_name[0].value)
    return names


def _hostname_matches(pattern, hostname):
    """Return True if certificate name *pattern* covers *hostname*.

    Comparison is case-insensitive and ignores a trailing dot.  A leading
    ``*.`` stands for exactly one label, so ``*.example.com`` covers
    ``a.example.com`` but neither ``example.com`` nor ``a.b.example.com``.
    """
    pattern = pattern.rstrip(".").lower()
    hostname = hostname.rstrip(".").lower()
    if pattern == hostname:
        return True
    if pattern.startswith("*."):
        _, _, parent = hostname.partition(".")
        return bool(parent) and parent == pattern[2:]
    return False


def _tlsa_matches(rdata, spki_digest):
    """Return True if TLSA *rdata* is a ``3 1 1`` record carrying *spki_digest*."""
    return (rdata.usage, rdata.selector, rdata.mtype) == (3, 1, 1) and rdata.cert == spki_digest


def validate_dnssec(domain, trace=False):
    """
    Validate DNSSEC for a given domain by following the chain of trust.

    Starting from the root trust anchors, every name between the root and
    *domain* is checked for a DS RRset.  Where one exists it must be signed by
    the enclosing zone, and the child zone's DNSKEY RRset must be signed by a
    key that DS refers to.  A name without a DS RRset is treated as lying in
    the current zone.  Finally the A and ``_443._tcp`` TLSA RRsets, where
    present, must be signed by the keys of the last validated zone; an unsigned
    delegation therefore fails at this step.

    Args:
        domain (str): The domain to validate
        trace (bool): Accepted for compatibility; currently unused

    Returns:
        bool: True if DNSSEC validation succeeds, False otherwise
        dict: Details about the validation process (``domain``,
            ``validation_time``, ``validated``, ``validation_chain``, ``errors``)
    """
    # from_text() appends the root origin, so the result is always absolute.
    target = dns.name.from_text(domain)
    result = {
        'domain': str(target),
        'validation_time': time.time(),
        'validated': False,
        'validation_chain': [],
        'errors': []
    }

    def fail(message):
        result['errors'].append(message)
        return False, result

    try:
        zone = dns.name.root
        zone_keys, zone_sigs = _fetch_signed_rrset(zone, dns.rdatatype.DNSKEY)
        if zone_keys is None:
            return fail(f"No DNSKEY found for {zone}")
        if not validate_rrset(zone_keys, get_root_trust_anchors(), zone_sigs):
            return fail(f"Failed to validate DNSKEY for {zone}")

        # Walk from the TLD down to the target, one label at a time.
        labels = target.labels
        for depth in range(2, len(labels) + 1):
            candidate = dns.name.Name(labels[-depth:])

            ds_rrset, ds_sigs = _fetch_signed_rrset(candidate, dns.rdatatype.DS)
            if ds_rrset is None:
                # No DS: the name is not a signed delegation.  Keep the current
                # zone's keys; records under an unsigned delegation will then
                # fail the final signature check.
                continue
            if not validate_rrset(ds_rrset, zone_keys, ds_sigs):
                return fail(f"Failed to validate DS records for {candidate}")

            child_keys, child_sigs = _fetch_signed_rrset(candidate, dns.rdatatype.DNSKEY)
            if child_keys is None:
                return fail(f"No DNSKEY found for {candidate}")
            entry_keys = _keys_matching_ds(ds_rrset, child_keys)
            if entry_keys is None:
                return fail(f"DS record does not match DNSKEY for {candidate}")
            # The DNSKEY RRset must be signed by a key the parent vouches for,
            # not merely by any key that happens to be in the RRset.
            if not validate_rrset(child_keys, entry_keys, child_sigs):
                return fail(f"Failed to validate DNSKEY for {candidate}")

            zone, zone_keys = candidate, child_keys
            result['validation_chain'].append({
                'zone': str(zone),
                'validated': True,
                'timestamp': time.time()
            })

        # Finally, validate the actual records we're interested in.
        tlsa_owner = dns.name.Name((b"_443", b"_tcp") + labels)
        for owner, rdtype in ((target, dns.rdatatype.A), (tlsa_owner, dns.rdatatype.TLSA)):
            type_text = dns.rdatatype.to_text(rdtype)
            response = query_dns(owner, rdtype)
            record_rrset = extract_rrset_from_response(response, owner, rdtype)
            if record_rrset is None:
                if extract_rrset_from_response(response, owner, dns.rdatatype.CNAME) is not None:
                    # The alias target may live in another zone whose chain has
                    # not been walked, so the data cannot be vouched for.
                    return fail(
                        f"Cannot validate {type_text} records for {target}: "
                        "the answer is a CNAME alias, which is not followed"
                    )
                continue  # nothing published for this type
            record_sigs = _extract_rrsigs(response, owner, rdtype)
            if not validate_rrset(record_rrset, zone_keys, record_sigs):
                return fail(f"Failed to validate {type_text} records for {target}")

        result['validated'] = True
        return True, result
    except Exception as e:
        # Boundary for the validator: report instead of raising.
        return fail(f"Validation error: {str(e)}")


def get_root_trust_anchors():
    """
    Get the root trust anchors (the root DNSKEY RRset).

    The keys are requested from the configured resolver; the answer itself is
    not authenticated here.  If the query fails, the hardcoded root KSK-2017
    is returned instead.  For production use the anchors should come from a
    properly validated source.

    Returns:
        dns.rrset.RRset: DNSKEY RRset owned by the root name
    """
    try:
        return resolver.resolve(dns.name.root, dns.rdatatype.DNSKEY).rrset
    except dns.exception.DNSException:
        return dns.rrset.from_text(
            dns.name.root, 0, dns.rdataclass.IN, dns.rdatatype.DNSKEY, _ROOT_KSK_2017
        )


def _dnssec_resolver():
    """Return a resolver mirroring ``resolver`` that also requests DNSSEC records (DO bit)."""
    dnssec_resolver = dns.resolver.Resolver(configure=False)
    dnssec_resolver.nameservers = list(resolver.nameservers)
    dnssec_resolver.port = resolver.port
    dnssec_resolver.timeout = resolver.timeout
    dnssec_resolver.lifetime = resolver.lifetime
    dnssec_resolver.use_edns(edns=0, ednsflags=dns.flags.DO, payload=_EDNS_PAYLOAD)
    return dnssec_resolver


def _empty_response(qname, rdtype, rcode):
    """Return an answerless response to (qname, rdtype) carrying *rcode*."""
    response = dns.message.make_response(dns.message.make_query(qname, rdtype))
    response.set_rcode(rcode)
    return response


def query_dns(domain, rdtype, qname=None):
    """
    Query DNS for specific record types, requesting DNSSEC signatures.

    Args:
        domain (dns.name.Name): Name used as the query name when qname is None
        rdtype (int): DNS record type
        qname (dns.name.Name, optional): Specific name to query. If None, uses domain

    Returns:
        dns.message.Message: The DNS response.  A lookup that raises NXDOMAIN
        yields an empty response with rcode NXDOMAIN; any other resolver error
        yields an empty response with rcode SERVFAIL.
    """
    if qname is None:
        qname = domain
    try:
        answer = _dnssec_resolver().resolve(qname, rdtype, raise_on_no_answer=False)
        return answer.response
    except dns.resolver.NXDOMAIN:
        return _empty_response(qname, rdtype, dns.rcode.NXDOMAIN)
    except dns.exception.DNSException:
        return _empty_response(qname, rdtype, dns.rcode.SERVFAIL)


def extract_rrset_from_response(response, name, rdtype):
    """
    Extract the RRset from a DNS response

    Args:
        response (dns.message.Message): DNS response
        name (dns.name.Name): Name of the RRset
        rdtype (int): Record type

    Returns:
        dns.rrset.RRset or None: The first RRset with that owner and type in
        the answer, authority or additional section; None if there is none or
        the response code is not NOERROR
    """
    if response.rcode() != dns.rcode.NOERROR:
        return None
    for section in (response.answer, response.authority, response.additional):
        for rrset in section:
            if rrset.name == name and rrset.rdtype == rdtype:
                return rrset
    return None


def _extract_rrsigs(response, owner, covered):
    """Return the RRSIG RRset covering (owner, covered) in *response*, or None."""
    for section in (response.answer, response.authority, response.additional):
        for rrset in section:
            if (rrset.name == owner
                    and rrset.rdtype == dns.rdatatype.RRSIG
                    and rrset.covers == covered):
                return rrset
    return None


def _fetch_signed_rrset(owner, rdtype):
    """Query (owner, rdtype) and return ``(rrset, rrsig_rrset)``; both None if absent."""
    response = query_dns(owner, rdtype)
    rrset = extract_rrset_from_response(response, owner, rdtype)
    if rrset is None:
        return None, None
    return rrset, _extract_rrsigs(response, owner, rdtype)


def validate_rrset(rrset, keys, rrsigset=None):
    """
    Validate a DNS RRset using DNSSEC

    Args:
        rrset (dns.rrset.RRset): The RRset to validate
        keys (dns.rrset.RRset): The DNSKEY rrset to validate against
        rrsigset (dns.rrset.RRset, optional): The RRSIG RRset covering rrset.
            Without signatures nothing can be verified, so None yields False.

    Returns:
        bool: True if the RRset validates, False otherwise
    """
    if rrset is None or keys is None or rrsigset is None:
        return False
    try:
        # dnspython looks candidate keys up by the signer's owner name.
        dns.dnssec.validate(rrset, rrsigset, {keys.name: keys})
    except dns.exception.DNSException:
        # Includes ValidationFailure and unsupported-algorithm errors.
        return False
    return True


def _keys_matching_ds(ds_rrset, dnskey_rrset):
    """Return an RRset of the DNSKEYs that some DS in *ds_rrset* refers to, or None."""
    owner = dnskey_rrset.name
    matched = dns.rrset.RRset(owner, dnskey_rrset.rdclass, dnskey_rrset.rdtype)
    for dnskey in dnskey_rrset:
        for ds in ds_rrset:
            try:
                calculated_ds = dns.dnssec.make_ds(owner, dnskey, ds.digest_type)
            except dns.exception.DNSException:
                # Digest type the library cannot or will not compute: this DS
                # cannot vouch for the key, but another one still may.
                continue
            if (ds.key_tag == calculated_ds.key_tag and
                    ds.algorithm == calculated_ds.algorithm and
                    ds.digest_type == calculated_ds.digest_type and
                    ds.digest == calculated_ds.digest):
                matched.add(dnskey, dnskey_rrset.ttl)
                break
    return matched if len(matched) else None


def match_ds_to_dnskey(ds_rrset, dnskey_rrset):
    """
    Verify that a DS record matches a DNSKEY

    Args:
        ds_rrset (dns.rrset.RRset): DS record set
        dnskey_rrset (dns.rrset.RRset): DNSKEY record set

    Returns:
        bool: True if any DS matches any DNSKEY, False otherwise
    """
    return _keys_matching_ds(ds_rrset, dnskey_rrset) is not None