hns-email/server.py
Nathan Woodburn 3b118e6923
All checks were successful
Build Docker / BuildImage (push) Successful in 37s
fix: Allow deleting emails sent to e.hns.au emails
2024-10-09 16:04:31 +11:00

490 lines
13 KiB
Python

from functools import cache
import json
from flask import (
Flask,
make_response,
redirect,
request,
jsonify,
render_template,
send_from_directory,
send_file,
)
import os
import json
import requests
from datetime import datetime
import dotenv
from functools import cache
from markdownify import markdownify as md
# Pull environment variables from a local .env file (if any) before they are
# read further down via os.getenv().
dotenv.load_dotenv()

app = Flask(__name__)

# Notification settings are stored under /data when that directory exists,
# otherwise next to the working directory.
notification_file = (
    "/data/notifications.json"
    if os.path.exists("/data")
    else "./notifications.json"
)
def find(name, path):
    """Return the path of the first file called *name* found beneath *path*.

    The tree is walked with ``os.walk``; the first directory whose file list
    contains *name* wins. ``None`` is returned when no such file exists.
    """
    matches = (
        os.path.join(folder, name)
        for folder, _subdirs, filenames in os.walk(path)
        if name in filenames
    )
    return next(matches, None)
# Assets routes
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a file from ``templates/assets``.

    Resolution order:

    1. Paths ending in ``.json`` are sent from ``templates/assets`` with an
       explicit ``application/json`` mimetype.
    2. Any other path that exists under ``templates/assets`` is sent as is.
    3. Image files (png/jpg/jpeg/svg) are looked up by bare filename in
       ``templates/assets/img`` and then ``templates/assets/img/favicon``.

    Anything else renders the 404 page.
    """
    assets_root = "templates/assets"

    if path.endswith(".json"):
        return send_from_directory(
            assets_root, path, mimetype="application/json"
        )

    if os.path.isfile(assets_root + "/" + path):
        return send_from_directory(assets_root, path)

    # Fall back to matching on the last path segment only.
    basename: str = path.split("/")[-1]
    if basename.endswith((".png", ".jpg", ".jpeg", ".svg")):
        for folder in ("templates/assets/img", "templates/assets/img/favicon"):
            if os.path.isfile(folder + "/" + basename):
                return send_from_directory(folder, basename)

    return render_template("404.html"), 404
# region Special routes
@app.route("/favicon.png")
def faviconPNG():
    """Serve ``favicon.png`` from the image assets directory."""
    icon_dir, icon_name = "templates/assets/img", "favicon.png"
    return send_from_directory(icon_dir, icon_name)
@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy ``/.well-known/<path>`` to https://nathan.woodburn.au/.well-known/.

    The upstream body, status code and Content-Type are passed through.
    A 502 is returned when the upstream host cannot be reached.
    """
    try:
        # Bounded wait so an unresponsive upstream cannot hang the worker.
        upstream = requests.get(
            f"https://nathan.woodburn.au/.well-known/{path}", timeout=10
        )
    except requests.RequestException:
        return make_response("Upstream unavailable", 502)

    # Not every response carries a Content-Type header; fall back to a
    # generic binary type instead of raising KeyError.
    content_type = upstream.headers.get(
        "Content-Type", "application/octet-stream"
    )
    # Forward the real status so upstream 404s are not reported as 200.
    return make_response(
        upstream.content, upstream.status_code, {"Content-Type": content_type}
    )
# endregion
# region Main routes
@app.route("/")
def index():
    """Landing page: go to the inbox when a token cookie exists, else show login.

    The login template receives the external auth URL with a ``return``
    parameter pointing at this application's ``/login`` route.
    """
    if request.cookies.get("token"):
        return redirect("/inbox")

    # Build the callback from the application root rather than request.url:
    # request.url includes any query string, which would produce a broken
    # callback such as ".../?x=1login".
    callback = f"{request.url_root}login"
    return render_template(
        "login.html", login=f"https://login.hns.au/auth?return={callback}"
    )
