generated from nathanwoodburn/python-webserver-template
All checks were successful
Build Docker / BuildImage (push) Successful in 30s
493 lines
13 KiB
Python
493 lines
13 KiB
Python
from functools import cache
|
|
import json
|
|
from flask import (
|
|
Flask,
|
|
make_response,
|
|
redirect,
|
|
request,
|
|
jsonify,
|
|
render_template,
|
|
send_from_directory,
|
|
send_file,
|
|
)
|
|
import os
|
|
import json
|
|
import requests
|
|
from datetime import datetime
|
|
import dotenv
|
|
from functools import cache
|
|
from markdownify import markdownify as md
|
|
|
|
# Pull configuration from a local .env file into the process environment.
dotenv.load_dotenv()

app = Flask(__name__)

# Notification settings are kept under /data when that directory exists;
# otherwise they are stored next to the working directory.
notification_file = (
    "/data/notifications.json"
    if os.path.exists("/data")
    else "./notifications.json"
)
|
|
|
|
|
|
def find(name, path):
    """Return the path of the first file called ``name`` below ``path``.

    The directory tree is walked with ``os.walk``; the first directory that
    holds a file with exactly that name wins. Returns ``None`` when nothing
    matches.
    """
    hits = (
        os.path.join(folder, name)
        for folder, _subdirs, filenames in os.walk(path)
        if name in filenames
    )
    return next(hits, None)
|
|
|
|
# Assets routes
|
|
|
|
|
|
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a file from ``templates/assets``.

    JSON files are always sent with an ``application/json`` mimetype. Other
    paths are served when the file exists; image requests that do not resolve
    directly are retried by bare file name in the image directories. Anything
    else gets the 404 page.
    """
    assets_dir = "templates/assets"

    if path.endswith(".json"):
        return send_from_directory(assets_dir, path, mimetype="application/json")

    if os.path.isfile("templates/assets/" + path):
        return send_from_directory(assets_dir, path)

    # Try looking in one of the image directories using only the file name.
    filename: str = path.split("/")[-1]
    if filename.endswith((".png", ".jpg", ".jpeg", ".svg")):
        for image_dir in ("templates/assets/img", "templates/assets/img/favicon"):
            if os.path.isfile(image_dir + "/" + filename):
                return send_from_directory(image_dir, filename)

    return render_template("404.html"), 404
|
|
|
|
|
|
# region Special routes
|
|
@app.route("/favicon.png")
def faviconPNG():
    """Serve ``favicon.png`` from the image assets directory."""
    image_dir = "templates/assets/img"
    return send_from_directory(image_dir, "favicon.png")
|
|
|
|
|
|
@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy a ``/.well-known/`` request to nathan.woodburn.au.

    Args:
        path: Everything after ``/.well-known/`` in the requested URL.

    Returns:
        The upstream body together with the upstream status code and content
        type, or the 404 page when the upstream cannot be reached.
    """
    try:
        # A timeout keeps a slow upstream from blocking this worker forever.
        req = requests.get(
            f"https://nathan.woodburn.au/.well-known/{path}", timeout=10
        )
    except requests.RequestException:
        return render_template("404.html"), 404

    # Forward the real status instead of reporting every reply as 200, and
    # tolerate replies that carry no Content-Type header.
    content_type = req.headers.get("Content-Type", "application/octet-stream")
    return make_response(
        req.content, req.status_code, {"Content-Type": content_type}
    )
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
# region Main routes
|
|
@app.route("/")
def index():
    """Show the login page, or go straight to the inbox when a token exists.

    Returns:
        A redirect to ``/inbox`` when the ``token`` cookie is set, otherwise
        the rendered ``login.html`` with the external login link.
    """
    if request.cookies.get("token"):
        return redirect("/inbox")

    # Build the return address from the site root. request.url also contains
    # the query string of the current request, so appending "login" to it
    # would yield a broken address such as ".../?a=blogin".
    return_url = f"{request.url_root}login"
    return render_template(
        "login.html", login=f"https://login.hns.au/auth?return={return_url}"
    )
