feat: Add inital plugins

This commit is contained in:
Nathan Woodburn 2025-01-28 16:49:46 +11:00
commit a96907462c
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1
4 changed files with 351 additions and 0 deletions

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# Some additional plugins for FireWallet
These plugins aren't included by default as there are already a lot of plugins included

70
clean-list.py Normal file
View File

@ -0,0 +1,70 @@
import json
import account
import requests
# Plugin Data
info = {
"name": "Clean Domain List",
"description": "Remove existing domains from domain list",
"version": "1.0",
"author": "Nathan.Woodburn/"
}
# Functions
functions = {
"main":{
"name": "Clean list",
"type": "default",
"description": "Paste list here and clean",
"params": {
"list": {
"name": "List",
"type": "longText",
}
},
"returns": {
"status":
{
"name": "Status of the function",
"type": "text"
},
"domains": {
"name": "List of domains",
"type": "text"
},
"removed": {
"name": "Removed domains",
"type": "list"
}
}
}
}
def main(params, authentication):
domains = params["list"].split("\n")
domains = [domain.strip() for domain in domains]
domains = [domain for domain in domains if domain != ""]
finalList = []
removedList = []
for domain in domains:
if domain not in finalList:
# Check if domain exists already
info = account.getDomain(domain)
if 'error' in info:
removedList.append(domain)
print(f"Domain {domain} error")
continue
if 'info' in info:
if info['info'] is not None:
removedList.append(domain)
print(f"Domain {domain} has info")
continue
finalList.append(domain)
finalList = "<br>".join(finalList)
removed = [f"<div class='domain'>Domain: {domain}&nbsp;&nbsp;Info: {account.getDomain(domain)}</div>" for domain in removedList]
return {"status": "Success","domains": finalList, "removed": removed}

41
public_info.py Normal file
View File

@ -0,0 +1,41 @@
import json
import account
import requests
# Plugin Data
info = {
"name": "Public Node Dashboard",
"description": "Dashboard modules for public nodes",
"version": "1.0",
"author": "Nathan.Woodburn/"
}
# Functions
functions = {
"main":{
"name": "Info Dashboard widget",
"type": "dashboard",
"description": "This creates the widget that shows on the dashboard",
"params": {},
"returns": {
"status":
{
"name": "Status of Node",
"type": "text"
}
}
}
}
def main(params, authentication):
info = account.hsd.getInfo()
status = f"Version: {info['version']}<br>Inbound Connections: {info['pool']['inbound']}<br>Outbound Connections: {info['pool']['outbound']}<br>"
if info['pool']['public']['listen']:
status += f"Public Node: Yes<br>Host: {info['pool']['public']['host']}<br>Port: {info['pool']['public']['port']}<br>"
else:
status += f"Public Node: No<br>"
status += f"Agent: {info['pool']['agent']}<br>Services: {info['pool']['services']}<br>"
return {"status": status}

237
troubleshooting.py Normal file
View File