@app.route("/login")
def login():
    """Store the ``token`` query argument in a cookie and go to ``/refresh``.

    Requests without a token are sent back to the landing page instead of
    attempting to set a cookie with no value.
    """
    token = request.args.get("token")
    if not token:
        # request.args.get() yields None when the argument is absent, and
        # None is not a usable cookie value.
        return redirect("/")

    response = make_response(redirect("/refresh"))
    response.set_cookie("token", token)
    return response
@app.route("/logout")
def logout():
    """Blank out the ``token`` cookie and return to the landing page."""
    resp = make_response(redirect("/"))
    # An empty value is falsy, so the other routes treat it as "not logged in".
    resp.set_cookie("token", "")
    return resp
@app.route("/inbox")
def inbox():
    """Render the inbox for the logged-in user.

    Redirects to ``/`` without a token cookie and to ``/logout`` when
    ``get_user`` does not return a user for the token.
    """
    if not (session_token := request.cookies.get("token")):
        return redirect("/")

    if not (account := get_user(session_token)):
        return redirect("/logout")

    mailbox = get_email(account["hemail"])
    return render_template("inbox.html", emails=mailbox)
@app.route("/delete/<path:path>")
def delete(path):
    """Delete the conversation identified by *path* for the logged-in user.

    Redirects to ``/`` without a token cookie and to ``/logout`` when
    ``get_user`` does not return a user; otherwise hands off to
    ``delete_email`` and returns to the inbox.
    """
    if not (session_token := request.cookies.get("token")):
        return redirect("/")

    if not (account := get_user(session_token)):
        return redirect("/logout")

    delete_email(path, account["hemail"])
    return redirect("/inbox")
@app.route("/notifications")
def notifications():
    """Show the notification settings page for the logged-in user.

    Redirects to ``/`` without a token cookie and to ``/logout`` when
    ``get_user`` does not return a user. The stored webhook and alert
    address are passed to the template only when settings are available.
    """
    if not (session_token := request.cookies.get("token")):
        return redirect("/")

    if not (account := get_user(session_token)):
        return redirect("/logout")

    address = account["hemail"]
    context = {"email": address}

    settings = get_notification_settings(address)
    if settings:
        context.update(
            webhook=settings["webhook"], email_alert=settings["email"]
        )
    return render_template("notifications.html", **context)
@app.route("/notifications", methods=["POST"])
def notification_set():
    """Persist the submitted notification settings for the logged-in user.

    Redirects to ``/`` without a token cookie and to ``/logout`` when
    ``get_user`` does not return a user. The ``webhook`` and ``email`` form
    fields are saved, then the settings page is shown again.
    """
    if not (session_token := request.cookies.get("token")):
        return redirect("/")

    if not (account := get_user(session_token)):
        return redirect("/logout")

    save_notification_settings(
        account["hemail"],
        request.form.get("webhook"),
        request.form.get("email"),
    )
    return redirect("/notifications")
def save_notification_settings(email, webhook, email_alert):
    """Store the notification settings for *email* in ``notification_file``.

    Args:
        email: Address the settings belong to (used as the JSON key).
        webhook: Webhook URL to notify, or an empty/missing value for none.
        email_alert: Address to alert by email, or an empty/missing value.

    Settings of other addresses already in the file are preserved. Missing
    values (``None``) are stored as empty strings so that readers comparing
    against ``""`` treat them as "not configured".
    """
    # Read-modify-write in a single pass: try to load first instead of
    # checking for existence, so the file is written exactly once.
    try:
        with open(notification_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        data = {}

    data[email] = {
        "webhook": webhook or "",
        "email": email_alert or "",
    }

    with open(notification_file, "w") as f:
        json.dump(data, f, indent=4)
def get_notification_settings(email):
    """Look up the stored notification settings for *email*.

    Returns ``False`` when the settings file does not exist, the stored
    entry when there is one, and a dict with empty ``webhook`` and ``email``
    values when the file exists but has no entry for *email*.
    """
    if os.path.exists(notification_file):
        with open(notification_file, "r") as handle:
            stored = json.load(handle)
        return stored[email] if email in stored else {"webhook": "", "email": ""}
    return False
def delete_email(conversation_id, email):
    """Delete a conversation if one of its messages was addressed to *email*.

    Args:
        conversation_id: Identifier handed to ``get_conversation``.
        email: The user's address. The same local part at ``e.hns.au`` is
            accepted as an alternative recipient.

    The first conversation containing a message addressed to either address
    is deleted through the ticket API, the cache is cleared, and the
    function returns. Nothing is deleted when the lookup yields no usable
    result or no message is addressed to the user.
    """
    conversation = get_conversation(conversation_id)
    # Anything other than a non-empty list (including an error value from a
    # failed lookup) means there is nothing that can safely be iterated.
    if not conversation or not isinstance(conversation, list):
        print("No conversation found")
        return

    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }
    altemail: str = f"{email.split('@')[0]}@e.hns.au"

    for thread in conversation:
        for message in thread["messages"]:
            recipients = message["to"] or ()
            if email in recipients or altemail in recipients:
                # Bounded wait so a stalled API call cannot hang the request.
                requests.delete(
                    f"https://ticket.woodburn.au/api/conversations/{thread['id']}",
                    headers=headers,
                    timeout=10,
                )
                clear_cache()
                return
