From 61cc135a6f5a7de877f80d29fabf5f3ed056d82e Mon Sep 17 00:00:00 2001 From: Nathan Woodburn Date: Fri, 25 Jul 2025 15:33:56 +1000 Subject: [PATCH] feat: Add initial telegram functions --- alerts.py | 317 +++++++++++++++++++++++++++++++++ domains.py | 1 + example.env | 4 +- main.py | 17 ++ requirements.txt | 3 +- server.py | 43 ++++- templates/account.html | 42 +++-- templates/assets/css/index.css | 4 + 8 files changed, 413 insertions(+), 18 deletions(-) diff --git a/alerts.py b/alerts.py index 271efc5..16cb4e0 100644 --- a/alerts.py +++ b/alerts.py @@ -7,6 +7,10 @@ from email.mime.text import MIMEText from email.headerregistry import Address import ssl import dotenv +import asyncio +import threading +from telegram import Update +from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes dotenv.load_dotenv() @@ -15,6 +19,12 @@ SMTP_PORT = int(os.getenv('SMTP_PORT', 465)) SMTP_USERNAME = os.getenv('SMTP_USERNAME', None) SMTP_PASSWORD = os.getenv('SMTP_PASSWORD', None) +TG_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', None) +TG_BOT_NAME = os.getenv('TELEGRAM_BOT', None) +TG_app = None +TG_bot_running = False + + NOTIFICATION_TYPES = [ { "type": "discord_webhook", @@ -39,6 +49,24 @@ NOTIFICATION_TYPES = [ } ], "description": "Send an email notification." + }, + { + "type": "telegram", + "fields": [ + { + "name": "username", + "label": "Username", + "type": "username", + "required": True + } + ], + "description": "Send a telegram notification.", + "links": [ + { + "label": "Link your Telegram account", + "url": "/telegram/link" + } + ] } ] @@ -55,6 +83,9 @@ def handle_alert(domain: str, notification: dict, alert_data: dict): elif alert_type == 'email': email(notification['email'], domain, alert_data, notification['blocks']) + elif alert_type == 'telegram': + telegram(notification['username'], domain, + alert_data, notification['blocks']) else: print(f"Unknown alert type: {alert_type} for domain: {domain}") @@ -146,3 +177,289 @@ Visit your FireAlerts account: https://alerts.firewallet.au/account/{domain} f"Connection refused to SMTP server {SMTP_SERVER}:{SMTP_PORT} - {e}") except Exception as e: print(f"Unexpected error sending email to {email_addr}: {e}") + + +async def link_tg(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + # Try to read a token + if not update.message or not update.message.text: + print("No message text found in update.") + return + + # Check to make sure that the message is in format /start + if not update.message.text.startswith('/start '): + await update.message.reply_markdown_v2("Please link your Telegram account from [FireAlerts](https://alerts.firewallet.au/telegram/link)") + return + + token = update.message.text.split(' ', 1)[1].strip() + if not token: + await update.message.reply_text("Please provide a valid token.") + return + + # Try to validate the token + user_data = requests.get(f"https://login.hns.au/auth/user?token={token}") + if user_data.status_code != 200: + await update.message.reply_text("Invalid token. Please try again.") + return + user_data = user_data.json() + user_name = user_data.get('username') + if not user_name: + await update.message.reply_text("Invalid token. Please try again.") + return + + if not os.path.exists('data'): + os.makedirs('data') + if not os.path.exists('data/telegram.json'): + with open('data/telegram.json', 'w') as f: + json.dump({}, f) + + # Load existing Telegram data + with open('data/telegram.json', 'r') as f: + telegram_data = json.load(f) + + if not update.message.from_user: + await update.message.reply_text("Could not retrieve your Telegram user information.") + return + # Update or add the user + telegram_data[user_name] = { + "user_id": update.message.from_user.id, + "username": update.message.from_user.username + } + + # Save the updated data + with open('data/telegram.json', 'w') as f: + json.dump(telegram_data, f, indent=4) + + await update.message.reply_text(f'You have linked your Telegram account with username: {user_name}. You will now receive notifications for your domains.') + + +async def ping_tg(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message: + print("No message or user found in update.") + return + await update.message.reply_text(f"Pong!") + +async def help_tg(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message: + print("No message or user found in update.") + return + help_text = ( + "Welcome to FireAlerts Telegram Bot!\n\n" + "Here are the commands you can use:\n" + "/start - Link your Telegram account with FireAlerts.\n" + "/ping - Check if the bot is running.\n" + "/help - Show this help message." + ) + await update.message.reply_text(help_text) + + +def startTGBot(mainThread: bool = False): + """ + Start the Telegram bot in a separate thread. + """ + global TG_bot_running + + if not TG_BOT_TOKEN or not TG_BOT_NAME: + print( + "Telegram bot token or name not set. Notifications via Telegram will not work.") + return + + if TG_bot_running: + print("Telegram bot is already running.") + return + + # Check if this is the Flask reloader process (only skip if not main thread) + if os.environ.get('WERKZEUG_RUN_MAIN') != 'true' and not mainThread: + print("Skipping Telegram bot start in Flask reloader process.") + return + + def run_bot(): + """Run the bot in a separate thread with its own event loop.""" + global TG_bot_running + TG_bot_running = True + loop = None + + try: + # Create a new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + global TG_app + if TG_app is None: + if not TG_BOT_TOKEN: + print("Telegram bot token is not set. Cannot start bot.") + return + + TG_app = ApplicationBuilder().token(TG_BOT_TOKEN).build() + + TG_app.add_handler(CommandHandler("start", link_tg)) + TG_app.add_handler(CommandHandler("ping", ping_tg)) + TG_app.add_handler(CommandHandler("help", help_tg)) + print("Starting Telegram bot...") + + # Use start_polling and idle instead of run_polling + async def start_bot(): + if not TG_app: + print("Telegram app is not initialized. Cannot start bot.") + return + + retry_count = 0 + max_retries = 5 + + while TG_bot_running and retry_count < max_retries: + try: + await TG_app.initialize() + await TG_app.start() + if not TG_app.updater: + print("Telegram app updater is not initialized. Cannot start bot.") + return + + # Start polling with error handling + await TG_app.updater.start_polling( + drop_pending_updates=True, + allowed_updates=["message"], + timeout=30 + ) + print("Telegram bot is now running...") + retry_count = 0 # Reset retry count on successful start + + # Keep the bot running + while TG_bot_running: + await asyncio.sleep(1) + + except Exception as e: + print(f"Telegram bot error (attempt {retry_count + 1}/{max_retries}): {e}") + retry_count += 1 + + if retry_count < max_retries and TG_bot_running: + wait_time = min(2 ** retry_count, 60) # Exponential backoff, max 60 seconds + print(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + else: + print("Max retries reached or bot stopped. Exiting.") + break + + finally: + try: + if TG_app: + await TG_app.stop() + await TG_app.shutdown() + except Exception as e: + print(f"Error stopping Telegram app: {e}") + + # Run the bot + loop.run_until_complete(start_bot()) + + except Exception as e: + print(f"Error running Telegram bot: {e}") + finally: + TG_bot_running = False + try: + if loop and not loop.is_closed(): + loop.close() + except Exception as e: + print(f"Error closing event loop: {e}") + + # Start the bot in a daemon thread so it doesn't prevent the main program from exiting + bot_thread = threading.Thread(target=run_bot, daemon=True) + bot_thread.start() + print("Telegram bot started in background thread") + + +def stopTGBot(): + """ + Stop the Telegram bot. + """ + global TG_bot_running + TG_bot_running = False + print("Stopping Telegram bot...") + + +def telegram(username: str, domain: str, content: dict, alert_blocks: int): + """ + Send a Telegram notification. + """ + # Load Telegram user data + if not os.path.exists('data/telegram.json'): + print( + f"No Telegram data file found. Cannot send notification to {username}") + return + + try: + with open('data/telegram.json', 'r') as f: + telegram_data = json.load(f) + except Exception as e: + print(f"Error reading Telegram data: {e}") + return + + if username not in telegram_data: + print( + f"Username {username} not found in Telegram data. User needs to link their account.") + return + + user_id = telegram_data[username].get('user_id') + if not user_id: + print(f"No user_id found for username {username}") + return + + # Create the message + message = f"""🔥 *FireAlerts Notification* + +Domain: `{domain}` +Expires in: *{content['blocks']} blocks* (~{content['time']}) +Alert threshold: {alert_blocks} blocks + +[Open your FireAlerts account](https://alerts.firewallet.au/account/{domain})""" + + # Send the message in a separate thread with its own bot instance + def send_telegram_message(): + loop = None + local_app = None + try: + # Create a new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def send_message(): + nonlocal local_app + try: + if not TG_BOT_TOKEN: + print("Telegram bot token is not set. Cannot send message.") + return + + # Create a new bot instance for this thread + local_app = ApplicationBuilder().token(TG_BOT_TOKEN).build() + await local_app.initialize() + + await local_app.bot.send_message( + chat_id=user_id, + text=message, + parse_mode='Markdown', + disable_web_page_preview=True + ) + print( + f"Telegram notification sent to {username} (ID: {user_id}) for domain {domain}") + except Exception as e: + print(f"Error sending Telegram message to {username}: {e}") + finally: + if local_app: + try: + await local_app.shutdown() + except Exception as e: + print(f"Error shutting down local Telegram app: {e}") + + # Run the async function + loop.run_until_complete(send_message()) + + except Exception as e: + print(f"Error in Telegram message thread: {e}") + finally: + try: + if loop and not loop.is_closed(): + loop.close() + except Exception as e: + print(f"Error closing Telegram message loop: {e}") + + # Start the message sending in a daemon thread + message_thread = threading.Thread( + target=send_telegram_message, daemon=True) + message_thread.start() diff --git a/domains.py b/domains.py index 8c60fbe..a3ed499 100644 --- a/domains.py +++ b/domains.py @@ -18,6 +18,7 @@ HSD_PORTS = { HSD_PORT = HSD_PORTS.get(HSD_NETWORK, 12037) HSD_URL_FULL = f'http://x:{HSD_API_KEY}@{HSD_URL}:{HSD_PORT}' if HSD_API_KEY else f'http://{HSD_URL}:{HSD_PORT}' +print(f"Using HSD URL: {HSD_URL_FULL}") if not os.path.exists('data'): os.makedirs('data') diff --git a/example.env b/example.env index 83c765c..8633ce0 100644 --- a/example.env +++ b/example.env @@ -4,4 +4,6 @@ HSD_API_KEY=your_api_key_here SMTP_SERVER=smtp.hostname.com SMTP_PORT=465 SMTP_USERNAME=noreply@email.au -SMTP_PASSWORD=Secretpassword123 \ No newline at end of file +SMTP_PASSWORD=Secretpassword123 +TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here +TELEGRAM_BOT=telegrambotname \ No newline at end of file diff --git a/main.py b/main.py index a254599..50f96e0 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import dotenv import threading import time import domains +from alerts import startTGBot, stopTGBot class GunicornApp(BaseApplication): @@ -38,6 +39,19 @@ def run_expiry_checker(): # Wait 2 minutes (120 seconds) time.sleep(120) +def post_worker_init(worker): + """ + Called just after a worker has been forked. + Start the Telegram bot in each worker process. + """ + print(f"Starting Telegram bot in worker {worker.pid}") + startTGBot(mainThread=True) + + # Register cleanup function for this worker + import atexit + atexit.register(stopTGBot) + + if __name__ == '__main__': dotenv.load_dotenv() @@ -46,6 +60,8 @@ if __name__ == '__main__': expiry_thread.start() print("Started background expiry checker thread") + # Don't start the Telegram bot here - it will be started in worker processes + workers = os.getenv('WORKERS', 1) threads = os.getenv('THREADS', 2) workers = int(workers) @@ -55,6 +71,7 @@ if __name__ == '__main__': 'bind': '0.0.0.0:5000', 'workers': workers, 'threads': threads, + 'post_worker_init': post_worker_init, } gunicorn_app = GunicornApp(server.app, options) diff --git a/requirements.txt b/requirements.txt index 8cf7fcb..690b761 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ flask gunicorn requests -python-dotenv \ No newline at end of file +python-dotenv +python-telegram-bot \ No newline at end of file diff --git a/server.py b/server.py index 25181c7..d8e821a 100644 --- a/server.py +++ b/server.py @@ -18,7 +18,8 @@ import dotenv import threading import time import domains -from alerts import NOTIFICATION_TYPES +import atexit +from alerts import NOTIFICATION_TYPES, startTGBot, stopTGBot, handle_alert dotenv.load_dotenv() @@ -209,6 +210,20 @@ def delete_notification(notification_id: str): domains.delete_notification(notification_id, user_data['username']) return redirect(f"{request.host_url}account") +@app.route("/telegram/link") +def telegram_link(): + """ + Redirect to Telegram login. + """ + token = request.cookies.get("token") + if not token: + return redirect(f"https://login.hns.au/auth?return={request.host_url}login") + + TG_NAME = os.getenv("TELEGRAM_BOT", None) + if not TG_NAME: + return jsonify({"error": "Telegram bot name not configured"}), 500 + + return redirect(f"https://t.me/{TG_NAME}?start={token}") @app.route("/account/") def account_domain(domain: str): @@ -327,8 +342,28 @@ def api_add_notification(token: str): # endregion +@app.route("/test") +def test(): + """ + Test route to check if the server is running. + """ + user = request.args.get("user", "nathan.woodburn") + domain_data = { + "blocks": 1008, + "time": f"{1008 // 144} days" # Assuming 144 blocks per day + } + notification = { + "username": "nathan.woodburn", + "blocks": 1008, + "type": "telegram", + "id": "f8b5ad1222b9fe636911421147392385", + "user_name": "nathan.woodburn" + } + handle_alert("woodburn", notification, domain_data) + return jsonify({"message": "Server is running"}) + # region Error Catching # 404 catch all @app.errorhandler(404) @@ -343,4 +378,10 @@ if __name__ == "__main__": expiry_thread.start() print("Started background expiry checker thread") + # Start the Telegram bot in a separate thread (only in main process) + startTGBot() + + # Register cleanup function + atexit.register(stopTGBot) + app.run(debug=True, port=5000, host="127.0.0.1") diff --git a/templates/account.html b/templates/account.html index faced9f..97455cb 100644 --- a/templates/account.html +++ b/templates/account.html @@ -7,7 +7,7 @@ FireAlerts - + @@ -36,13 +36,13 @@

