feat: Add custom plugins
All checks were successful
Build Docker / Build Image (push) Successful in 46s
All checks were successful
Build Docker / Build Image (push) Successful in 46s
This commit is contained in:
parent
cfb814d006
commit
2ae618c68a
3
.gitignore
vendored
3
.gitignore
vendored
@ -11,4 +11,5 @@ plugins/signatures.json
|
|||||||
|
|
||||||
.venv/
|
.venv/
|
||||||
|
|
||||||
user_data/
|
user_data/
|
||||||
|
customPlugins/
|
21
main.py
21
main.py
@ -1300,8 +1300,8 @@ def plugins_index():
|
|||||||
wallet_status=account_module.getWalletStatus(),
|
wallet_status=account_module.getWalletStatus(),
|
||||||
plugins=plugins)
|
plugins=plugins)
|
||||||
|
|
||||||
@app.route('/plugin/<plugin>')
|
@app.route('/plugin/<ptype>/<path:plugin>')
|
||||||
def plugin(plugin):
|
def plugin(ptype,plugin):
|
||||||
# Check if the user is logged in
|
# Check if the user is logged in
|
||||||
if request.cookies.get("account") is None:
|
if request.cookies.get("account") is None:
|
||||||
return redirect("/login")
|
return redirect("/login")
|
||||||
@ -1310,7 +1310,10 @@ def plugin(plugin):
|
|||||||
if not account:
|
if not account:
|
||||||
return redirect("/logout")
|
return redirect("/logout")
|
||||||
|
|
||||||
|
plugin = f"{ptype}/{plugin}"
|
||||||
|
|
||||||
if not plugins_module.pluginExists(plugin):
|
if not plugins_module.pluginExists(plugin):
|
||||||
|
print(f"Plugin {plugin} not found")
|
||||||
return redirect("/plugins")
|
return redirect("/plugins")
|
||||||
|
|
||||||
data = plugins_module.getPluginData(plugin)
|
data = plugins_module.getPluginData(plugin)
|
||||||
@ -1330,10 +1333,10 @@ def plugin(plugin):
|
|||||||
wallet_status=account_module.getWalletStatus(),
|
wallet_status=account_module.getWalletStatus(),
|
||||||
name=data['name'],description=data['description'],
|
name=data['name'],description=data['description'],
|
||||||
author=data['author'],version=data['version'],
|
author=data['author'],version=data['version'],
|
||||||
functions=functions,error=error)
|
source=data['source'],functions=functions,error=error)
|
||||||
|
|
||||||
@app.route('/plugin/<plugin>/verify')
|
@app.route('/plugin/<ptype>/<path:plugin>/verify')
|
||||||
def plugin_verify(plugin):
|
def plugin_verify(ptype,plugin):
|
||||||
# Check if the user is logged in
|
# Check if the user is logged in
|
||||||
if request.cookies.get("account") is None:
|
if request.cookies.get("account") is None:
|
||||||
return redirect("/login")
|
return redirect("/login")
|
||||||
@ -1341,6 +1344,8 @@ def plugin_verify(plugin):
|
|||||||
account = account_module.check_account(request.cookies.get("account"))
|
account = account_module.check_account(request.cookies.get("account"))
|
||||||
if not account:
|
if not account:
|
||||||
return redirect("/logout")
|
return redirect("/logout")
|
||||||
|
|
||||||
|
plugin = f"{ptype}/{plugin}"
|
||||||
|
|
||||||
if not plugins_module.pluginExists(plugin):
|
if not plugins_module.pluginExists(plugin):
|
||||||
return redirect("/plugins")
|
return redirect("/plugins")
|
||||||
@ -1352,8 +1357,8 @@ def plugin_verify(plugin):
|
|||||||
|
|
||||||
return redirect("/plugin/" + plugin)
|
return redirect("/plugin/" + plugin)
|
||||||
|
|
||||||
@app.route('/plugin/<plugin>/<function>', methods=["POST"])
|
@app.route('/plugin/<ptype>/<path:plugin>/<function>', methods=["POST"])
|
||||||
def plugin_function(plugin,function):
|
def plugin_function(ptype,plugin,function):
|
||||||
# Check if the user is logged in
|
# Check if the user is logged in
|
||||||
if request.cookies.get("account") is None:
|
if request.cookies.get("account") is None:
|
||||||
return redirect("/login")
|
return redirect("/login")
|
||||||
@ -1362,6 +1367,8 @@ def plugin_function(plugin,function):
|
|||||||
if not account:
|
if not account:
|
||||||
return redirect("/logout")
|
return redirect("/logout")
|
||||||
|
|
||||||
|
plugin = f"{ptype}/{plugin}"
|
||||||
|
|
||||||
if not plugins_module.pluginExists(plugin):
|
if not plugins_module.pluginExists(plugin):
|
||||||
return redirect("/plugins")
|
return redirect("/plugins")
|
||||||
|
|
||||||
|
64
plugin.py
64
plugin.py
@ -3,10 +3,12 @@ import json
|
|||||||
import importlib
|
import importlib
|
||||||
import sys
|
import sys
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
def listPlugins():
|
def listPlugins():
|
||||||
plugins = []
|
plugins = []
|
||||||
|
customPlugins = []
|
||||||
for file in os.listdir("plugins"):
|
for file in os.listdir("plugins"):
|
||||||
if file.endswith(".py"):
|
if file.endswith(".py"):
|
||||||
if file != "main.py":
|
if file != "main.py":
|
||||||
@ -14,9 +16,40 @@ def listPlugins():
|
|||||||
if "info" not in dir(plugin):
|
if "info" not in dir(plugin):
|
||||||
continue
|
continue
|
||||||
details = plugin.info
|
details = plugin.info
|
||||||
details["link"] = file[:-3]
|
details["source"] = "built-in"
|
||||||
|
details["link"] = f"plugins/{file[:-3]}"
|
||||||
plugins.append(details)
|
plugins.append(details)
|
||||||
|
|
||||||
|
# Check for imported plugins
|
||||||
|
if not os.path.exists("user_data/plugins.json"):
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump([], f)
|
||||||
|
|
||||||
|
with open("user_data/plugins.json", "r") as f:
|
||||||
|
importurls = json.load(f)
|
||||||
|
|
||||||
|
for importurl in importurls:
|
||||||
|
# Get only repo name
|
||||||
|
importPath = importurl.split("/")[-1].removesuffix(".git")
|
||||||
|
|
||||||
|
# Git clone into customPlugins/<importPath>
|
||||||
|
if not os.path.exists(f"customPlugins/{importPath}"):
|
||||||
|
os.system(f"git clone {importurl} customPlugins/{importPath}")
|
||||||
|
else:
|
||||||
|
os.system(f"cd customPlugins/{importPath} && git pull")
|
||||||
|
|
||||||
|
# Import plugins from customPlugins/<importPath>
|
||||||
|
for file in os.listdir(f"customPlugins/{importPath}"):
|
||||||
|
if file.endswith(".py"):
|
||||||
|
if file != "main.py":
|
||||||
|
plugin = importlib.import_module(f"customPlugins.{importPath}."+file[:-3])
|
||||||
|
if "info" not in dir(plugin):
|
||||||
|
continue
|
||||||
|
details = plugin.info
|
||||||
|
details["source"] = importPath
|
||||||
|
details["link"] = f"customPlugins/{importPath}/{file[:-3]}"
|
||||||
|
plugins.append(details)
|
||||||
|
|
||||||
# Verify plugin signature
|
# Verify plugin signature
|
||||||
signatures = []
|
signatures = []
|
||||||
try:
|
try:
|
||||||
@ -39,10 +72,7 @@ def listPlugins():
|
|||||||
|
|
||||||
|
|
||||||
def pluginExists(plugin: str):
|
def pluginExists(plugin: str):
|
||||||
for file in os.listdir("plugins"):
|
return os.path.exists(plugin+".py")
|
||||||
if file == plugin+".py":
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def verifyPlugin(plugin: str):
|
def verifyPlugin(plugin: str):
|
||||||
@ -66,7 +96,7 @@ def verifyPlugin(plugin: str):
|
|||||||
def hashPlugin(plugin: str):
|
def hashPlugin(plugin: str):
|
||||||
BUF_SIZE = 65536
|
BUF_SIZE = 65536
|
||||||
sha256 = hashlib.sha256()
|
sha256 = hashlib.sha256()
|
||||||
with open("plugins/"+plugin+".py", 'rb') as f:
|
with open(plugin+".py", 'rb') as f:
|
||||||
while True:
|
while True:
|
||||||
data = f.read(BUF_SIZE)
|
data = f.read(BUF_SIZE)
|
||||||
if not data:
|
if not data:
|
||||||
@ -76,7 +106,7 @@ def hashPlugin(plugin: str):
|
|||||||
|
|
||||||
|
|
||||||
def getPluginData(pluginStr: str):
|
def getPluginData(pluginStr: str):
|
||||||
plugin = importlib.import_module("plugins."+pluginStr)
|
plugin = importlib.import_module(pluginStr.replace("/","."))
|
||||||
|
|
||||||
# Check if the plugin is verified
|
# Check if the plugin is verified
|
||||||
signatures = []
|
signatures = []
|
||||||
@ -89,6 +119,18 @@ def getPluginData(pluginStr: str):
|
|||||||
json.dump(signatures, f)
|
json.dump(signatures, f)
|
||||||
|
|
||||||
info = plugin.info
|
info = plugin.info
|
||||||
|
info["source"] = "built-in"
|
||||||
|
|
||||||
|
# Check if the plugin is in customPlugins
|
||||||
|
if pluginStr.startswith("customPlugins"):
|
||||||
|
# Get git url for dir
|
||||||
|
print(f"cd customPlugins/{pluginStr.split('/')[-2]} && git remote get-url origin")
|
||||||
|
url = subprocess.check_output(f"cd customPlugins/{pluginStr.split('/')[-2]} && git remote get-url origin", shell=True).decode("utf-8").strip()
|
||||||
|
info["source"] = url
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Hash the plugin file
|
# Hash the plugin file
|
||||||
pluginHash = hashPlugin(pluginStr)
|
pluginHash = hashPlugin(pluginStr)
|
||||||
if pluginHash not in signatures:
|
if pluginHash not in signatures:
|
||||||
@ -100,12 +142,12 @@ def getPluginData(pluginStr: str):
|
|||||||
|
|
||||||
|
|
||||||
def getPluginFunctions(plugin: str):
|
def getPluginFunctions(plugin: str):
|
||||||
plugin = importlib.import_module("plugins."+plugin)
|
plugin = importlib.import_module(plugin.replace("/","."))
|
||||||
return plugin.functions
|
return plugin.functions
|
||||||
|
|
||||||
|
|
||||||
def runPluginFunction(plugin: str, function: str, params: dict, authentication: str):
|
def runPluginFunction(plugin: str, function: str, params: dict, authentication: str):
|
||||||
plugin_module = importlib.import_module("plugins."+plugin)
|
plugin_module = importlib.import_module(plugin.replace("/","."))
|
||||||
if function not in plugin_module.functions:
|
if function not in plugin_module.functions:
|
||||||
return {"error": "Function not found"}
|
return {"error": "Function not found"}
|
||||||
|
|
||||||
@ -141,12 +183,12 @@ def runPluginFunction(plugin: str, function: str, params: dict, authentication:
|
|||||||
|
|
||||||
|
|
||||||
def getPluginFunctionInputs(plugin: str, function: str):
|
def getPluginFunctionInputs(plugin: str, function: str):
|
||||||
plugin = importlib.import_module("plugins."+plugin)
|
plugin = importlib.import_module(plugin.replace("/","."))
|
||||||
return plugin.functions[function]["params"]
|
return plugin.functions[function]["params"]
|
||||||
|
|
||||||
|
|
||||||
def getPluginFunctionReturns(plugin: str, function: str):
|
def getPluginFunctionReturns(plugin: str, function: str):
|
||||||
plugin = importlib.import_module("plugins."+plugin)
|
plugin = importlib.import_module(plugin.replace("/","."))
|
||||||
return plugin.functions[function]["returns"]
|
return plugin.functions[function]["returns"]
|
||||||
|
|
||||||
|
|
||||||
|
114
plugins/customPlugins.py
Normal file
114
plugins/customPlugins.py
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
import json
|
||||||
|
import account
|
||||||
|
import requests
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Plugin Data
|
||||||
|
info = {
|
||||||
|
"name": "Custom Plugin Manager",
|
||||||
|
"description": "Import custom plugins from git repositories",
|
||||||
|
"version": "1.0",
|
||||||
|
"author": "Nathan.Woodburn/"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Functions
|
||||||
|
functions = {
|
||||||
|
"add":{
|
||||||
|
"name": "Add Plugin repo",
|
||||||
|
"type": "default",
|
||||||
|
"description": "Add a plugin repo",
|
||||||
|
"params": {
|
||||||
|
"url": {
|
||||||
|
"name":"URL",
|
||||||
|
"type":"text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"returns": {
|
||||||
|
"status":
|
||||||
|
{
|
||||||
|
"name": "Status of the function",
|
||||||
|
"type": "text"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"remove":{
|
||||||
|
"name": "Remove Plugins",
|
||||||
|
"type": "default",
|
||||||
|
"description": "Remove a plugin repo from the list",
|
||||||
|
"params": {
|
||||||
|
"url": {
|
||||||
|
"name":"URL",
|
||||||
|
"type":"text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"returns": {
|
||||||
|
"status":
|
||||||
|
{
|
||||||
|
"name": "Status of the function",
|
||||||
|
"type": "text"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"list":{
|
||||||
|
"name": "List Plugins",
|
||||||
|
"type": "default",
|
||||||
|
"description": "List all imported plugins",
|
||||||
|
"params": {},
|
||||||
|
"returns": {
|
||||||
|
"plugins":
|
||||||
|
{
|
||||||
|
"name": "List of plugins",
|
||||||
|
"type": "list"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def add(params, authentication):
|
||||||
|
url = params["url"]
|
||||||
|
if not os.path.exists("user_data/plugins.json"):
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump([], f)
|
||||||
|
|
||||||
|
with open("user_data/plugins.json", "r") as f:
|
||||||
|
importurls = json.load(f)
|
||||||
|
|
||||||
|
# Check if the plugin is already imported
|
||||||
|
if url in importurls:
|
||||||
|
return {"status": "Plugin already imported"}
|
||||||
|
|
||||||
|
importurls.append(url)
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump(importurls, f)
|
||||||
|
|
||||||
|
return {"status": "Imported"}
|
||||||
|
|
||||||
|
|
||||||
|
def remove(params, authentication):
|
||||||
|
url = params["url"]
|
||||||
|
if not os.path.exists("user_data/plugins.json"):
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump([], f)
|
||||||
|
|
||||||
|
with open("user_data/plugins.json", "r") as f:
|
||||||
|
importurls = json.load(f)
|
||||||
|
|
||||||
|
# Check if the plugin is already imported
|
||||||
|
if url not in importurls:
|
||||||
|
return {"status": "Plugin not imported"}
|
||||||
|
|
||||||
|
importurls.remove(url)
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump(importurls, f)
|
||||||
|
|
||||||
|
return {"status": "Removed"}
|
||||||
|
|
||||||
|
def list(params, authentication):
|
||||||
|
if not os.path.exists("user_data/plugins.json"):
|
||||||
|
with open("user_data/plugins.json", "w") as f:
|
||||||
|
json.dump([], f)
|
||||||
|
|
||||||
|
with open("user_data/plugins.json", "r") as f:
|
||||||
|
importurls = json.load(f)
|
||||||
|
|
||||||
|
return {"plugins": importurls}
|
@ -1,41 +0,0 @@
|
|||||||
import json
|
|
||||||
import account
|
|
||||||
import requests
|
|
||||||
|
|
||||||
# Plugin Data
|
|
||||||
info = {
|
|
||||||
"name": "Public Node Dashboard",
|
|
||||||
"description": "Dashboard modules for public nodes",
|
|
||||||
"version": "1.0",
|
|
||||||
"author": "Nathan.Woodburn/"
|
|
||||||
}
|
|
||||||
|
|
||||||
# Functions
|
|
||||||
functions = {
|
|
||||||
"main":{
|
|
||||||
"name": "Info Dashboard widget",
|
|
||||||
"type": "dashboard",
|
|
||||||
"description": "This creates the widget that shows on the dashboard",
|
|
||||||
"params": {},
|
|
||||||
"returns": {
|
|
||||||
"status":
|
|
||||||
{
|
|
||||||
"name": "Status of Node",
|
|
||||||
"type": "text"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def main(params, authentication):
|
|
||||||
info = account.hsd.getInfo()
|
|
||||||
|
|
||||||
status = f"Version: {info['version']}<br>Inbound Connections: {info['pool']['inbound']}<br>Outbound Connections: {info['pool']['outbound']}<br>"
|
|
||||||
if info['pool']['public']['listen']:
|
|
||||||
status += f"Public Node: Yes<br>Host: {info['pool']['public']['host']}<br>Port: {info['pool']['public']['port']}<br>"
|
|
||||||
else:
|
|
||||||
status += f"Public Node: No<br>"
|
|
||||||
status += f"Agent: {info['pool']['agent']}<br>Services: {info['pool']['services']}<br>"
|
|
||||||
|
|
||||||
return {"status": status}
|
|
||||||
|
|
@ -1,237 +0,0 @@
|
|||||||
import json
|
|
||||||
import account
|
|
||||||
import requests
|
|
||||||
|
|
||||||
import dns.resolver
|
|
||||||
import dns.message
|
|
||||||
import dns.query
|
|
||||||
import dns.rdatatype
|
|
||||||
import dns.rrset
|
|
||||||
from cryptography import x509
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
|
||||||
import tempfile
|
|
||||||
import subprocess
|
|
||||||
import binascii
|
|
||||||
import datetime
|
|
||||||
import dns.asyncresolver
|
|
||||||
import httpx
|
|
||||||
from requests_doh import DNSOverHTTPSSession, add_dns_provider
|
|
||||||
import domainLookup
|
|
||||||
|
|
||||||
doh_url = "https://hnsdoh.com/dns-query"
|
|
||||||
|
|
||||||
# Plugin Data
|
|
||||||
info = {
|
|
||||||
"name": "Troubleshooting",
|
|
||||||
"description": "Various troubleshooting functions",
|
|
||||||
"version": "1.0",
|
|
||||||
"author": "Nathan.Woodburn/"
|
|
||||||
}
|
|
||||||
|
|
||||||
# Functions
|
|
||||||
functions = {
|
|
||||||
"dig":{
|
|
||||||
"name": "DNS Lookup",
|
|
||||||
"type": "default",
|
|
||||||
"description": "Do DNS lookups on a domain",
|
|
||||||
"params": {
|
|
||||||
"domain": {
|
|
||||||
"name":"Domain to lookup (eg. woodburn)",
|
|
||||||
"type":"text"
|
|
||||||
},
|
|
||||||
"type": {
|
|
||||||
"name":"Type of lookup (A,TXT,NS,DS,TLSA)",
|
|
||||||
"type":"text"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"returns": {
|
|
||||||
"result":
|
|
||||||
{
|
|
||||||
"name": "Result",
|
|
||||||
"type": "list"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"https_check":{
|
|
||||||
"name": "HTTPS Check",
|
|
||||||
"type": "default",
|
|
||||||
"description": "Check if a domain has an HTTPS certificate",
|
|
||||||
"params": {
|
|
||||||
"domain": {
|
|
||||||
"name":"Domain to lookup (eg. woodburn)",
|
|
||||||
"type":"text"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"returns": {
|
|
||||||
"result":
|
|
||||||
{
|
|
||||||
"name": "Result",
|
|
||||||
"type": "text"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"hip_lookup": {
|
|
||||||
"name": "Hip Lookup",
|
|
||||||
"type": "default",
|
|
||||||
"description": "Look up a domain's hip address",
|
|
||||||
"params": {
|
|
||||||
"domain": {
|
|
||||||
"name": "Domain to lookup",
|
|
||||||
"type": "text"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"returns": {
|
|
||||||
"result": {
|
|
||||||
"name": "Result",
|
|
||||||
"type": "text"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def dns_request(domain: str, rType:str) -> list[dns.rrset.RRset]:
|
|
||||||
if rType == "":
|
|
||||||
rType = "A"
|
|
||||||
rType = dns.rdatatype.from_text(rType.upper())
|
|
||||||
|
|
||||||
|
|
||||||
with httpx.Client() as client:
|
|
||||||
q = dns.message.make_query(domain, rType)
|
|
||||||
r = dns.query.https(q, doh_url, session=client)
|
|
||||||
return r.answer
|
|
||||||
|
|
||||||
|
|
||||||
def dig(params, authentication):
|
|
||||||
domain = params["domain"]
|
|
||||||
type = params["type"]
|
|
||||||
result: list[dns.rrset.RRset] = dns_request(domain, type)
|
|
||||||
print(result)
|
|
||||||
if result:
|
|
||||||
if len(result) == 1:
|
|
||||||
result: dns.rrset.RRset = result[0]
|
|
||||||
result = result.items
|
|
||||||
return {"result": result}
|
|
||||||
|
|
||||||
else:
|
|
||||||
return {"result": result}
|
|
||||||
else:
|
|
||||||
return {"result": ["No result"]}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def https_check(params, authentication):
|
|
||||||
domain = params["domain"]
|
|
||||||
domain_check = False
|
|
||||||
try:
|
|
||||||
# Get the IP
|
|
||||||
ip = list(dns_request(domain,"A")[0].items.keys())
|
|
||||||
if len(ip) == 0:
|
|
||||||
return {"result": "No IP found"}
|
|
||||||
ip = ip[0]
|
|
||||||
print(ip)
|
|
||||||
|
|
||||||
# Run the openssl s_client command
|
|
||||||
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
|
|
||||||
|
|
||||||
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
|
|
||||||
s_client_output, _ = s_client_process.communicate(input=b"\n")
|
|
||||||
|
|
||||||
certificates = []
|
|
||||||
current_cert = ""
|
|
||||||
for line in s_client_output.split(b"\n"):
|
|
||||||
current_cert += line.decode("utf-8") + "\n"
|
|
||||||
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
|
|
||||||
certificates.append(current_cert)
|
|
||||||
current_cert = ""
|
|
||||||
|
|
||||||
# Remove anything before -----BEGIN CERTIFICATE-----
|
|
||||||
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
|
|
||||||
|
|
||||||
if certificates:
|
|
||||||
cert = certificates[0]
|
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
|
|
||||||
temp_cert_file.write(cert)
|
|
||||||
temp_cert_file.seek(0) # Move back to the beginning of the temporary file
|
|
||||||
|
|
||||||
tlsa_command = ["openssl","x509","-in",temp_cert_file.name,"-pubkey","-noout","|","openssl","pkey","-pubin","-outform","der","|","openssl","dgst","-sha256","-binary",]
|
|
||||||
|
|
||||||
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
|
|
||||||
tlsa_output, _ = tlsa_process.communicate()
|
|
||||||
|
|
||||||
tlsa_server = "3 1 1 " + binascii.hexlify(tlsa_output).decode("utf-8")
|
|
||||||
print(f"TLSA Server: {tlsa_server}")
|
|
||||||
|
|
||||||
|
|
||||||
# Get domains
|
|
||||||
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
|
|
||||||
|
|
||||||
domains = []
|
|
||||||
for ext in cert_obj.extensions:
|
|
||||||
if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
|
||||||
san_list = ext.value.get_values_for_type(x509.DNSName)
|
|
||||||
domains.extend(san_list)
|
|
||||||
|
|
||||||
# Extract the common name (CN) from the subject
|
|
||||||
common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
|
|
||||||
if common_name:
|
|
||||||
if common_name[0].value not in domains:
|
|
||||||
domains.append(common_name[0].value)
|
|
||||||
|
|
||||||
if domains:
|
|
||||||
if domain in domains:
|
|
||||||
domain_check = True
|
|
||||||
else:
|
|
||||||
# Check if matching wildcard domain exists
|
|
||||||
for d in domains:
|
|
||||||
if d.startswith("*"):
|
|
||||||
if domain.split(".")[1:] == d.split(".")[1:]:
|
|
||||||
domain_check = True
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
expiry_date = cert_obj.not_valid_after
|
|
||||||
# Check if expiry date is past
|
|
||||||
if expiry_date < datetime.datetime.now():
|
|
||||||
return {"result": "Certificate is expired"}
|
|
||||||
else:
|
|
||||||
return {"result": "No certificate found"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Check for TLSA record
|
|
||||||
tlsa = dns_request(f"_443._tcp.{domain}","TLSA")
|
|
||||||
tlsa = list(tlsa[0].items.keys())
|
|
||||||
if len(tlsa) == 0:
|
|
||||||
return {"result": "No TLSA record found"}
|
|
||||||
tlsa = tlsa[0]
|
|
||||||
print(f"TLSA: {tlsa}")
|
|
||||||
|
|
||||||
if not tlsa:
|
|
||||||
return {"result": "TLSA lookup failed"}
|
|
||||||
else:
|
|
||||||
if tlsa_server == str(tlsa):
|
|
||||||
if domain_check:
|
|
||||||
add_dns_provider("HNSDoH", "https://hnsdoh.com/dns-query")
|
|
||||||
|
|
||||||
session = DNSOverHTTPSSession("HNSDoH")
|
|
||||||
r = session.get(f"https://{domain}/",verify=False)
|
|
||||||
if r.status_code != 200:
|
|
||||||
return {"result": "Webserver returned status code: " + str(r.status_code)}
|
|
||||||
return {"result": "HTTPS check successful"}
|
|
||||||
else:
|
|
||||||
return {"result": "TLSA record matches certificate, but domain does not match certificate"}
|
|
||||||
|
|
||||||
else:
|
|
||||||
return {"result": "TLSA record does not match certificate"}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
return {"result": "TLSA lookup failed with error: " + str(e)}
|
|
||||||
|
|
||||||
# Catch all exceptions
|
|
||||||
except Exception as e:
|
|
||||||
return {"result": "Lookup failed.<br><br>Error: " + str(e)}
|
|
||||||
|
|
||||||
def hip_lookup(params, authentication):
|
|
||||||
domain = params["domain"]
|
|
||||||
hip = domainLookup.hip2(domain)
|
|
||||||
return {"result": hip}
|
|
@ -67,7 +67,7 @@
|
|||||||
<div class="container-fluid" style="margin-bottom: 20px;">
|
<div class="container-fluid" style="margin-bottom: 20px;">
|
||||||
<h3 class="text-dark mb-1">{{name}}</h3>
|
<h3 class="text-dark mb-1">{{name}}</h3>
|
||||||
<h4 class="text-dark mb-1">{{description}}</h4>
|
<h4 class="text-dark mb-1">{{description}}</h4>
|
||||||
<h6 class="text-dark mb-1">Author: {{author}}<br>Version: {{version}}</h6>{{functions|safe}}
|
<h6 class="text-dark mb-1">Author: {{author}}<br>Version: {{version}}<br>Source: {{source}}</h6>{{functions|safe}}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<footer class="sticky-footer" style="background: var(--bs-primary-text-emphasis);">
|
<footer class="sticky-footer" style="background: var(--bs-primary-text-emphasis);">
|
||||||
|
Loading…
Reference in New Issue
Block a user