@app.route("/api/email", methods=["POST"])
def newemail():
    """Receive a ticket-system event and trigger notifications.

    The ``x-freescout-event`` header selects the event type; notifications
    are sent for new conversations and for customer replies. The received
    JSON body is echoed back in the response.
    """
    payload = request.get_json()
    event = request.headers.get("x-freescout-event")

    if event in ("convo.created", "convo.customer.reply.created"):
        send_notification(payload)

    return jsonify(payload)
# Removed for now
# @app.route("/api/email/<path:path>")
# def getemail(path):
# email = f'{path}@login.hns.au'
# return jsonify(get_email(email))
def send_notification(data):
    """Notify every address in the conversation's ``cc`` list.

    For each address with stored notification settings, an email alert
    and/or a webhook call is sent, depending on which are configured.

    Args:
        data: Conversation payload; its ``cc`` entry lists the addresses.
    """
    # A payload without "cc" (or with a null value) has nobody to notify.
    for email in data.get("cc") or []:
        settings = get_notification_settings(email)
        if not settings:
            continue

        # Truthiness rather than `!= ""`: a setting stored as null/None must
        # count as "not configured" too.
        alert_address = settings.get("email")
        if alert_address:
            send_email(alert_address, data)

        webhook_url = settings.get("webhook")
        if webhook_url:
            send_webhook(webhook_url, data)
def send_email(email, data):
    """Send an alert about a new message to *email* via the ticket API.

    A closed conversation of type ``email`` is created with *email* as the
    customer. Its text names the sender and includes the body of the first
    embedded thread of *data*. A response status other than 201 is reported
    on stdout together with the response text.
    """
    print(f"Sending email to {email}")

    author = data["createdBy"]
    sender = f"{author['firstName']} {author['lastName']} ({author['email']})"
    message = data["_embedded"]["threads"][0]["body"]

    thread = {
        "text": f"You have a new email from {sender}\n\n{message}",
        "type": "message",
        "user": 1,
    }
    payload = {
        "type": "email",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "subject": f"New email from {sender}",
        "customer": {"email": email},
        "threads": [thread],
        "imported": False,
        "status": "closed",
    }

    response = requests.post(
        "https://ticket.woodburn.au/api/conversations",
        json=payload,
        headers={"X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY")},
    )
    if response.status_code == 201:
        return
    print("Failed to send email")
    print(response.text)