Type: {{notification.notification.type.replace('_', ' ').title()}}

Blocks before expiry: {{notification.notification.blocks}}

{% for notificationType in NOTIFICATION_TYPES %} - {% if notificationType.type == notification.notification.type %} - {% for field in notificationType.fields %} -

{{field.label}}: {{notification.notification[field.name]}}

- {% endfor %} - {% endif %} + {% if notificationType.type == notification.notification.type %} + {% for field in notificationType.fields %} +

{{field.label}}: {{notification.notification[field.name]}}

{% endfor %} - + {% endif %} + {% endfor %} + Delete @@ -72,15 +72,19 @@ {% for field in notificationType.fields %} + {% if field.type == 'username' %} + + {% else %}
- +
+ {% endif %} {% endfor %} @@ -88,10 +92,18 @@ -

Note: 144 blocks is approximately 1 day, 1008 blocks is approximately 1 week, and 4320 blocks is approximately 1 month.

+

Note: 144 blocks is approximately 1 day, 1008 blocks is approximately 1 week, and 4320 + blocks is approximately 1 month.

- + + + {% if notificationType.links %} + {% for link in notificationType.links %} + {{link.label}} + {% endfor %} + {% endif %} {% endfor %} diff --git a/templates/assets/css/index.css b/templates/assets/css/index.css index d170ffe..4e6c96c 100644 --- a/templates/assets/css/index.css +++ b/templates/assets/css/index.css @@ -129,4 +129,8 @@ span.user { .notification-item p:contains("ID") { font-family: monospace; font-size: 12px; +} + +.hidden { + display: none; } \ No newline at end of file