"""Flask front end for the HNS Login email inbox.

Users authenticate through login.hns.au, which hands back a token that is
stored in a cookie. Conversations are read from (and deleted through) the
ticket API at ticket.woodburn.au, and per-address notification settings are
kept in a small JSON file.
"""

import json
import os
from datetime import datetime
from functools import cache, lru_cache

import dotenv
import requests
from flask import (
    Flask,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
)
from markdownify import markdownify as md

dotenv.load_dotenv()

app = Flask(__name__)

# Prefer the /data directory when it exists; otherwise fall back to the
# current working directory.
notification_file = "/data/notifications.json"
if not os.path.exists("/data"):
    notification_file = "./notifications.json"

CONVERSATIONS_URL = "https://ticket.woodburn.au/api/conversations"
# Seconds to wait on any outbound HTTP call so a stalled upstream cannot hang
# a request handler forever.
REQUEST_TIMEOUT = 10
IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".svg")
# Webhook events that should trigger a notification.
NOTIFY_EVENTS = ("convo.created", "convo.customer.reply.created")
# Maximum number of characters of the message body forwarded to a webhook.
WEBHOOK_BODY_LIMIT = 4000


def find(name, path):
    """Return the path of the first file called *name* under *path*, or None."""
    for root, _dirs, files in os.walk(path):
        if name in files:
            return os.path.join(root, name)
    return None


def _api_headers():
    """Build the authentication header for the ticket API."""
    return {"X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY")}


def _require_login():
    """Resolve the logged-in user's address from the token cookie.

    Returns:
        ``(email, None)`` when the cookie holds a token the login service
        accepts, otherwise ``(None, response)`` where *response* is the
        redirect the route should return.
    """
    token = request.cookies.get("token")
    if not token:
        return None, redirect("/")
    user = get_user(token)
    if not user:
        return None, redirect("/logout")
    return user["hemail"], None


def _format_sender(data):
    """Format the creator of a conversation as ``First Last (email)``."""
    creator = data["createdBy"]
    return f"{creator['firstName']} {creator['lastName']} ({creator['email']})"