def send_webhook(webhook, data):
    """POST a "new email" notification to the user's webhook URL.

    The body of the first embedded thread is converted with ``md`` and cut
    to at most 4000 characters; a footer is added only when text was
    actually cut off.

    Args:
        webhook: Destination URL taken from the user's settings.
        data: Conversation payload providing sender, subject and body.
    """
    # NOTE(review): `webhook` is user-supplied, so this server will POST to
    # arbitrary URLs — confirm whether destinations should be restricted.
    print(f"Sending webhook to {webhook}")

    max_length = 4000
    author = data["createdBy"]
    sender = f"{author['firstName']} {author['lastName']} ({author['email']})"

    message = md(data["_embedded"]["threads"][0]["body"])
    # Strictly greater than the limit: a message of exactly `max_length`
    # characters is sent whole and must not be labelled as trimmed.
    trimmed = len(message) > max_length
    message = message[:max_length]
    subject = data["subject"]

    body = {
        "content": "You have a new email",
        "embeds": [
            {
                "title": subject,
                "description": message,
                "url": "https://email.hns.au",
                "color": 5814783,
                "author": {
                    "name": sender
                }
            }
        ],
        "username": "HNS Email",
        "avatar_url": "https://hns.au/favicon.png",
        "attachments": []
    }
    if trimmed:
        body["embeds"][0]["footer"] = {
            "text": "Email has been trimmed"
        }

    # Bounded wait so a slow webhook endpoint cannot stall event handling.
    requests.post(webhook, json=body, timeout=10)
@app.route("/refresh")
def refresh():
    """Clear cached mail data for a logged-in user and return to the inbox.

    Redirects to ``/`` without a token cookie and to ``/logout`` when
    ``get_user`` does not return a user for the token.
    """
    if not (session_token := request.cookies.get("token")):
        return redirect("/")

    if not get_user(session_token):
        return redirect("/logout")

    clear_cache()
    return redirect("/inbox")
def clear_cache():
    """Drop all memoised mail data so the next lookups hit the ticket API.

    Both the inbox listing (``get_email``) and single-conversation lookups
    (``get_conversation``) are memoised; clearing only the former would
    leave deleted conversations and earlier failed lookups in memory.
    """
    get_email.cache_clear()
    get_conversation.cache_clear()
class _UserLookupFailed(Exception):
    """Internal signal that the auth service did not return a user."""


@cache
def _lookup_user(token):
    """Fetch the user for *token*; raise ``_UserLookupFailed`` on non-200.

    Raising (instead of returning ``False``) keeps failures out of the
    cache, because ``functools.cache`` only stores returned values.
    """
    # `params=` lets requests URL-encode the token; the timeout bounds the
    # wait on an unresponsive auth service.
    req = requests.get(
        "https://login.hns.au/auth/user", params={"token": token}, timeout=10
    )
    if req.status_code != 200:
        raise _UserLookupFailed(token)
    return req.json()


def get_user(token):
    """Return the user info for *token*, or ``False`` if the lookup fails.

    Successful lookups are memoised per token. Failed lookups are not, so a
    temporary auth-service error does not permanently reject a token and
    invalid tokens do not accumulate in the cache.
    """
    try:
        return _lookup_user(token)
    except _UserLookupFailed:
        return False


# Keep the cache-management helpers reachable under the public name.
get_user.cache_clear = _lookup_user.cache_clear
get_user.cache_info = _lookup_user.cache_info
class _InboxFetchFailed(Exception):
    """Internal signal that the conversation listing could not be fetched."""


@cache
def _load_email(email: str):
    """Fetch and filter the conversations for *email* (memoised on success).

    Raises ``_InboxFetchFailed`` on a non-200 response so that failures are
    never stored by ``functools.cache``.
    """
    altemail: str = f"{email.split('@')[0]}@e.hns.au"
    params = {
        "embed": "threads",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "status": "active",
        "type": "email",
        "sortField": "updatedAt",
        "sortOrder": "desc",
        "page": "1",
        "pageSize": "100"
    }
    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }
    # Bounded wait so a stalled API call cannot hang the request.
    response = requests.get(
        "https://ticket.woodburn.au/api/conversations",
        json=params, headers=headers, timeout=30)
    if response.status_code != 200:
        raise _InboxFetchFailed(email)

    wanted = (email, altemail)
    # any() adds each conversation once, even if both addresses are in cc.
    threads = [
        parse_email(conversation)
        for conversation in response.json()["_embedded"]["conversations"]
        if any(address in wanted for address in conversation["cc"] or ())
    ]
    return {"email": email, "emailalt": altemail, "conversations": threads}


