feat: Add fixes for time and alerts
All checks were successful
Build Docker / BuildImage (push) Successful in 37s

This commit is contained in:
Nathan Woodburn 2024-09-12 16:02:37 +10:00
parent 8f950f8de5
commit 95628a75f4
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1

112
server.py
View File

@ -147,20 +147,21 @@ def build_dns_query(domain: str, qtype: str = "A"):
def check_doh(ip: str) -> bool: def check_doh(ip: str) -> bool:
status = False status = False
dns_query = build_dns_query("2.wdbrn", "TXT") try:
request = ( dns_query = build_dns_query("2.wdbrn", "TXT")
f"POST /dns-query HTTP/1.1\r\n" request = (
f"Host: hnsdoh.com\r\n" f"POST /dns-query HTTP/1.1\r\n"
"Content-Type: application/dns-message\r\n" f"Host: hnsdoh.com\r\n"
f"Content-Length: {len(dns_query)}\r\n" "Content-Type: application/dns-message\r\n"
"Connection: close\r\n" f"Content-Length: {len(dns_query)}\r\n"
"\r\n" "Connection: close\r\n"
) "\r\n"
wireframe_request = request.encode() + dns_query )
sock = socket.create_connection((ip, 443)) wireframe_request = request.encode() + dns_query
context = ssl.create_default_context() sock = socket.create_connection((ip, 443))
ssock = context.wrap_socket(sock, server_hostname="hnsdoh.com") context = ssl.create_default_context()
try: ssock = context.wrap_socket(sock, server_hostname="hnsdoh.com")
ssock.sendall(wireframe_request) ssock.sendall(wireframe_request)
response_data = b"" response_data = b""
while True: while True:
@ -182,7 +183,8 @@ def check_doh(ip: str) -> bool:
finally: finally:
# Close the socket connection # Close the socket connection
ssock.close() if ssock:
ssock.close()
return status return status
@ -246,6 +248,22 @@ def format_relative_time(expiry_date: datetime) -> str:
else: else:
return f"in {delta.seconds} seconds" if delta.seconds > 1 else "in 1 second" return f"in {delta.seconds} seconds" if delta.seconds > 1 else "in 1 second"
def format_last_check(last_log: datetime) -> str:
now = datetime.now()
delta = now - last_log
if delta.days > 0:
return f"{delta.days} days ago" if delta.days > 1 else "1 day ago"
elif delta.days < 0:
return f"in {-delta.days} days" if -delta.days > 1 else "in 1 day"
elif delta.seconds >= 3600:
hours = delta.seconds // 3600
return f"{hours} hours ago" if hours > 1 else "1 hour ago"
elif delta.seconds >= 60:
minutes = delta.seconds // 60
return f"{minutes} minutes ago" if minutes > 1 else "1 minute ago"
else:
return f"{delta.seconds} seconds ago" if delta.seconds > 1 else "1 second ago"
def check_nodes() -> list: def check_nodes() -> list:
global nodes global nodes
@ -306,7 +324,7 @@ def check_nodes() -> list:
for node in node_status: for node in node_status:
if not node["plain_dns"] or not node["doh"] or not node["dot"] or not node["cert"]["valid"] or not node["cert_853"]["valid"]: if not node["plain_dns"] or not node["doh"] or not node["dot"] or not node["cert"]["valid"] or not node["cert_853"]["valid"]:
send_down_notification(node) send_down_notification(node)
break continue
# Check if cert is expiring in 7 days # Check if cert is expiring in 7 days
cert_expiry = datetime.strptime( cert_expiry = datetime.strptime(
node["cert"]["expiry_date"], "%b %d %H:%M:%S %Y GMT" node["cert"]["expiry_date"], "%b %d %H:%M:%S %Y GMT"
@ -315,10 +333,10 @@ def check_nodes() -> list:
if node["ip"] not in sent_notifications: if node["ip"] not in sent_notifications:
sent_notifications[node["ip"]] = datetime.now() sent_notifications[node["ip"]] = datetime.now()
send_down_notification(node) send_down_notification(node)
break continue
if sent_notifications[node["ip"]] < datetime.now() - relativedelta.relativedelta(days=1): if sent_notifications[node["ip"]] < datetime.now() - relativedelta.relativedelta(days=1):
send_down_notification(node) send_down_notification(node)
break continue
cert_853_expiry = datetime.strptime( cert_853_expiry = datetime.strptime(
node["cert_853"]["expiry_date"], "%b %d %H:%M:%S %Y GMT" node["cert_853"]["expiry_date"], "%b %d %H:%M:%S %Y GMT"
) )
@ -326,50 +344,34 @@ def check_nodes() -> list:
if node["ip"] not in sent_notifications: if node["ip"] not in sent_notifications:
sent_notifications[node["ip"]] = datetime.now() sent_notifications[node["ip"]] = datetime.now()
send_down_notification(node) send_down_notification(node)
break continue
if sent_notifications[node["ip"]] < datetime.now() - relativedelta.relativedelta(days=1): if sent_notifications[node["ip"]] < datetime.now() - relativedelta.relativedelta(days=1):
send_down_notification(node) send_down_notification(node)
break continue
return node_status return node_status
def check_nodes_from_log() -> list: def check_nodes_from_log() -> list:
global last_log global last_log
if last_log > datetime.now() - relativedelta.relativedelta(minutes=10): # Load the last log
# Load the last log with open(f"{log_dir}/node_status.json", "r") as file:
with open(f"{log_dir}/node_status.json", "r") as file: data = json.load(file)
data = json.load(file) newest = {
newest = { "date": datetime.now() - relativedelta.relativedelta(years=1),
"date": datetime.now() - relativedelta.relativedelta(years=1), "nodes": [],
"nodes": [], }
} for entry in data:
for entry in data: if datetime.strptime(entry["date"], "%Y-%m-%d %H:%M:%S") > newest["date"]:
if datetime.strptime(entry["date"], "%Y-%m-%d %H:%M:%S") > newest["date"]: newest = entry
newest = entry newest["date"] = datetime.strptime(newest["date"], "%Y-%m-%d %H:%M:%S")
newest["date"] = datetime.strptime(newest["date"], "%Y-%m-%d %H:%M:%S") node_status = newest["nodes"]
node_status = newest["nodes"] if datetime.now() > newest["date"] + relativedelta.relativedelta(minutes=10):
elif last_log < datetime.now() - relativedelta.relativedelta(days=7): print("Failed to get a new enough log, checking nodes", flush=True)
# Load the log file print(newest["date"])
with open(f"{log_dir}/node_status.json", "r") as file:
data = json.load(file)
newest = {
"date": datetime.now() - relativedelta.relativedelta(years=1),
"nodes": [],
}
for entry in data:
if datetime.strptime(entry["date"], "%Y-%m-%d %H:%M:%S") > newest["date"]:
newest = entry
newest["date"] = datetime.strptime(newest["date"], "%Y-%m-%d %H:%M:%S")
node_status = newest["nodes"]
if newest["date"] > datetime.now() - relativedelta.relativedelta(minutes=10):
last_log = newest["date"]
print(f"Loaded log from {last_log}", flush=True)
return node_status
else:
node_status = check_nodes() node_status = check_nodes()
else:
last_log = newest["date"]
return node_status return node_status
def send_notification(title, description,author): def send_notification(title, description,author):
discord_hook = os.getenv("DISCORD_HOOK") discord_hook = os.getenv("DISCORD_HOOK")
if discord_hook: if discord_hook:
@ -682,8 +684,8 @@ def index():
history = get_history(history_days) history = get_history(history_days)
history_summary = summarize_history(history) history_summary = summarize_history(history)
history_summary["nodes"] = convert_nodes_to_dict(history_summary["nodes"]) history_summary["nodes"] = convert_nodes_to_dict(history_summary["nodes"])
last_check = last_log.strftime("%Y-%m-%d %H:%M:%S") last_check = format_last_check(last_log)
print(f"Last check: {last_check}", flush=True)
return render_template( return render_template(
"index.html", "index.html",
nodes=node_status, nodes=node_status,