From 3ddf7c5dfe9fab97538caa2a31b38315c3f69465 Mon Sep 17 00:00:00 2001 From: Nathan Woodburn Date: Thu, 24 Jul 2025 16:25:30 +1000 Subject: [PATCH] feat: Start building backend domain and notification functions --- .gitignore | 1 + alerts.py | 121 +++++++++++++++++++++++++++++++++++ domains.py | 181 ++++++++++++++++++++++++++++++++++++++++++++++++++++ example.env | 7 ++ 4 files changed, 310 insertions(+) create mode 100644 alerts.py create mode 100644 domains.py create mode 100644 example.env diff --git a/.gitignore b/.gitignore index 7d847cc..f97c19d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ .env .vs/ .venv/ +data/ \ No newline at end of file diff --git a/alerts.py b/alerts.py new file mode 100644 index 0000000..1c8b393 --- /dev/null +++ b/alerts.py @@ -0,0 +1,121 @@ +import json +import os +import requests +import smtplib +from email.message import EmailMessage +from email.mime.text import MIMEText +from email.headerregistry import Address +import ssl +import dotenv + +dotenv.load_dotenv() + +SMTP_SERVER = os.getenv('SMTP_SERVER', 'localhost') +SMTP_PORT = int(os.getenv('SMTP_PORT', 465)) +SMTP_USERNAME = os.getenv('SMTP_USERNAME', None) +SMTP_PASSWORD = os.getenv('SMTP_PASSWORD', None) + + +def handle_alert(domain: str, notification: dict, alert_data: dict): + """ + Handle the alert for a domain. + """ + alert_type = notification.get('type') + + if alert_type == 'discord_webhook': + discord_webhook(notification['url'], domain, + alert_data, notification['blocks']) + elif alert_type == 'email': + email(notification['email'], domain, + alert_data, notification['blocks']) + else: + print(f"Unknown alert type: {alert_type} for domain: {domain}") + + +def discord_webhook(webhook_url: str, domain: str, content: str, alert_blocks: int): + """ + Send a message to a Discord webhook. + """ + + data = { + "username": "FireAlerts", + "avatar_url": "https://firewallet.au/assets/img/FW.png", + "components": [ + { + "type": 1, + "components": [ + { + "type": 2, + "style": 5, + "url": f"https://alerts.firewallet.au/account/{domain}", + "label": "Open your FireAlerts account" + } + ] + } + ], + "embeds": [ + { + "author": { + "name": "FireAlerts", + "icon_url": "https://firewallet.au/assets/img/FW.png" + }, + "title": f"{domain} is expiring in {content['blocks']} blocks (~{content['time']})", + "color": 13041919, + "description": f"You set an alert for {domain}. This domain will expire in {content['blocks']} blocks or approximately {content['time']}.", + "fields": [ + { + "name": "Domain", + "value": domain, + "inline": True + }, + { + "name": "Notice Blocks", + "value": f"{alert_blocks}", + "inline": True + } + ] + } + ] + } + print(json.dumps(data, indent=4)) # Debugging output + response = requests.post(f"{webhook_url}?with_components=true", json=data) + if response.status_code != 204: + print( + f"Failed to send Discord webhook: {response.status_code} - {response.text}") + + +def email(email_addr: str, domain: str, content: dict, alert_blocks: int): + """ + Send an email notification. + """ + + message = EmailMessage() + message['Subject'] = f"{domain} is expiring in {content['blocks']} blocks (~{content['time']})" + message['From'] = f'FireAlerts <{SMTP_USERNAME}>' + message['To'] = email_addr + message.set_content(f""" +You set an alert for {domain}. This domain will expire in {content['blocks']} blocks or approximately {content['time']}. + +Domain: {domain} +Blocks remaining: {content['blocks']} +Time remaining: {content['time']} +Alert threshold: {alert_blocks} blocks + +Visit your FireAlerts account: https://alerts.firewallet.au/account/{domain} +""") + + try: + print(f"Attempting to connect to {SMTP_SERVER}:{SMTP_PORT}") + context = ssl.create_default_context() + with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as server: + if SMTP_USERNAME and SMTP_PASSWORD: + server.login(SMTP_USERNAME, SMTP_PASSWORD) + server.send_message(message) + print(f"Email sent to {email_addr} for domain {domain}") + except smtplib.SMTPException as e: + print(f"SMTP error sending email to {email_addr}: {e}") + except ConnectionRefusedError as e: + print( + f"Connection refused to SMTP server {SMTP_SERVER}:{SMTP_PORT} - {e}") + except Exception as e: + print(f"Unexpected error sending email to {email_addr}: {e}") diff --git a/domains.py b/domains.py new file mode 100644 index 0000000..fddd640 --- /dev/null +++ b/domains.py @@ -0,0 +1,181 @@ +import json +import os +import requests +import dotenv +import alerts +dotenv.load_dotenv() + +HSD_URL = os.getenv('HSD_URL', 'localhost') +HSD_API_KEY = os.getenv('HSD_API_KEY', None) +HSD_NETWORK = os.getenv('HSD_NETWORK', 'main') + +HSD_PORTS = { + 'main': 12037, + 'testnet': 13037, + 'regtest': 14037, + 'simnet': 15037, +} +HSD_PORT = HSD_PORTS.get(HSD_NETWORK, 12037) + +HSD_URL_FULL = f'http://x:{HSD_API_KEY}@{HSD_URL}:{HSD_PORT}' if HSD_API_KEY else f'http://{HSD_URL}:{HSD_PORT}' + +if not os.path.exists('data'): + os.makedirs('data') + +if not os.path.exists('data/domains.json'): + with open('data/domains.json', 'w') as f: + json.dump({}, f) + + +def get_current_block() -> int: + """ + Get the current block number from the HSD node. + """ + response = requests.get(HSD_URL_FULL) + + if response.status_code != 200: + print(f"Error fetching current block: {response.status_code} - {response.text}") + return -1 + + data = response.json() + if 'error' in data and data['error'] is not None: + print(f"Error fetching current block: {data['error']}") + return -1 + + chain_data = data.get('chain', None) + if not chain_data or 'height' not in chain_data: + print("No chain data or height found in response.") + return -1 + return chain_data['height'] + +def get_domain_expiry_block(domain: str) -> int: + """ + Get the expiry block of a domain. + """ + response = requests.post(HSD_URL_FULL, json={ "method": "getnameinfo", "params":[domain] }) + + if response.status_code != 200: + return -1 + + data = response.json() + if 'error' in data and data['error'] is not None: + print(f"Error fetching data for {domain}: {data['error']}") + return -1 + + if 'result' not in data or 'info' not in data['result']: + print(f"No result or info found for {domain}.") + return -1 + + if 'stats' not in data['result']['info']: + print(f"No stats information found for {domain}.") + return -1 + + stats = data['result']['info']['stats'] + if 'renewalPeriodEnd' not in stats: + print(f"No renewalPeriodEnd found in stats for {domain}.") + return -1 + + return stats['renewalPeriodEnd'] + +def get_domains() -> dict: + """ + Get the dict of domains from the JSON file. + """ + with open('data/domains.json', 'r') as f: + domains = json.load(f) + return domains + +def add_notification(domain: str, notification: dict): + """ + Add a notification for a domain. + """ + domains = get_domains() + if domain not in domains: + domains[domain] = [] + domains[domain].append(notification) + + with open('data/domains.json', 'w') as f: + json.dump(domains, f, indent=4) + +def update_notification(domain: str, notification: dict): + """ + Update a notification for a domain. + """ + domains = get_domains() + if domain in domains: + for i, existing_notification in enumerate(domains[domain]): + if existing_notification['type'] == notification['type'] and existing_notification['id'] == notification['id']: + domains[domain][i] = notification + break + else: + domains[domain].append(notification) + else: + domains[domain] = [notification] + + with open('data/domains.json', 'w') as f: + json.dump(domains, f, indent=4) + +def get_account_notifications(user_name: str) -> list: + """ + Get all notifications for a specific account. + """ + domains = get_domains() + # For each notification check if user_name + notifications = [] + for domain, domain_notifications in domains.items(): + for notification in domain_notifications: + if notification.get('user_name') == user_name: + notifications.append({ + 'domain': domain, + 'notification': notification + }) + return notifications + + +def notify_expiries(): + """ + Notify about the expiry of domains. + """ + domains = get_domains() + if not domains: + print("No domains found.") + return + + current_block = get_current_block() + + + for domain in domains: + expiry_block = get_domain_expiry_block(domain) + if expiry_block == -1: + continue + blocks_remaining = expiry_block - current_block + domain_data = { + "blocks": blocks_remaining, + "time": f"{blocks_remaining // 144} days" # Assuming 144 blocks per day + } + for notification in domains[domain]: + print(blocks_remaining, notification['blocks']) + if notification['blocks'] <= blocks_remaining and notification['blocks'] >= (blocks_remaining - 1): # Just in case there are 2 blocks really close together + # Check if last block notified is more than current block + 5 + if notification.get('last_block_notified', -1) < (current_block - 5): + notification['last_block_notified'] = current_block + # Update the notification + update_notification(domain, notification) + # Handle the alert + alerts.handle_alert(domain, notification, domain_data) + + + +if __name__ == "__main__": + # Example usage + domain = "woodburn" + try: + expiry = get_domain_expiry_block(domain) + print(f"The expiry block for {domain} is: {expiry}") + except Exception as e: + print(f"Error fetching expiry for {domain}: {e}") + + # Notify about domain expiries + notify_expiries() + + print(json.dumps(get_account_notifications('nathan'), indent=4)) \ No newline at end of file diff --git a/example.env b/example.env new file mode 100644 index 0000000..83c765c --- /dev/null +++ b/example.env @@ -0,0 +1,7 @@ +HSD_URL=localhost +HSD_NETWORK=main +HSD_API_KEY=your_api_key_here +SMTP_SERVER=smtp.hostname.com +SMTP_PORT=465 +SMTP_USERNAME=noreply@email.au +SMTP_PASSWORD=Secretpassword123 \ No newline at end of file