feat: Add latest api route
All checks were successful
Build Docker / BuildImage (push) Successful in 29s

This commit is contained in:
Nathan Woodburn 2024-10-02 14:39:15 +10:00
parent 4d1581c4a0
commit c69cc026a3
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1
2 changed files with 113 additions and 1 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@ __pycache__/
.env .env
.vs/ .vs/
logs/ logs/
.venv/

111
server.py
View File

@ -700,6 +700,10 @@ def api_index():
], ],
}, },
{"route": "/api/refresh", "description": "Force a status check of all nodes"}, {"route": "/api/refresh", "description": "Force a status check of all nodes"},
{
"route": "/api/latest",
"description": "Get the latest status of all nodes",
}
] ]
if any(agent.lower().find(x) != -1 for x in nonBrowser): if any(agent.lower().find(x) != -1 for x in nonBrowser):
@ -759,6 +763,113 @@ def api_refresh():
node_status = check_nodes() node_status = check_nodes()
return jsonify(node_status) return jsonify(node_status)
@app.route("/api/latest")
def api_errors():
node_status = check_nodes_from_log()
alerts = []
warnings = []
for node in node_status:
node["class"] = "normal"
if not node["plain_dns"]:
node["class"] = "error"
alerts.append(f"{node['name']} does not support plain DNS")
if not node["doh"]:
node["class"] = "error"
alerts.append(f"{node['name']} does not support DoH")
if not node["dot"]:
node["class"] = "error"
alerts.append(f"{node['name']} does not support DoT")
if not node["cert"]["valid"]:
node["class"] = "error"
alerts.append(f"{node['name']} has an invalid certificate")
if not node["cert_853"]["valid"]:
node["class"] = "error"
alerts.append(f"{node['name']} has an invalid certificate on port 853")
cert_expiry = datetime.strptime(
node["cert"]["expiry_date"], "%b %d %H:%M:%S %Y GMT"
)
if cert_expiry < datetime.now():
node["class"] = "error"
alerts.append(f"The {node['name']} node's certificate has expired")
continue
elif cert_expiry < datetime.now() + relativedelta.relativedelta(days=7):
node["class"] = "warning"
warnings.append(
f"The {node['name']} node's certificate is expiring {format_relative_time(cert_expiry)}"
)
continue
cert_853_expiry = datetime.strptime(
node["cert_853"]["expiry_date"], "%b %d %H:%M:%S %Y GMT"
)
if cert_853_expiry < datetime.now():
node["class"] = "error"
alerts.append(
f"The {node['name']} node's certificate has expired for DNS over TLS (port 853)"
)
continue
elif cert_853_expiry < datetime.now() + relativedelta.relativedelta(days=7):
node["class"] = "warning"
warnings.append(
f"The {node['name']} node's certificate is expiring {format_relative_time(cert_853_expiry)} for DNS over TLS (port 853)"
)
history_days = 30
if "history" in request.args:
try:
history_days = int(request.args["history"])
except:
pass
history = get_history(history_days)
history_summary = summarize_history(history)
# Convert time to relative time
for node in history_summary["nodes"]:
for key in ["plain_dns", "doh", "dot"]:
if node[key]["last_down"] == "Never":
node[key]["last_down"] = "over 30 days ago"
else:
node[key]["last_down"] = format_last_check(
datetime.strptime(node[key]["last_down"], "%Y-%m-%d %H:%M:%S")
)
for key in ["plain_dns", "doh", "dot"]:
if history_summary["overall"][key]["last_down"] == "Never":
continue
history_summary["overall"][key]["last_down"] = format_last_check(
datetime.strptime(history_summary["overall"][key]["last_down"], "%Y-%m-%d %H:%M:%S")
)
history_summary["nodes"] = convert_nodes_to_dict(history_summary["nodes"])
last_check = format_last_check(last_log)
# Convert alerts and warnings to a string
alert_string = ""
for alert in alerts:
alert_string += f"{alert}\n"
warning_string = ""
for warning in warnings:
warning_string += f"{warning}\n"
status_string = f"Warnings: {len(warnings)} | Alerts: {len(alerts)}"
if (len(alerts) == 0) and (len(warnings) == 0):
status_string = "HNSDoH is up and running!"
return jsonify({
"warnings": warnings,
"warnings_string": warning_string,
"alerts": alerts,
"alerts_string": alert_string,
"last_check": last_check,
"status_string": status_string,
})
# endregion # endregion