hnsalert/domains.py

338 lines
11 KiB
Python
Raw Normal View History

2023-12-11 18:00:58 +11:00
import mysql.connector
from mysql.connector import Error
import dotenv
import os
import json
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import re
2023-12-11 20:53:26 +11:00
import account
2023-12-11 18:00:58 +11:00
dotenv.load_dotenv()
db_config = {
'host': os.getenv('DB_HOST'),
'database': os.getenv('DB_NAME'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD')
}
def addDomain(userID:int, domain:str):
# Get user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT domains FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json domains from first user
user = user[0][0]
userDomains = json.loads(user)
# Check if domain is already in use
for userDomain in userDomains:
if (userDomain['name'] == domain):
return False
# Add domain to user
userDomains.append({
"name": domain,
"status": "pending"
})
# Update user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET domains = %s WHERE id = %s", (json.dumps(userDomains), userID))
conn.commit()
cursor.close()
conn.close()
return True
def verifyDomain(domain:str):
# Remove any trailing slashes
domain = domain.rstrip('/')
# Remove any protocol
domain = domain.replace('https://', '')
domain = domain.replace('http://', '')
domain = domain.lower()
# Verify domain contains only valid characters
if (re.match("^[a-z0-9.-]*$", domain) == None):
return False
return domain
def deleteDomain(userID:int, domain:str):
# Get user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT domains FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json domains from first user
user = user[0][0]
userDomains = json.loads(user)
# Check if domain is already in use
for userDomain in userDomains:
if (userDomain['name'] == domain):
userDomains.remove(userDomain)
break
# Update user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET domains = %s WHERE id = %s", (json.dumps(userDomains), userID))
conn.commit()
cursor.close()
conn.close()
return True
def syncDomains():
# Verify connection to HSD node
try:
r = requests.get('http://x:' + os.getenv('HSD_API_KEY') + '@' + os.getenv('HSD_IP')
+ ':' + os.getenv('HSD_PORT'))
if (r.status_code != 200):
return "HSD node is not responding"
# Check to make sure the node is synced
data = r.json()
if (data['chain']['progress'] < 0.999):
return "HSD node is not synced"
except:
return False
# Get all users
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT id, domains FROM users")
users = cursor.fetchall()
cursor.close()
conn.close()
# Loop through users
for user in users:
userID = user[0]
userDomains = json.loads(user[1])
# Loop through user domains
for userDomain in userDomains:
2023-12-11 20:53:26 +11:00
customNotifications = False
if 'notifications' in userDomain:
customNotifications = userDomain['notifications']
2023-12-11 18:00:58 +11:00
# Check if domain is pending
if (userDomain['status'] == 'pending'):
try:
r = requests.post('http://x:' + os.getenv('HSD_API_KEY') + '@' + os.getenv('HSD_IP')
+ ':' + os.getenv('HSD_WALLET_PORT'), json={'method': 'importname', 'params': [userDomain['name']]})
if (r.status_code == 200):
# Update domain status
userDomain['status'] = 'added'
# Set some defaults
userDomain['transfering'] = 0
userDomain['next'] = 'none'
userDomain['when'] = 0
2023-12-11 20:53:26 +11:00
userDomain['records'] = ''
2023-12-11 18:00:58 +11:00
# Update user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET domains = %s WHERE id = %s", (json.dumps(userDomains), userID))
conn.commit()
cursor.close()
conn.close()
except:
pass
# Get domain info
r = requests.post('http://x:' + os.getenv('HSD_API_KEY') + '@' + os.getenv('HSD_IP')
+ ':' + os.getenv('HSD_PORT'), json={'method': 'getnameinfo', 'params': [userDomain['name']]})
if (r.status_code == 200):
data = r.json()
2023-12-11 20:53:26 +11:00
print(json.dumps(data, indent=4))
2023-12-11 18:00:58 +11:00
# Check if domain is registered
info = data['result']['info']
if (userDomain['transfering'] != info['transfer']):
# Update domain status
2023-12-11 20:53:26 +11:00
if (notify('transfer', customNotifications, userID)):
alert('transfer', userDomain['name'], userID)
2023-12-11 18:00:58 +11:00
userDomain['transfering'] = info['transfer']
2023-12-11 20:53:26 +11:00
if (userDomain['records'] != info['data'] and userDomain['records'] != ''):
# Update domain status
if (notify('dns', customNotifications, userID)):
alert('dns', userDomain['name'], userID)
userDomain['records'] = info['data']
2023-12-11 18:00:58 +11:00
if 'stats' in info:
if 'blocksUntilExpire' in info['stats']:
# Update domain status
userDomain['next'] = 'expires'
previous = userDomain['when']
userDomain['when'] = info['stats']['blocksUntilExpire']
2023-12-11 20:53:26 +11:00
if (crossTimeAlert(previous, userDomain['when']) or True):
if (notify('expire', customNotifications, userID,
crossTimeAlert(previous, userDomain['when']))):
alert('expire', userDomain['name'], userID, crossTimeAlert(previous, userDomain['when']))
2023-12-11 18:00:58 +11:00
elif 'blocksUntilBidding' in info['stats']:
# Update domain status
userDomain['next'] = 'opens for bidding'
userDomain['when'] = info['stats']['blocksUntilBidding']
# Update user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET domains = %s WHERE id = %s", (json.dumps(userDomains), userID))
conn.commit()
cursor.close()
conn.close()
else:
return "Failed to get info about domain: " + userDomain['name'] + "<br>" + str(r.text)
return "Finished syncing domains"
2023-12-11 20:53:26 +11:00
def alert(event,domain,userID, time=False):
notification_name = {
'transfer': 'transfer_notifications',
'dns': 'edit_notifications'
}
# Get user
user = account.getUserFromID(userID)
# Check if domain has custom notifications
customNotifications = False
for domainInfo in user['domains']:
if (domainInfo['name'] == domain):
if 'notifications' in domainInfo:
customNotifications = domainInfo['notifications']
break
if (customNotifications):
# Send custom notification
if (event != 'expire'):
send(customNotifications[notification_name[event]], domain, event,userID)
elif (time == 'month'):
send(customNotifications['expiry_month'], domain, event,userID)
elif (time == 'week'):
send(customNotifications['expiry_week'], domain, event,userID)
else:
if (event != 'expire'):
send(user['notifications'][notification_name[event]], domain, event,userID)
elif (time == 'month'):
send(user['notifications']['expiry_month'], domain, event,userID)
elif (time == 'week'):
send(user['notifications']['expiry_week'], domain, event,userID)
def send(providers,domain:str,event,userID):
user = account.getUserFromID(userID)
title = {
'transfer': 'Transfer Alert for {domain}',
'dns': '{domain} has had a DNS update',
'expire': '{domain} will expire soon'
}
content = {
'transfer': 'The domain {domain} has started a transfer to a new wallet',
'dns': 'The domain {domain} has had a DNS update',
'expire': '{domain} will expire in {time}'
}
2023-12-11 18:00:58 +11:00
2023-12-11 20:53:26 +11:00
title = title[event].replace('{domain}',domain.capitalize()+'/')
content = content[event].replace('{domain}',domain.capitalize()+'/')
if (event == 'expire'):
domainInfo = getCachedDomainInfo(domain)
content = content.replace('{time}',blocksToTime(domainInfo['when']))
if (providers['email']):
account.sendEmail(user['email'],title,content)
if (providers['discord']):
if ('discord' in user['notifications']):
account.sendDiscordWebhook(user['notifications']['discord'],title,content)
def notify(event, customNotifications, userID, time=False):
# TODO this will check if the user has notifications enabled for the event
# This should make the sync a bit faster but it's not a huge deal
if (event == 'transfer'):
return True
if (event == 'dns'):
return True
if (event == 'expire'):
return True
return True
def crossTimeAlert(was, now):
2023-12-11 18:00:58 +11:00
# Check for each of these times to see if we should alert
month = 4320
week = 1008
# If the time crossed the month mark
if (was > month and now <= month):
2023-12-11 20:53:26 +11:00
return "month"
2023-12-11 18:00:58 +11:00
# If the time crossed the week mark
if (was > week and now <= week):
2023-12-11 20:53:26 +11:00
return "week"
2023-12-11 18:00:58 +11:00
return False
def getCachedDomainInfo(domain):
# Get domain info from user domains
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT domains FROM users")
users = cursor.fetchall()
cursor.close()
conn.close()
# Loop through users
for user in users:
userDomains = json.loads(user[0])
# Loop through user domains
for userDomain in userDomains:
if (userDomain['name'] == domain):
return userDomain
return False
def blocksToTime(blocks):
# Get minutes
minutes = blocks * 10
years, minutes = divmod(minutes, 525600)
days, minutes = divmod(minutes, 1440)
hours, minutes = divmod(minutes, 60)
# Build the string
time_string = ""
if years:
time_string += f"{years} {'year' if years == 1 else 'years'}, "
if days:
time_string += f"{days} {'day' if days == 1 else 'days'}, "
if hours and not years:
time_string += f"{hours} {'hour' if hours == 1 else 'hours'}, "
if minutes and not years and not days:
time_string += f"{minutes} {'min' if minutes == 1 else 'mins'}"
return time_string.rstrip(', ')