|
|
|
|
|
|
@app.route("/login")
def login():
    """Store the token handed back by the login service in a cookie.

    Returns:
        A redirect to ``/refresh`` with the ``token`` cookie set, or a
        redirect to ``/`` when the request carries no ``token`` argument.
    """
    token = request.args.get("token")
    if not token:
        # Without a token there is nothing to store; setting the cookie to
        # None would fail, so send the visitor back to the login page.
        return redirect("/")

    response = make_response(redirect("/refresh"))
    response.set_cookie("token", token)
    return response
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Remove the token cookie and return to the start page.

    Returns:
        A redirect to ``/`` whose headers expire the ``token`` cookie.
    """
    response = make_response(redirect("/"))
    # Actually expire the cookie rather than leaving an empty one behind.
    response.delete_cookie("token")
    return response
|
|
|
|
|
|
@app.route("/inbox")
def inbox():
    """Render the inbox of the logged-in user.

    Visitors without a ``token`` cookie are sent to ``/``; a token for which
    ``get_user`` returns nothing is cleared via ``/logout``.
    """
    token = request.cookies.get("token")
    if not token:
        return redirect("/")

    user = get_user(token)
    if not user:
        return redirect("/logout")

    emails = get_email(user["hemail"])
    return render_template("inbox.html", emails=emails)
|
|
|
|
|
|
@app.route("/delete/<path:path>")
def delete(path):
    """Delete the conversation named by ``path`` for the logged-in user.

    Visitors without a ``token`` cookie are sent to ``/``; an unusable token
    leads to ``/logout``. After the delete attempt the browser returns to
    ``/inbox``.
    """
    # NOTE(review): this changes state on a GET request - confirm whether
    # that is acceptable for this deployment.
    token = request.cookies.get("token")
    if not token:
        return redirect("/")

    user = get_user(token)
    if not user:
        return redirect("/logout")

    delete_email(path, user["hemail"])
    return redirect("/inbox")
|
|
|
|
|
|
@app.route("/notifications")
def notifications():
    """Render the notification settings page of the logged-in user.

    When stored settings are available, the webhook and alert address are
    passed to the template; otherwise only the user's address is.
    """
    token = request.cookies.get("token")
    if not token:
        return redirect("/")

    user = get_user(token)
    if not user:
        return redirect("/logout")

    email = user["hemail"]
    settings = get_notification_settings(email)
    if not settings:
        return render_template("notifications.html", email=email)

    return render_template(
        "notifications.html",
        email=email,
        webhook=settings["webhook"],
        email_alert=settings["email"],
    )
|
|
|
|
|
|
@app.route("/notifications", methods=["POST"])
def notification_set():
    """Save the notification settings posted by the logged-in user.

    Reads the ``webhook`` and ``email`` form fields and stores them for the
    user's address, then returns to the settings page.

    Returns:
        A redirect to ``/notifications`` on success, to ``/`` without a token
        cookie, or to ``/logout`` when the token is not accepted.
    """
    token = request.cookies.get("token")
    if not token:
        return redirect("/")

    user = get_user(token)
    if not user:
        return redirect("/logout")

    email = user["hemail"]
    # Normalise absent or blank fields to "" so that later "is a target
    # configured?" checks never see None or whitespace-only values.
    webhook = (request.form.get("webhook") or "").strip()
    email_alert = (request.form.get("email") or "").strip()

    save_notification_settings(email, webhook, email_alert)

    return redirect("/notifications")
|
|
|
|
|
|
def save_notification_settings(email, webhook, email_alert):
    """Store the notification targets for ``email`` in the settings file.

    Args:
        email: Key under which the settings are stored.
        webhook: Webhook URL to notify, or an empty string.
        email_alert: Address to notify by email, or an empty string.

    Raises:
        json.JSONDecodeError: If the existing file holds invalid JSON.
    """
    # NOTE(review): get_notification_settings looks entries up under
    # "<local-part>@login.hns.au" - confirm callers pass the address in that
    # form, otherwise saved settings will not be found again.
    data = {}
    if os.path.exists(notification_file):
        with open(notification_file, "r") as f:
            raw = f.read()
        # An empty file (e.g. left by an interrupted write) counts as having
        # no settings, so one bad write does not block every later save.
        if raw.strip():
            data = json.loads(raw)

    data[email] = {
        "webhook": webhook,
        "email": email_alert,
    }

    with open(notification_file, "w") as f:
        json.dump(data, f, indent=4)
|
|
|
|
|
|
def get_notification_settings(email):
    """Look up the stored notification targets for ``email``.

    The local part of ``email`` is combined with ``@login.hns.au`` to form
    the lookup key.

    Returns:
        ``False`` when the settings file does not exist; otherwise the stored
        entry, or ``{"webhook": "", "email": ""}`` when the key has none.
    """
    if not os.path.exists(notification_file):
        return False

    # Settings are looked up under the login.hns.au form of the address.
    local_part = email.split('@')[0]
    key = local_part + '@login.hns.au'

    with open(notification_file, "r") as f:
        stored = json.load(f)

    return stored.get(key, {"webhook": "", "email": ""})
|
|
|
|
|
|
def delete_email(conversation_id, email):
    """Delete the first conversation with a message addressed to ``email``.

    The conversations returned by ``get_conversation(conversation_id)`` are
    scanned; the first one containing a message whose ``to`` field includes
    ``email`` (or its ``@e.hns.au`` form) is deleted through the ticket API
    and the caches are cleared. Nothing happens when no message matches.

    Args:
        conversation_id: Conversation number passed to ``get_conversation``.
        email: Address of the user requesting the delete.
    """
    conversation = get_conversation(conversation_id)
    # Guard against anything other than a non-empty list of parsed
    # conversations, so a failed lookup cannot crash the loop below.
    if not conversation or not isinstance(conversation, list):
        print("No conversation found")
        return

    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }
    altemail: str = f"{email.split('@')[0]}@e.hns.au"

    for thread in conversation:
        for message in thread["messages"]:
            if email in message["to"] or altemail in message["to"]:
                req = requests.delete(
                    f"https://ticket.woodburn.au/api/conversations/{thread['id']}",
                    headers=headers,
                    timeout=30,
                )
                if req.status_code >= 400:
                    # Surface failures instead of silently ignoring them.
                    print(f"Failed to delete conversation {thread['id']}")
                    print(req.text)
                clear_cache()
                return
|
|
|
|
|
|
@app.route("/api/email", methods=["POST"])
def newemail():
    """Receive an event POST and fan out notifications for new mail.

    The JSON body is passed to ``send_notification`` when the
    ``x-freescout-event`` header names a new conversation or a customer
    reply. The received JSON is echoed back in every case.
    """
    payload = request.get_json()
    event = request.headers.get("x-freescout-event")

    if event in ("convo.created", "convo.customer.reply.created"):
        send_notification(payload)

    return jsonify(payload)
|
|
|
|
# Removed for now
|
|
# @app.route("/api/email/<path:path>")
|
|
# def getemail(path):
|
|
# email = f'{path}@login.hns.au'
|
|
# return jsonify(get_email(email))
|
|
|
|
|
|
def send_notification(data):
    """Notify every cc'd address that has notification targets configured.

    Args:
        data: Event payload; its ``cc`` entry lists the addresses to check.
            A missing or empty ``cc`` means there is nobody to notify.
    """
    for email in data.get("cc") or []:
        notifications = get_notification_settings(email)
        if not notifications:
            continue
        # Truthiness rather than != "" so that a stored null or a missing
        # key is treated as "not configured" instead of being sent to.
        if notifications.get("email"):
            send_email(notifications["email"], data)
        if notifications.get("webhook"):
            send_webhook(notifications["webhook"], data)
|
|
|
|
|
|
def send_email(email, data):
    """Create a closed conversation in the ticket system to alert ``email``.

    The alert names the sender taken from ``data["createdBy"]`` and carries
    the body of the first embedded thread. A reply other than HTTP 201 is
    printed as a failure.
    """
    print(f"Sending email to {email}")
    headers = {"X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY")}

    author = data["createdBy"]
    sender = f"{author['firstName']} {author['lastName']} ({author['email']})"
    message = data["_embedded"]["threads"][0]["body"]

    alert_thread = {
        "text": f"You have a new email from {sender}\n\n{message}",
        "type": "message",
        "user": 1,
    }
    body = {
        "type": "email",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "subject": f"New email from {sender}",
        "customer": {"email": email},
        "threads": [alert_thread],
        "imported": False,
        "status": "closed",
    }

    req = requests.post(
        "https://ticket.woodburn.au/api/conversations",
        json=body,
        headers=headers,
    )
    if req.status_code != 201:
        print("Failed to send email")
        print(req.text)
|
|
|
|
|
|
def send_webhook(webhook, data):
    """POST a notification embed about a new email to ``webhook``.

    The body of the first embedded thread is converted to Markdown and cut to
    4000 characters; a footer is added only when text was actually removed.
    Delivery problems are printed and never raised, so one bad webhook cannot
    stop notifications for other recipients.

    Args:
        webhook: URL that receives the JSON payload.
        data: Event payload with ``createdBy``, ``subject`` and embedded
            threads.
    """
    print(f"Sending webhook to {webhook}")
    author = data["createdBy"]
    sender = f"{author['firstName']} {author['lastName']} ({author['email']})"
    message = md(data["_embedded"]["threads"][0]["body"])

    # Only keep 4000 chars. The flag is true only when something was cut;
    # a message of exactly 4000 characters is sent whole.
    max_length = 4000
    trimmed = len(message) > max_length
    message = message[:max_length]

    embed = {
        "title": data["subject"],
        "description": message,
        "url": "https://email.hns.au",
        "color": 5814783,
        "author": {
            "name": sender,
        },
    }
    if trimmed:
        embed["footer"] = {
            "text": "Email has been trimmed",
        }

    body = {
        "content": "You have a new email",
        "embeds": [embed],
        "username": "HNS Email",
        "avatar_url": "https://hns.au/favicon.png",
        "attachments": [],
    }

    # NOTE(review): webhook is whatever URL was saved in the notification
    # settings; this server will POST to it. Confirm that is acceptable.
    try:
        requests.post(webhook, json=body, timeout=10)
    except requests.RequestException as err:
        print(f"Failed to send webhook: {err}")
|
|
|
|
@app.route("/refresh")
def refresh():
    """Drop cached mail data for a logged-in user and reopen the inbox.

    Visitors without a ``token`` cookie are sent to ``/``; an unusable token
    leads to ``/logout``.
    """
    token = request.cookies.get("token")
    if not token:
        return redirect("/")

    if not get_user(token):
        return redirect("/logout")

    clear_cache()
    return redirect("/inbox")
|
|
|
|
|
|
def clear_cache():
    """Forget all cached mail data so the next lookup hits the ticket API."""
    get_email.cache_clear()
    # Conversations are cached per number as well; clear them too so that a
    # deleted or updated conversation is not served from a stale entry.
    get_conversation.cache_clear()
|
|
|
|
@cache
def _fetch_user(token):
    """Fetch the user for ``token``; raise ``LookupError`` when rejected.

    Raising instead of returning a failure value keeps failed lookups out of
    the cache, because ``functools.cache`` only stores returned results.
    """
    req = requests.get(
        "https://login.hns.au/auth/user",
        # Passing the token via params lets requests URL-encode it.
        params={"token": token},
        timeout=10,
    )
    if req.status_code != 200:
        raise LookupError("token was not accepted")
    return req.json()


def get_user(token):
    """Return the user info for ``token``, or ``False`` when unavailable.

    Successful lookups are cached per token. Rejected tokens, network errors
    and unparsable replies return ``False`` without being cached, so a
    temporary outage of the login service does not lock a token out.
    """
    try:
        return _fetch_user(token)
    except (LookupError, ValueError, requests.RequestException):
        return False


# Keep the cache-control hook that the cached function used to expose.
get_user.cache_clear = _fetch_user.cache_clear
|
|
|
|
|
|
@cache
def get_email(email: str):
    """Collect the active conversations that cc ``email``.

    The first page (up to 100) of active email conversations in the
    configured mailbox is fetched, and those whose ``cc`` list contains
    ``email`` or its ``@e.hns.au`` form are parsed with ``parse_email``.
    Results, including error results, are cached per address until the cache
    is cleared.

    Args:
        email: Address of the user whose mail is listed.

    Returns:
        A dict with ``email``, ``emailalt`` and ``conversations``. When the
        ticket API does not answer with HTTP 200, ``conversations`` is empty
        and an ``error`` message is included.
    """
    altemail: str = f"{email.split('@')[0]}@e.hns.au"
    params = {
        "embed": "threads",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "status": "active",
        "type": "email",
        "sortField": "updatedAt",
        "sortOrder": "desc",
        "page": "1",
        "pageSize": "100",
    }
    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }

    conversations = requests.get(
        "https://ticket.woodburn.au/api/conversations",
        json=params,
        headers=headers,
        timeout=30,
    )

    if conversations.status_code != 200:
        # Return the same shape as the success case. jsonify() cannot take a
        # positional payload together with keyword arguments, and callers
        # expect a plain dict rather than a Response.
        return {
            "email": email,
            "emailalt": altemail,
            "conversations": [],
            "error": "Could not get conversations",
        }

    own_addresses = (email, altemail)
    # any() adds a conversation once even when several cc entries match.
    threads = [
        parse_email(conversation)
        for conversation in conversations.json()["_embedded"]["conversations"]
        if any(address in own_addresses for address in conversation["cc"])
    ]

    return {"email": email, "emailalt": altemail, "conversations": threads}
|
|
|
|
|
|
@cache
def get_conversation(id):
    """Fetch the active conversations matching conversation number ``id``.

    Args:
        id: Conversation number sent as the ``number`` filter. The parameter
            name is kept for existing callers although it shadows the
            builtin.

    Returns:
        A list of ``parse_email`` results. The list is empty when the ticket
        API does not answer with HTTP 200, so callers always get a list.
    """
    params = {
        "number": id,
        "embed": "threads",
        "mailboxId": os.getenv("FREESCOUT_MAILBOX"),
        "status": "active",
        "type": "email",
        "sortField": "updatedAt",
        "sortOrder": "desc",
        "page": "1",
        "pageSize": "100",
    }
    headers = {
        "X-FreeScout-API-Key": os.getenv("FREESCOUT_API_KEY"),
    }

    conversations = requests.get(
        "https://ticket.woodburn.au/api/conversations",
        json=params,
        headers=headers,
        timeout=30,
    )

    if conversations.status_code != 200:
        # Same type as the success path; a dict here would be iterated by
        # key in callers that loop over the result.
        return []

    return [
        parse_email(conversation)
        for conversation in conversations.json()["_embedded"]["conversations"]
    ]
|
|
|
|
|
|
@app.route("/<path:path>")
def catch_all(path: str):
    """Serve any remaining URL from the ``templates`` directory.

    Tried in order: the path as a template, the path with ``.html`` added,
    the slash-stripped path with ``.html`` added, and - for single-segment
    paths - any file of that name anywhere below ``templates``.

    Returns:
        The rendered template or the located file, else the 404 page.
    """
    # Refuse parent-directory segments up front so a request can never probe
    # outside templates/. Such names are presumably rejected by the template
    # loader anyway, which would surface as a server error, not a 404.
    if ".." in path.split("/"):
        return render_template("404.html"), 404

    for candidate in (path, path + ".html", path.strip("/") + ".html"):
        if os.path.isfile("templates/" + candidate):
            return render_template(candidate)

    # Try to find a file matching a bare file name anywhere in templates.
    if path.count("/") < 1:
        filename = find(path, "templates")
        if filename:
            return send_file(filename)

    return render_template("404.html"), 404
|
|
|
|
|
|
# endregion
|
|
def parse_email(conversation):
    """Flatten a conversation into its id and a list of message dicts.

    Each embedded thread becomes one message holding the sender address and
    name, body, creation date, recipients and the conversation subject.
    """

    def as_message(thread):
        author = thread["createdBy"]
        return {
            "from": author["email"],
            "name": f'{author["firstName"]} {author["lastName"]}',
            "body": thread["body"],
            "date": thread["createdAt"],
            "to": thread["to"],
            "subject": conversation["subject"],
        }

    messages = [as_message(thread) for thread in conversation["_embedded"]["threads"]]
    return {"messages": messages, "id": conversation["id"]}
|
|
|
|
# region Error Catching
|
|
# 404 catch all
|
|
|
|
|
|
@app.errorhandler(404)
def not_found(e):
    """Answer 404 errors raised by the app with the custom 404 page."""
    page = render_template("404.html")
    return page, 404
|
|
|
|
|
|
# endregion
|
|
if __name__ == "__main__":
    # Development entry point: listen on every interface, port 5000.
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the
    # interactive debugger to the network - confirm this is never used for a
    # public deployment.
    app.run(host="0.0.0.0", port=5000, debug=True)
|