generated from nathanwoodburn/python-webserver-template
feat: Start building backend domain and notification functions
All checks were successful
Build Docker / BuildImage (push) Successful in 1m0s
All checks were successful
Build Docker / BuildImage (push) Successful in 1m0s
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -4,3 +4,4 @@ __pycache__/
|
|||||||
.env
|
.env
|
||||||
.vs/
|
.vs/
|
||||||
.venv/
|
.venv/
|
||||||
|
data/
|
||||||
121
alerts.py
Normal file
121
alerts.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import smtplib
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
from email.headerregistry import Address
|
||||||
|
import ssl
|
||||||
|
import dotenv
|
||||||
|
|
||||||
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
|
SMTP_SERVER = os.getenv('SMTP_SERVER', 'localhost')
|
||||||
|
SMTP_PORT = int(os.getenv('SMTP_PORT', 465))
|
||||||
|
SMTP_USERNAME = os.getenv('SMTP_USERNAME', None)
|
||||||
|
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD', None)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_alert(domain: str, notification: dict, alert_data: dict):
|
||||||
|
"""
|
||||||
|
Handle the alert for a domain.
|
||||||
|
"""
|
||||||
|
alert_type = notification.get('type')
|
||||||
|
|
||||||
|
if alert_type == 'discord_webhook':
|
||||||
|
discord_webhook(notification['url'], domain,
|
||||||
|
alert_data, notification['blocks'])
|
||||||
|
elif alert_type == 'email':
|
||||||
|
email(notification['email'], domain,
|
||||||
|
alert_data, notification['blocks'])
|
||||||
|
else:
|
||||||
|
print(f"Unknown alert type: {alert_type} for domain: {domain}")
|
||||||
|
|
||||||
|
|
||||||
|
def discord_webhook(webhook_url: str, domain: str, content: str, alert_blocks: int):
|
||||||
|
"""
|
||||||
|
Send a message to a Discord webhook.
|
||||||
|
"""
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"username": "FireAlerts",
|
||||||
|
"avatar_url": "https://firewallet.au/assets/img/FW.png",
|
||||||
|
"components": [
|
||||||
|
{
|
||||||
|
"type": 1,
|
||||||
|
"components": [
|
||||||
|
{
|
||||||
|
"type": 2,
|
||||||
|
"style": 5,
|
||||||
|
"url": f"https://alerts.firewallet.au/account/{domain}",
|
||||||
|
"label": "Open your FireAlerts account"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"embeds": [
|
||||||
|
{
|
||||||
|
"author": {
|
||||||
|
"name": "FireAlerts",
|
||||||
|
"icon_url": "https://firewallet.au/assets/img/FW.png"
|
||||||
|
},
|
||||||
|
"title": f"{domain} is expiring in {content['blocks']} blocks (~{content['time']})",
|
||||||
|
"color": 13041919,
|
||||||
|
"description": f"You set an alert for {domain}. This domain will expire in {content['blocks']} blocks or approximately {content['time']}.",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"name": "Domain",
|
||||||
|
"value": domain,
|
||||||
|
"inline": True
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Notice Blocks",
|
||||||
|
"value": f"{alert_blocks}",
|
||||||
|
"inline": True
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
print(json.dumps(data, indent=4)) # Debugging output
|
||||||
|
response = requests.post(f"{webhook_url}?with_components=true", json=data)
|
||||||
|
if response.status_code != 204:
|
||||||
|
print(
|
||||||
|
f"Failed to send Discord webhook: {response.status_code} - {response.text}")
|
||||||
|
|
||||||
|
|
||||||
|
def email(email_addr: str, domain: str, content: dict, alert_blocks: int):
|
||||||
|
"""
|
||||||
|
Send an email notification.
|
||||||
|
"""
|
||||||
|
|
||||||
|
message = EmailMessage()
|
||||||
|
message['Subject'] = f"{domain} is expiring in {content['blocks']} blocks (~{content['time']})"
|
||||||
|
message['From'] = f'FireAlerts <{SMTP_USERNAME}>'
|
||||||
|
message['To'] = email_addr
|
||||||
|
message.set_content(f"""
|
||||||
|
You set an alert for {domain}. This domain will expire in {content['blocks']} blocks or approximately {content['time']}.
|
||||||
|
|
||||||
|
Domain: {domain}
|
||||||
|
Blocks remaining: {content['blocks']}
|
||||||
|
Time remaining: {content['time']}
|
||||||
|
Alert threshold: {alert_blocks} blocks
|
||||||
|
|
||||||
|
Visit your FireAlerts account: https://alerts.firewallet.au/account/{domain}
|
||||||
|
""")
|
||||||
|
|
||||||
|
try:
|
||||||
|
print(f"Attempting to connect to {SMTP_SERVER}:{SMTP_PORT}")
|
||||||
|
context = ssl.create_default_context()
|
||||||
|
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as server:
|
||||||
|
if SMTP_USERNAME and SMTP_PASSWORD:
|
||||||
|
server.login(SMTP_USERNAME, SMTP_PASSWORD)
|
||||||
|
server.send_message(message)
|
||||||
|
print(f"Email sent to {email_addr} for domain {domain}")
|
||||||
|
except smtplib.SMTPException as e:
|
||||||
|
print(f"SMTP error sending email to {email_addr}: {e}")
|
||||||
|
except ConnectionRefusedError as e:
|
||||||
|
print(
|
||||||
|
f"Connection refused to SMTP server {SMTP_SERVER}:{SMTP_PORT} - {e}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Unexpected error sending email to {email_addr}: {e}")
|
||||||
181
domains.py
Normal file
181
domains.py
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import dotenv
|
||||||
|
import alerts
|
||||||
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
|
HSD_URL = os.getenv('HSD_URL', 'localhost')
|
||||||
|
HSD_API_KEY = os.getenv('HSD_API_KEY', None)
|
||||||
|
HSD_NETWORK = os.getenv('HSD_NETWORK', 'main')
|
||||||
|
|
||||||
|
HSD_PORTS = {
|
||||||
|
'main': 12037,
|
||||||
|
'testnet': 13037,
|
||||||
|
'regtest': 14037,
|
||||||
|
'simnet': 15037,
|
||||||
|
}
|
||||||
|
HSD_PORT = HSD_PORTS.get(HSD_NETWORK, 12037)
|
||||||
|
|
||||||
|
HSD_URL_FULL = f'http://x:{HSD_API_KEY}@{HSD_URL}:{HSD_PORT}' if HSD_API_KEY else f'http://{HSD_URL}:{HSD_PORT}'
|
||||||
|
|
||||||
|
if not os.path.exists('data'):
|
||||||
|
os.makedirs('data')
|
||||||
|
|
||||||
|
if not os.path.exists('data/domains.json'):
|
||||||
|
with open('data/domains.json', 'w') as f:
|
||||||
|
json.dump({}, f)
|
||||||
|
|
||||||
|
|
||||||
|
def get_current_block() -> int:
|
||||||
|
"""
|
||||||
|
Get the current block number from the HSD node.
|
||||||
|
"""
|
||||||
|
response = requests.get(HSD_URL_FULL)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(f"Error fetching current block: {response.status_code} - {response.text}")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
if 'error' in data and data['error'] is not None:
|
||||||
|
print(f"Error fetching current block: {data['error']}")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
chain_data = data.get('chain', None)
|
||||||
|
if not chain_data or 'height' not in chain_data:
|
||||||
|
print("No chain data or height found in response.")
|
||||||
|
return -1
|
||||||
|
return chain_data['height']
|
||||||
|
|
||||||
|
def get_domain_expiry_block(domain: str) -> int:
|
||||||
|
"""
|
||||||
|
Get the expiry block of a domain.
|
||||||
|
"""
|
||||||
|
response = requests.post(HSD_URL_FULL, json={ "method": "getnameinfo", "params":[domain] })
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
return -1
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
if 'error' in data and data['error'] is not None:
|
||||||
|
print(f"Error fetching data for {domain}: {data['error']}")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if 'result' not in data or 'info' not in data['result']:
|
||||||
|
print(f"No result or info found for {domain}.")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
if 'stats' not in data['result']['info']:
|
||||||
|
print(f"No stats information found for {domain}.")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
stats = data['result']['info']['stats']
|
||||||
|
if 'renewalPeriodEnd' not in stats:
|
||||||
|
print(f"No renewalPeriodEnd found in stats for {domain}.")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
return stats['renewalPeriodEnd']
|
||||||
|
|
||||||
|
def get_domains() -> dict:
|
||||||
|
"""
|
||||||
|
Get the dict of domains from the JSON file.
|
||||||
|
"""
|
||||||
|
with open('data/domains.json', 'r') as f:
|
||||||
|
domains = json.load(f)
|
||||||
|
return domains
|
||||||
|
|
||||||
|
def add_notification(domain: str, notification: dict):
|
||||||
|
"""
|
||||||
|
Add a notification for a domain.
|
||||||
|
"""
|
||||||
|
domains = get_domains()
|
||||||
|
if domain not in domains:
|
||||||
|
domains[domain] = []
|
||||||
|
domains[domain].append(notification)
|
||||||
|
|
||||||
|
with open('data/domains.json', 'w') as f:
|
||||||
|
json.dump(domains, f, indent=4)
|
||||||
|
|
||||||
|
def update_notification(domain: str, notification: dict):
|
||||||
|
"""
|
||||||
|
Update a notification for a domain.
|
||||||
|
"""
|
||||||
|
domains = get_domains()
|
||||||
|
if domain in domains:
|
||||||
|
for i, existing_notification in enumerate(domains[domain]):
|
||||||
|
if existing_notification['type'] == notification['type'] and existing_notification['id'] == notification['id']:
|
||||||
|
domains[domain][i] = notification
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
domains[domain].append(notification)
|
||||||
|
else:
|
||||||
|
domains[domain] = [notification]
|
||||||
|
|
||||||
|
with open('data/domains.json', 'w') as f:
|
||||||
|
json.dump(domains, f, indent=4)
|
||||||
|
|
||||||
|
def get_account_notifications(user_name: str) -> list:
|
||||||
|
"""
|
||||||
|
Get all notifications for a specific account.
|
||||||
|
"""
|
||||||
|
domains = get_domains()
|
||||||
|
# For each notification check if user_name
|
||||||
|
notifications = []
|
||||||
|
for domain, domain_notifications in domains.items():
|
||||||
|
for notification in domain_notifications:
|
||||||
|
if notification.get('user_name') == user_name:
|
||||||
|
notifications.append({
|
||||||
|
'domain': domain,
|
||||||
|
'notification': notification
|
||||||
|
})
|
||||||
|
return notifications
|
||||||
|
|
||||||
|
|
||||||
|
def notify_expiries():
|
||||||
|
"""
|
||||||
|
Notify about the expiry of domains.
|
||||||
|
"""
|
||||||
|
domains = get_domains()
|
||||||
|
if not domains:
|
||||||
|
print("No domains found.")
|
||||||
|
return
|
||||||
|
|
||||||
|
current_block = get_current_block()
|
||||||
|
|
||||||
|
|
||||||
|
for domain in domains:
|
||||||
|
expiry_block = get_domain_expiry_block(domain)
|
||||||
|
if expiry_block == -1:
|
||||||
|
continue
|
||||||
|
blocks_remaining = expiry_block - current_block
|
||||||
|
domain_data = {
|
||||||
|
"blocks": blocks_remaining,
|
||||||
|
"time": f"{blocks_remaining // 144} days" # Assuming 144 blocks per day
|
||||||
|
}
|
||||||
|
for notification in domains[domain]:
|
||||||
|
print(blocks_remaining, notification['blocks'])
|
||||||
|
if notification['blocks'] <= blocks_remaining and notification['blocks'] >= (blocks_remaining - 1): # Just in case there are 2 blocks really close together
|
||||||
|
# Check if last block notified is more than current block + 5
|
||||||
|
if notification.get('last_block_notified', -1) < (current_block - 5):
|
||||||
|
notification['last_block_notified'] = current_block
|
||||||
|
# Update the notification
|
||||||
|
update_notification(domain, notification)
|
||||||
|
# Handle the alert
|
||||||
|
alerts.handle_alert(domain, notification, domain_data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Example usage
|
||||||
|
domain = "woodburn"
|
||||||
|
try:
|
||||||
|
expiry = get_domain_expiry_block(domain)
|
||||||
|
print(f"The expiry block for {domain} is: {expiry}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error fetching expiry for {domain}: {e}")
|
||||||
|
|
||||||
|
# Notify about domain expiries
|
||||||
|
notify_expiries()
|
||||||
|
|
||||||
|
print(json.dumps(get_account_notifications('nathan'), indent=4))
|
||||||
7
example.env
Normal file
7
example.env
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
HSD_URL=localhost
|
||||||
|
HSD_NETWORK=main
|
||||||
|
HSD_API_KEY=your_api_key_here
|
||||||
|
SMTP_SERVER=smtp.hostname.com
|
||||||
|
SMTP_PORT=465
|
||||||
|
SMTP_USERNAME=noreply@email.au
|
||||||
|
SMTP_PASSWORD=Secretpassword123
|
||||||
Reference in New Issue
Block a user