hnsalert/account.py

348 lines
10 KiB
Python
Raw Normal View History

2023-12-11 15:28:20 +11:00
import mysql.connector
from mysql.connector import Error
import dotenv
import os
import json
import time
from email_validator import validate_email, EmailNotValidError
import bcrypt
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
dotenv.load_dotenv()
db_config = {
'host': os.getenv('DB_HOST'),
'database': os.getenv('DB_NAME'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD')
}
def verifyUser(userToken:str):
# Get userID from userToken
if (userToken.count('$') != 1):
return False
userToken = userToken.split('$')
userID = userToken[0]
userToken = userToken[1]
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT user_token FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json token from first user
user = user[0][0]
userTokens = json.loads(user)
for token in userTokens:
if (token['token'] == userToken):
if (token['expires'] > time.time()):
return True
return False
def createUser(email:str, password:str):
# Check if email is valid
try:
valid = validate_email(email)
email = valid.email
except EmailNotValidError as e:
# email is not valid, exception message is human-readable
return False
# Check if email is already in use
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT email FROM users WHERE email = %s", (email,))
user = cursor.fetchall()
cursor.close()
conn.close()
if (len(user) != 0):
return False
passwordHash = hashPassword(password)
# Create user
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("INSERT INTO users (email, password, admin, notifications, user_token, domains) VALUES (%s, %s, %s, %s, %s, %s)", (email, passwordHash, False, json.dumps({}), json.dumps([]), json.dumps([])))
conn.commit()
cursor.close()
conn.close()
return True
def login(email:str, password:str):
# Check if email is valid
try:
valid = validate_email(email)
email = valid.email
except EmailNotValidError as e:
# email is not valid, exception message is human-readable
return False
# Check if email is already in use
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT id, password FROM users WHERE email = %s", (email,))
user = cursor.fetchall()
cursor.close()
conn.close()
if (len(user) == 0):
return False
# Check if password is correct
userID = user[0][0]
if (checkPassword(password, user[0][1])):
# Create new user token
userToken = genToken()
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT user_token FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json token from first user
user = user[0][0]
print(user)
userTokens = json.loads(user)
userTokens.append({'token': userToken, 'expires': time.time() + 86400})
# Update user token
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET user_token = %s WHERE id = %s", (json.dumps(userTokens), userID,))
conn.commit()
cursor.close()
conn.close()
return str(userID) + '$' + userToken
else:
return False
def getUser(userToken:str):
# Get userID from userToken
if (userToken.count('$') != 1):
return False
userToken = userToken.split('$')
userID = userToken[0]
userToken = userToken[1]
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT id, email, admin, notifications, domains, created_at FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json token from first user
user = user[0]
return {'id': user[0], 'email': user[1], 'admin': user[2], 'notifications': json.loads(user[3]), 'domains': json.loads(user[4]), 'created_at': user[5]}
2023-12-11 20:53:26 +11:00
def getUserFromID(userID:int):
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT id, email, admin, notifications, domains, created_at FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json token from first user
user = user[0]
return {'id': user[0], 'email': user[1], 'admin': user[2], 'notifications': json.loads(user[3]), 'domains': json.loads(user[4]), 'created_at': user[5]}
2023-12-11 15:28:20 +11:00
def logoutUser(userToken:str):
pass
def hashPassword(password:str):
# Use bcrypt to hash password
password = password.encode('utf-8')
salt = bcrypt.gensalt()
passwordHash = bcrypt.hashpw(password, salt)
passwordHash = passwordHash.decode('utf-8')
return passwordHash
def checkPassword(password:str, passwordHash:str):
# Use bcrypt to check password
password = password.encode('utf-8')
passwordHash = passwordHash.encode('utf-8')
if (bcrypt.checkpw(password, passwordHash)):
return True
else:
return False
def genToken():
# Generate a random token
token = os.urandom(32)
token = token.hex()
return token
def updateNotifications(token, notifications):
# Get userID from userToken
if (token.count('$') != 1):
return False
token = token.split('$')
userID = token[0]
token = token[1]
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET notifications = %s WHERE id = %s", (json.dumps(notifications), userID,))
conn.commit()
cursor.close()
conn.close()
return True
2023-12-11 18:00:58 +11:00
def updateDomainNotifications(token, domain, notifications):
# Get userID from userToken
if (token.count('$') != 1):
return False
token = token.split('$')
userID = token[0]
token = token[1]
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT domains FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
print(domain)
print(notifications)
# Read json token from first user
user = user[0][0]
user = json.loads(user)
for userDomain in user:
if (userDomain['name'] == domain):
userDomain['notifications'] = notifications
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET domains = %s WHERE id = %s", (json.dumps(user), userID,))
conn.commit()
cursor.close()
conn.close()
return True
2023-12-11 15:28:20 +11:00
def updateNotificationProvider(token, provider,account):
# Get userID from userToken
if (token.count('$') != 1):
return False
token = token.split('$')
userID = token[0]
token = token[1]
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT notifications FROM users WHERE id = %s", (userID,))
user = cursor.fetchall()
cursor.close()
conn.close()
# Read json token from first user
user = user[0][0]
user = json.loads(user)
user[provider] = account
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("UPDATE users SET notifications = %s WHERE id = %s", (json.dumps(user), userID,))
conn.commit()
cursor.close()
conn.close()
return True
def sendNotification(service, service_account, title, content):
if (service == 'email'):
sendEmail(service_account, title, content)
elif (service == 'discord'):
sendDiscordWebhook(service_account, title, content)
elif (service == 'telegram'):
pass
else:
return False
return True
def sendEmail(email, title, content):
sender_email = os.getenv('EMAIL_FROM')
sender_password = os.getenv('EMAIL_PASSWORD')
recipient_email = email
# Create the MIME object
message = MIMEMultipart()
message['From'] = sender_email
message['To'] = recipient_email
message['Subject'] = title
# Add the email body
body = content
message.attach(MIMEText(body, 'plain'))
# Connect to the SMTP server
with smtplib.SMTP(os.getenv('EMAIL_HOST'), os.getenv('EMAIL_PORT')) as server:
try:
if (os.getenv('EMAIL_USE_TLS') == 'True'):
server.starttls()
server.login(sender_email, sender_password)
# Send the email
server.sendmail(sender_email, recipient_email, message.as_string())
print("Email sent successfully.")
except Exception as e:
print(e)
print("Email failed to send.")
def sendDiscordWebhook(webhook, title, content):
# Create the webhook object
headers = {
'Content-Type': 'application/json'
}
payload = {
"content": None,
"embeds": [
{
"title": title,
"description": content,
"color": 5907868,
"footer": {
"text": "Powered by Woodburn",
"icon_url": "https://woodburn.au/favicon.png"
},
# "timestamp": "2023-12-11T04:17:00.000Z"
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.localtime())
}
],
"username": "HNS Alert",
"avatar_url": "https://woodburn.au/favicon.png",
"attachments": []
}
response = requests.post(webhook, data=json.dumps(payload), headers=headers)
if response.status_code == 204:
print("Message sent successfully to Discord webhook.")
else:
print(f"Failed to send message to Discord webhook. Status code: {response.status_code}")