def get_email(email: str):
    """Return the inbox for *email* as a dict.

    The result has the keys ``email``, ``emailalt`` (same local part at
    ``e.hns.au``) and ``conversations`` (parsed conversations whose ``cc``
    contains either address). When the ticket API cannot be queried, the
    same shape is returned with an empty conversation list plus an
    ``error`` message; that failure result is not cached.
    """
    try:
        return _load_email(email)
    except (_InboxFetchFailed, requests.RequestException):
        return {
            "email": email,
            "emailalt": f"{email.split('@')[0]}@e.hns.au",
            "conversations": [],
            "error": "Could not get conversations",
        }


# Keep the cache-management helpers reachable under the public name
# (clear_cache() calls get_email.cache_clear()).
get_email.cache_clear = _load_email.cache_clear
get_email.cache_info = _load_email.cache_info
class _ConversationFetchFailed(Exception):
    """Internal signal that the conversation lookup did not succeed."""


@cache
def _load_conversation(id):
    """Fetch the conversations matching number *id* (memoised on success).

    Raises ``_ConversationFetchFailed`` on a non-200 response so that
    failures are never stored by ``functools.cache``.
    """
    params = {
        "number": id,
        "embed": "threads",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "status": "active",
        "type": "email",
        "sortField": "updatedAt",
        "sortOrder": "desc",
        "page": "1",
        "pageSize": "100",
    }
    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }
    # Bounded wait so a stalled API call cannot hang the request.
    response = requests.get(
        "https://ticket.woodburn.au/api/conversations",
        json=params, headers=headers, timeout=30)
    if response.status_code != 200:
        raise _ConversationFetchFailed(id)
    return [
        parse_email(conversation)
        for conversation in response.json()["_embedded"]["conversations"]
    ]


def get_conversation(id):
    """Return the parsed conversations whose number is *id*, as a list.

    The return type is always a list: an empty one when the ticket API
    cannot be queried or answers with a non-200 status. Failed lookups are
    not cached, so a later call retries.
    """
    try:
        return _load_conversation(id)
    except (_ConversationFetchFailed, requests.RequestException):
        return []


# Keep the cache-management helpers reachable under the public name.
get_conversation.cache_clear = _load_conversation.cache_clear
get_conversation.cache_info = _load_conversation.cache_info
@app.route("/<path:path>")
def catch_all(path: str):
    """Fallback route: render a matching template or send a matching file.

    Template candidates are tried in order: the path itself, the path with
    ``.html`` appended, and the path stripped of leading/trailing slashes
    with ``.html`` appended. For a path without any ``/``, a file of that
    name anywhere under ``templates`` is sent instead. Otherwise the 404
    page is rendered.
    """
    candidates = (path, path + ".html", path.strip("/") + ".html")
    for template_name in candidates:
        if os.path.isfile("templates/" + template_name):
            return render_template(template_name)

    # Only bare filenames are searched for across the templates tree.
    if "/" not in path:
        located = find(path, "templates")
        if located:
            return send_file(located)

    return render_template("404.html"), 404
# endregion
def parse_email(conversation):
    """Flatten a conversation payload into ``{"messages": [...], "id": ...}``.

    Each embedded thread becomes one message dict with the sender's address
    and full name, the body, creation date, recipients, and the
    conversation's subject.
    """
    def as_message(thread):
        author = thread["createdBy"]
        return {
            "from": author["email"],
            "name": f'{author["firstName"]} {author["lastName"]}',
            "body": thread["body"],
            "date": thread["createdAt"],
            "to": thread["to"],
            "subject": conversation["subject"],
        }

    return {
        "messages": [
            as_message(thread)
            for thread in conversation["_embedded"]["threads"]
        ],
        "id": conversation["id"],
    }
# region Error Catching
# 404 catch all
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page for any not-found error."""
    page = render_template("404.html")
    return page, 404
# endregion
if __name__ == "__main__":
    # Direct execution starts Flask's built-in server on all interfaces.
    # NOTE(review): debug mode on a publicly bound host is unsafe outside
    # local development — confirm this entry point is dev-only.
    app.run(host="0.0.0.0", port=5000, debug=True)