@ -0,0 +1,237 @@
import json
import account
import requests
import dns.resolver
import dns.message
import dns.query
import dns.rdatatype
import dns.rrset
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import tempfile
import subprocess
import binascii
import datetime
import dns.asyncresolver
import httpx
from requests_doh import DNSOverHTTPSSession, add_dns_provider
import domainLookup
doh_url = "https://hnsdoh.com/dns-query"
# Plugin Data
info = {
"name": "Troubleshooting",
"description": "Various troubleshooting functions",
"version": "1.0",
"author": "Nathan.Woodburn/"
}
# Functions
functions = {
"dig":{
"name": "DNS Lookup",
"type": "default",
"description": "Do DNS lookups on a domain",
"params": {
"domain": {
"name":"Domain to lookup (eg. woodburn)",
"type":"text"
},
"type": {
"name":"Type of lookup (A,TXT,NS,DS,TLSA)",
"type":"text"
}
},
"returns": {
"result":
{
"name": "Result",
"type": "list"
}
}
},
"https_check":{
"name": "HTTPS Check",
"type": "default",
"description": "Check if a domain has an HTTPS certificate",
"params": {
"domain": {
"name":"Domain to lookup (eg. woodburn)",
"type":"text"
}
},
"returns": {
"result":
{
"name": "Result",
"type": "text"
}
}
},
"hip_lookup": {
"name": "Hip Lookup",
"type": "default",
"description": "Look up a domain's hip address",
"params": {
"domain": {
"name": "Domain to lookup",
"type": "text"
}
},
"returns": {
"result": {
"name": "Result",
"type": "text"
}
}
}
}
def dns_request(domain: str, rType:str) -> list[dns.rrset.RRset]:
if rType == "":
rType = "A"
rType = dns.rdatatype.from_text(rType.upper())
with httpx.Client() as client:
q = dns.message.make_query(domain, rType)
r = dns.query.https(q, doh_url, session=client)
return r.answer
def dig(params, authentication):
domain = params["domain"]
type = params["type"]
result: list[dns.rrset.RRset] = dns_request(domain, type)
print(result)
if result:
if len(result) == 1:
result: dns.rrset.RRset = result[0]
result = result.items
return {"result": result}
else:
return {"result": result}
else:
return {"result": ["No result"]}
def https_check(params, authentication):
domain = params["domain"]
domain_check = False
try:
# Get the IP
ip = list(dns_request(domain,"A")[0].items.keys())
if len(ip) == 0:
return {"result": "No IP found"}
ip = ip[0]
print(ip)
# Run the openssl s_client command
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
s_client_output, _ = s_client_process.communicate(input=b"\n")
certificates = []
current_cert = ""
for line in s_client_output.split(b"\n"):
current_cert += line.decode("utf-8") + "\n"
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
certificates.append(current_cert)
current_cert = ""
# Remove anything before -----BEGIN CERTIFICATE-----
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
if certificates:
cert = certificates[0]
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
temp_cert_file.write(cert)
temp_cert_file.seek(0) # Move back to the beginning of the temporary file
tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",]
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
tlsa_output, _ = tlsa_process.communicate()
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
print(f"TLSA Server: {tlsa_server}")
# Get domains
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
domains = []
for ext in cert_obj.extensions:
if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
san_list = ext.value.get_values_for_type(x509.DNSName)
domains.extend(san_list)
# Extract the common name (CN) from the subject
common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
if common_name:
if common_name[0].value not in domains:
domains.append(common_name[0].value)
if domains:
if domain in domains:
domain_check = True
else:
# Check if matching wildcard domain exists
for d in domains:
if d.startswith("*"):
if domain.split(".")[1:] == d.split(".")[1:]:
domain_check = True
break
expiry_date = cert_obj.not_valid_after
# Check if expiry date is past
if expiry_date < datetime.datetime.now():
return {"result": "Certificate is expired"}
else:
return {"result": "No certificate found"}
try:
# Check for TLSA record
tlsa = dns_request(f"_443._tcp.{domain}","TLSA")
tlsa = list(tlsa[0].items.keys())
if len(tlsa) == 0:
return {"result": "No TLSA record found"}
tlsa = tlsa[0]
print(f"TLSA: {tlsa}")
if not tlsa:
return {"result": "TLSA lookup failed"}
else:
if tlsa_server == str(tlsa):
if domain_check:
add_dns_provider("HNSDoH", "https://hnsdoh.com/dns-query")
session = DNSOverHTTPSSession("HNSDoH")
r = session.get(f"https://{domain}/",verify=False)
if r.status_code != 200:
return {"result": "Webserver returned status code: " + str(r.status_code)}
return {"result": "HTTPS check successful"}
else:
return {"result": "TLSA record matches certificate, but domain does not match certificate"}
else:
return {"result": "TLSA record does not match certificate"}
except Exception as e:
return {"result": "TLSA lookup failed with error: " + str(e)}
# Catch all exceptions
except Exception as e:
return {"result": "Lookup failed.<br><br>Error: " + str(e)}
def hip_lookup(params, authentication):
domain = params["domain"]
hip = domainLookup.hip2(domain)
return {"result": hip}