commit a96907462c8843745ed7ffbb9a21f5ae4f1d49e1 Author: Nathan Woodburn Date: Tue Jan 28 16:49:46 2025 +1100 feat: Add inital plugins diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd460d5 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Some additional plugins for FireWallet + +These plugins aren't included by default as there are already a lot of plugins included diff --git a/clean-list.py b/clean-list.py new file mode 100644 index 0000000..3bb4fc6 --- /dev/null +++ b/clean-list.py @@ -0,0 +1,70 @@ +import json +import account +import requests + +# Plugin Data +info = { + "name": "Clean Domain List", + "description": "Remove existing domains from domain list", + "version": "1.0", + "author": "Nathan.Woodburn/" +} + +# Functions +functions = { + "main":{ + "name": "Clean list", + "type": "default", + "description": "Paste list here and clean", + "params": { + "list": { + "name": "List", + "type": "longText", + } + }, + "returns": { + "status": + { + "name": "Status of the function", + "type": "text" + }, + "domains": { + "name": "List of domains", + "type": "text" + }, + "removed": { + "name": "Removed domains", + "type": "list" + } + } + } +} + +def main(params, authentication): + domains = params["list"].split("\n") + domains = [domain.strip() for domain in domains] + domains = [domain for domain in domains if domain != ""] + + finalList = [] + removedList = [] + for domain in domains: + if domain not in finalList: + # Check if domain exists already + info = account.getDomain(domain) + if 'error' in info: + removedList.append(domain) + print(f"Domain {domain} error") + continue + if 'info' in info: + if info['info'] is not None: + removedList.append(domain) + print(f"Domain {domain} has info") + continue + finalList.append(domain) + + finalList = "
".join(finalList) + removed = [f"
Domain: {domain}  Info: {account.getDomain(domain)}
" for domain in removedList] + + + return {"status": "Success","domains": finalList, "removed": removed} + \ No newline at end of file diff --git a/public_info.py b/public_info.py new file mode 100644 index 0000000..54012da --- /dev/null +++ b/public_info.py @@ -0,0 +1,41 @@ +import json +import account +import requests + +# Plugin Data +info = { + "name": "Public Node Dashboard", + "description": "Dashboard modules for public nodes", + "version": "1.0", + "author": "Nathan.Woodburn/" +} + +# Functions +functions = { + "main":{ + "name": "Info Dashboard widget", + "type": "dashboard", + "description": "This creates the widget that shows on the dashboard", + "params": {}, + "returns": { + "status": + { + "name": "Status of Node", + "type": "text" + } + } + } +} + +def main(params, authentication): + info = account.hsd.getInfo() + + status = f"Version: {info['version']}
Inbound Connections: {info['pool']['inbound']}
Outbound Connections: {info['pool']['outbound']}
" + if info['pool']['public']['listen']: + status += f"Public Node: Yes
Host: {info['pool']['public']['host']}
Port: {info['pool']['public']['port']}
" + else: + status += f"Public Node: No
" + status += f"Agent: {info['pool']['agent']}
Services: {info['pool']['services']}
" + + return {"status": status} + \ No newline at end of file diff --git a/troubleshooting.py b/troubleshooting.py new file mode 100644 index 0000000..a5325e8 --- /dev/null +++ b/troubleshooting.py @@ -0,0 +1,237 @@ +import json +import account +import requests + +import dns.resolver +import dns.message +import dns.query +import dns.rdatatype +import dns.rrset +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +import tempfile +import subprocess +import binascii +import datetime +import dns.asyncresolver +import httpx +from requests_doh import DNSOverHTTPSSession, add_dns_provider +import domainLookup + +doh_url = "https://hnsdoh.com/dns-query" + +# Plugin Data +info = { + "name": "Troubleshooting", + "description": "Various troubleshooting functions", + "version": "1.0", + "author": "Nathan.Woodburn/" +} + +# Functions +functions = { + "dig":{ + "name": "DNS Lookup", + "type": "default", + "description": "Do DNS lookups on a domain", + "params": { + "domain": { + "name":"Domain to lookup (eg. woodburn)", + "type":"text" + }, + "type": { + "name":"Type of lookup (A,TXT,NS,DS,TLSA)", + "type":"text" + } + }, + "returns": { + "result": + { + "name": "Result", + "type": "list" + } + } + }, + "https_check":{ + "name": "HTTPS Check", + "type": "default", + "description": "Check if a domain has an HTTPS certificate", + "params": { + "domain": { + "name":"Domain to lookup (eg. woodburn)", + "type":"text" + } + }, + "returns": { + "result": + { + "name": "Result", + "type": "text" + } + } + }, + "hip_lookup": { + "name": "Hip Lookup", + "type": "default", + "description": "Look up a domain's hip address", + "params": { + "domain": { + "name": "Domain to lookup", + "type": "text" + } + }, + "returns": { + "result": { + "name": "Result", + "type": "text" + } + } + } +} + +def dns_request(domain: str, rType:str) -> list[dns.rrset.RRset]: + if rType == "": + rType = "A" + rType = dns.rdatatype.from_text(rType.upper()) + + + with httpx.Client() as client: + q = dns.message.make_query(domain, rType) + r = dns.query.https(q, doh_url, session=client) + return r.answer + + +def dig(params, authentication): + domain = params["domain"] + type = params["type"] + result: list[dns.rrset.RRset] = dns_request(domain, type) + print(result) + if result: + if len(result) == 1: + result: dns.rrset.RRset = result[0] + result = result.items + return {"result": result} + + else: + return {"result": result} + else: + return {"result": ["No result"]} + + + +def https_check(params, authentication): + domain = params["domain"] + domain_check = False + try: + # Get the IP + ip = list(dns_request(domain,"A")[0].items.keys()) + if len(ip) == 0: + return {"result": "No IP found"} + ip = ip[0] + print(ip) + + # Run the openssl s_client command + s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,] + + s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) + s_client_output, _ = s_client_process.communicate(input=b"\n") + + certificates = [] + current_cert = "" + for line in s_client_output.split(b"\n"): + current_cert += line.decode("utf-8") + "\n" + if "-----END CERTIFICATE-----" in line.decode("utf-8"): + certificates.append(current_cert) + current_cert = "" + + # Remove anything before -----BEGIN CERTIFICATE----- + certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates] + + if certificates: + cert = certificates[0] + + with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file: + temp_cert_file.write(cert) + temp_cert_file.seek(0) # Move back to the beginning of the temporary file + + tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",] + + tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE) + tlsa_output, _ = tlsa_process.communicate() + + tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8") + print(f"TLSA Server: {tlsa_server}") + + + # Get domains + cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend()) + + domains = [] + for ext in cert_obj.extensions: + if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME: + san_list = ext.value.get_values_for_type(x509.DNSName) + domains.extend(san_list) + + # Extract the common name (CN) from the subject + common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) + if common_name: + if common_name[0].value not in domains: + domains.append(common_name[0].value) + + if domains: + if domain in domains: + domain_check = True + else: + # Check if matching wildcard domain exists + for d in domains: + if d.startswith("*"): + if domain.split(".")[1:] == d.split(".")[1:]: + domain_check = True + break + + + expiry_date = cert_obj.not_valid_after + # Check if expiry date is past + if expiry_date < datetime.datetime.now(): + return {"result": "Certificate is expired"} + else: + return {"result": "No certificate found"} + + try: + # Check for TLSA record + tlsa = dns_request(f"_443._tcp.{domain}","TLSA") + tlsa = list(tlsa[0].items.keys()) + if len(tlsa) == 0: + return {"result": "No TLSA record found"} + tlsa = tlsa[0] + print(f"TLSA: {tlsa}") + + if not tlsa: + return {"result": "TLSA lookup failed"} + else: + if tlsa_server == str(tlsa): + if domain_check: + add_dns_provider("HNSDoH", "https://hnsdoh.com/dns-query") + + session = DNSOverHTTPSSession("HNSDoH") + r = session.get(f"https://{domain}/",verify=False) + if r.status_code != 200: + return {"result": "Webserver returned status code: " + str(r.status_code)} + return {"result": "HTTPS check successful"} + else: + return {"result": "TLSA record matches certificate, but domain does not match certificate"} + + else: + return {"result": "TLSA record does not match certificate"} + + except Exception as e: + return {"result": "TLSA lookup failed with error: " + str(e)} + + # Catch all exceptions + except Exception as e: + return {"result": "Lookup failed.

Error: " + str(e)} + +def hip_lookup(params, authentication): + domain = params["domain"] + hip = domainLookup.hip2(domain) + return {"result": hip} \ No newline at end of file