240 lines
7.8 KiB
240 lines
7.8 KiB
import json
import account
import requests
import dns.resolver
import dns.message
import dns.query
import dns.rdatatype
import dns.rrset
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import tempfile
import subprocess
import binascii
import datetime
import dns.asyncresolver
import httpx
from requests_doh import DNSOverHTTPSSession, add_dns_provider
import domainLookup
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Disable insecure request warnings (since we are manually verifying the certificate)
doh_url = "https://hnsdoh.com/dns-query"
# Plugin Data
info = {
"name": "Troubleshooting",
"description": "Various troubleshooting functions",
"version": "1.0",
"author": "Nathan.Woodburn/"
# Functions
functions = {
"name": "DNS Lookup",
"type": "default",
"description": "Do DNS lookups on a domain",
"params": {
"domain": {
"name":"Domain to lookup (eg. woodburn)",
"type": {
"name":"Type of lookup (A,TXT,NS,DS,TLSA)",
"returns": {
"name": "Result",
"type": "list"
"name": "HTTPS Check",
"type": "default",
"description": "Check if a domain has an HTTPS certificate",
"params": {
"domain": {
"name":"Domain to lookup (eg. woodburn)",
"returns": {
"name": "Result",
"type": "text"
"hip_lookup": {
"name": "Hip Lookup",
"type": "default",
"description": "Look up a domain's hip address",
"params": {
"domain": {
"name": "Domain to lookup",
"type": "text"
"returns": {
"result": {
"name": "Result",
"type": "text"
def dns_request(domain: str, rType:str) -> list[dns.rrset.RRset]:
if rType == "":
rType = "A"
rType = dns.rdatatype.from_text(rType.upper())
with httpx.Client() as client:
q = dns.message.make_query(domain, rType)
r = dns.query.https(q, doh_url, session=client)
return r.answer
def dig(params, authentication):
domain = params["domain"]
type = params["type"]
result: list[dns.rrset.RRset] = dns_request(domain, type)
if result:
if len(result) == 1:
result: dns.rrset.RRset = result[0]
result = result.items
return {"result": result}
return {"result": result}
return {"result": ["No result"]}
def https_check(params, authentication):
domain = params["domain"]
domain_check = False
# Get the IP
ip = list(dns_request(domain,"A")[0].items.keys())
if len(ip) == 0:
return {"result": "No IP found"}
ip = ip[0]
# Run the openssl s_client command
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
s_client_output, _ = s_client_process.communicate(input=b"\n")
certificates = []
current_cert = ""
for line in s_client_output.split(b"\n"):
current_cert += line.decode("utf-8") + "\n"
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
current_cert = ""
# Remove anything before -----BEGIN CERTIFICATE-----
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
if certificates:
cert = certificates[0]
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
temp_cert_file.seek(0) # Move back to the beginning of the temporary file
tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",]
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
tlsa_output, _ = tlsa_process.communicate()
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
print(f"TLSA Server: {tlsa_server}")
# Get domains
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
domains = []
for ext in cert_obj.extensions:
if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
san_list = ext.value.get_values_for_type(x509.DNSName)
# Extract the common name (CN) from the subject
common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if common_name:
if common_name[0].value not in domains:
if domains:
if domain in domains:
domain_check = True
# Check if matching wildcard domain exists
for d in domains:
if d.startswith("*"):
if domain.split(".")[1:] == d.split(".")[1:]:
domain_check = True
expiry_date = cert_obj.not_valid_after_utc
# Check if expiry date is past
if expiry_date < datetime.datetime.now(datetime.timezone.utc):
return {"result": "Certificate is expired"}
return {"result": "No certificate found"}
# Check for TLSA record
tlsa = dns_request(f"_443._tcp.{domain}","TLSA")
tlsa = list(tlsa[0].items.keys())
if len(tlsa) == 0:
return {"result": "No TLSA record found"}
tlsa = tlsa[0]
print(f"TLSA: {tlsa}")
if not tlsa:
return {"result": "TLSA lookup failed"}
if tlsa_server == str(tlsa):
if domain_check:
add_dns_provider("HNSDoH", "https://hnsdoh.com/dns-query")
session = DNSOverHTTPSSession("HNSDoH")
r = session.get(f"https://{domain}/",verify=False)
if r.status_code != 200:
return {"result": "Webserver returned status code: " + str(r.status_code)}
return {"result": "HTTPS check successful"}
return {"result": "TLSA record matches certificate, but domain does not match certificate"}
return {"result": "TLSA record does not match certificate"}
except Exception as e:
return {"result": "TLSA lookup failed with error: " + str(e)}
# Catch all exceptions
except Exception as e:
return {"result": "Lookup failed.<br><br>Error: " + str(e)}
def hip_lookup(params, authentication):
domain = params["domain"]
hip = domainLookup.hip2(domain)
return {"result": hip} |