# Assets routes
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a file from templates/assets, falling back to the image folders."""
    if path.endswith(".json"):
        return send_from_directory(
            "templates/assets", path, mimetype="application/json"
        )

    if os.path.isfile("templates/assets/" + path):
        return send_from_directory("templates/assets", path)

    # Try looking in one of the image directories by bare file name.
    filename = path.split("/")[-1]
    if filename.endswith(IMAGE_SUFFIXES):
        for directory in ("templates/assets/img", "templates/assets/img/favicon"):
            if os.path.isfile(directory + "/" + filename):
                return send_from_directory(directory, filename)

    return render_template("404.html"), 404


# region Special routes
@app.route("/favicon.png")
def faviconPNG():
    """Serve the site favicon."""
    return send_from_directory("templates/assets/img", "favicon.png")


@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy /.well-known/ requests to nathan.woodburn.au."""
    try:
        req = requests.get(
            f"https://nathan.woodburn.au/.well-known/{path}",
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException:
        return make_response("Upstream unavailable", 502)

    # Forward the upstream status so a missing file is not reported as 200.
    content_type = req.headers.get("Content-Type", "application/octet-stream")
    return make_response(
        req.content, req.status_code, {"Content-Type": content_type}
    )


# endregion


# region Main routes
@app.route("/")
def index():
    """Show the login page, or go straight to the inbox when a token is set."""
    if request.cookies.get("token"):
        return redirect("/inbox")

    # url_root carries no query string, so the return URL stays well formed.
    return render_template(
        "login.html",
        login=f"https://login.hns.au/auth?return={request.url_root}login",
    )


@app.route("/login")
def login():
    """Store the token passed in the query string and open the inbox."""
    token = request.args.get("token")
    if not token:
        # Nothing to store; send the visitor back to the login page.
        return redirect("/")

    response = make_response(redirect("/inbox"))
    response.set_cookie("token", token)
    return response


@app.route("/logout")
def logout():
    """Remove the token cookie and return to the login page."""
    response = make_response(redirect("/"))
    response.delete_cookie("token")
    return response


@app.route("/inbox")
def inbox():
    """Render the inbox of the logged-in user."""
    email, response = _require_login()
    if response is not None:
        return response
    return render_template("inbox.html", emails=get_email(email))


@app.route("/delete/<path>")
def delete(path):
    """Delete conversation number *path* if it belongs to the logged-in user."""
    email, response = _require_login()
    if response is not None:
        return response
    delete_email(path, email)
    return redirect("/inbox")


@app.route("/notifications")
def notifications():
    """Render the notification settings page for the logged-in user."""
    email, response = _require_login()
    if response is not None:
        return response

    notification = get_notification_settings(email)
    if not notification:
        return render_template("notifications.html", email=email)

    return render_template(
        "notifications.html",
        email=email,
        webhook=notification["webhook"],
        email_alert=notification["email"],
    )


@app.route("/notifications", methods=["POST"])
def notification_set():
    """Save the submitted notification settings for the logged-in user."""
    email, response = _require_login()
    if response is not None:
        return response

    # Default to "" so a missing form field is stored as "disabled", not null.
    webhook = request.form.get("webhook", "")
    email_alert = request.form.get("email", "")
    save_notification_settings(email, webhook, email_alert)
    return redirect("/notifications")


def _load_notification_settings():
    """Return every stored notification entry, or {} when no file exists."""
    if not os.path.exists(notification_file):
        return {}
    with open(notification_file, "r", encoding="utf-8") as f:
        return json.load(f)


def save_notification_settings(email, webhook, email_alert):
    """Store the webhook URL and alert address for *email* in the JSON file."""
    data = _load_notification_settings()
    data[email] = {"webhook": webhook, "email": email_alert}
    with open(notification_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)


def get_notification_settings(email):
    """Return the stored settings for *email*.

    Returns:
        False when the settings file does not exist, a dict with empty
        ``webhook`` and ``email`` values when the address has no entry,
        otherwise the stored entry.
    """
    if not os.path.exists(notification_file):
        return False

    data = _load_notification_settings()
    if email not in data:
        return {"webhook": "", "email": ""}
    return data[email]


def delete_email(conversation_id, email):
    """Delete the conversation numbered *conversation_id* on behalf of *email*.

    A conversation is only deleted when *email* appears in the ``to`` list of
    at least one of its messages, and it is deleted once however many
    messages match.
    """
    conversation = get_conversation(conversation_id)
    if not conversation:
        return

    for thread in conversation:
        addressed = any(
            email in (message["to"] or []) for message in thread["messages"]
        )
        if addressed:
            requests.delete(
                f"{CONVERSATIONS_URL}/{thread['id']}",
                headers=_api_headers(),
                timeout=REQUEST_TIMEOUT,
            )


@app.route("/api/email", methods=["POST"])
def newemail():
    """Receive a ticket-system webhook and fan out notifications.

    NOTE(review): the request is not authenticated in any way here — confirm
    whether the sender should be verified.
    """
    payload = request.get_json()
    event = request.headers.get("x-freescout-event")
    if event in NOTIFY_EVENTS:
        send_notification(payload)
    return jsonify(payload)


@app.route("/api/email/<path>")
def getemail(path):
    """Return the conversations of ``<path>@login.hns.au`` as JSON.

    NOTE(review): no login check is performed on this route — confirm that
    exposing any user's conversations by name is intended.
    """
    email = f"{path}@login.hns.au"
    result = get_email(email)
    if "error" in result:
        return jsonify(result), 400
    return jsonify(result)


def send_notification(data):
    """Notify every cc'd address of *data* that has notifications configured."""
    for email in data.get("cc") or []:
        notifications = get_notification_settings(email)
        if not notifications:
            continue

        # Truthiness also skips a stored null, not just an empty string.
        targets = (
            (send_email, notifications.get("email")),
            (send_webhook, notifications.get("webhook")),
        )
        for send, target in targets:
            if not target:
                continue
            try:
                send(target, data)
            except requests.RequestException as err:
                # Best effort: one unreachable target must not block the rest.
                print(f"Failed to notify {target}: {err}")


def send_email(email, data):
    """Create a closed conversation addressed to *email* announcing new mail."""
    print(f"Sending email to {email}")
    sender = _format_sender(data)
    message = data["_embedded"]["threads"][0]["body"]
    body = {
        "type": "email",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "subject": f"New email from {sender}",
        "customer": {"email": email},
        "threads": [
            {
                "text": f"You have a new email from {sender}\n\n{message}",
                "type": "message",
                "user": 1,
            }
        ],
        "imported": False,
        "status": "closed",
    }

    req = requests.post(
        CONVERSATIONS_URL,
        json=body,
        headers=_api_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    if req.status_code != 201:
        print("Failed to send email")
        print(req.text)


def send_webhook(webhook, data):
    """Post a summary of the new mail in *data* to the *webhook* URL.

    The body is converted to Markdown and cut to WEBHOOK_BODY_LIMIT
    characters; a footer is added only when something was actually cut.
    """
    print(f"Sending webhook to {webhook}")
    sender = _format_sender(data)
    message = md(data["_embedded"]["threads"][0]["body"])
    trimmed = len(message) > WEBHOOK_BODY_LIMIT
    message = message[:WEBHOOK_BODY_LIMIT]

    embed = {
        "title": data["subject"],
        "description": message,
        "url": "https://email.hns.au",
        "color": 5814783,
        "author": {"name": sender},
    }
    if trimmed:
        embed["footer"] = {"text": "Email has been trimmed"}

    body = {
        "content": "You have a new email",
        "embeds": [embed],
        "username": "HNS Login Email",
        "avatar_url": "https://hns.au/favicon.png",
        "attachments": [],
    }
    req = requests.post(webhook, json=body, timeout=REQUEST_TIMEOUT)
    if not req.ok:
        print("Failed to send webhook")
        print(req.text)


@lru_cache(maxsize=1024)
def _fetch_user(token):
    """Look *token* up at the login service; raise LookupError if rejected.

    Raising on failure keeps failed lookups out of the cache, so a transient
    upstream error cannot lock a valid token out.
    """
    req = requests.get(
        "https://login.hns.au/auth/user",
        params={"token": token},
        timeout=REQUEST_TIMEOUT,
    )
    if req.status_code != 200:
        raise LookupError("token rejected")
    return req.json()


def get_user(token):
    """Return the user record for *token*, or False when it cannot be resolved."""
    try:
        return _fetch_user(token)
    except (LookupError, ValueError, requests.RequestException):
        return False


def _fetch_conversations(extra_params=None):
    """Fetch the active email conversations of the configured mailbox.

    Args:
        extra_params: optional filters merged into the default query.

    Returns:
        The list of raw conversation dicts, or None when the API call fails.
    """
    params = {
        "embed": "threads",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "status": "active",
        "type": "email",
        "sortField": "updatedAt",
        "sortOrder": "desc",
        "page": "1",
        "pageSize": "100",
    }
    if extra_params:
        params.update(extra_params)

    # NOTE(review): the filters are sent as a JSON body on a GET request,
    # as before — confirm the API reads them from there.
    try:
        response = requests.get(
            CONVERSATIONS_URL,
            json=params,
            headers=_api_headers(),
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None
    return response.json()["_embedded"]["conversations"]


def get_email(email):
    """Return the conversations that list *email* in their cc field.

    Returns:
        ``{"email": ..., "conversations": [...]}`` on success. On failure the
        dict additionally carries an ``"error"`` message and an empty
        conversation list, so it can still be rendered or serialised.
    """
    conversations = _fetch_conversations()
    if conversations is None:
        return {
            "email": email,
            "error": "Could not get conversations",
            "conversations": [],
        }

    threads = [
        parse_email(conversation)
        for conversation in conversations
        if email in (conversation.get("cc") or [])
    ]
    return {"email": email, "conversations": threads}


def get_conversation(id):
    """Return the parsed conversations with number *id* ([] on failure)."""
    conversations = _fetch_conversations({"number": id})
    if conversations is None:
        return []
    return [parse_email(conversation) for conversation in conversations]


@app.route("/<path:path>")
def catch_all(path: str):
    """Render a template (or send a file) matching *path*, else the 404 page."""
    if os.path.isfile("templates/" + path):
        return render_template(path)

    # Try with .html
    if os.path.isfile("templates/" + path + ".html"):
        return render_template(path + ".html")

    stripped = path.strip("/")
    if os.path.isfile("templates/" + stripped + ".html"):
        return render_template(stripped + ".html")

    # For a bare file name, search the whole templates tree.
    if path.count("/") < 1:
        filename = find(path, "templates")
        if filename:
            return send_file(filename)

    return render_template("404.html"), 404


# endregion


def parse_email(conversation):
    """Reduce a raw conversation to its id and a list of message summaries."""
    messages = [
        {
            "from": thread["createdBy"]["email"],
            "name": f'{thread["createdBy"]["firstName"]} {thread["createdBy"]["lastName"]}',
            "body": thread["body"],
            "date": thread["createdAt"],
            "to": thread["to"],
        }
        for thread in conversation["_embedded"]["threads"]
    ]
    return {"messages": messages, "id": conversation["id"]}


# region Error Catching
# 404 catch all
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page."""
    return render_template("404.html"), 404


# endregion
if __name__ == "__main__":
    # WARNING: debug=True on 0.0.0.0 exposes the interactive debugger to the
    # network; use this entry point for local development only.
    app.run(debug=True, port=5000, host="0.0.0.0")