import asyncio
import binascii
import datetime
import hashlib
import os
import re
import subprocess
import tempfile

import discord
import dns.exception
import dns.resolver
import markdownify
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from discord import app_commands
from discord.ext import tasks, commands
from discord.ext.commands import has_permissions, MissingPermissions
from dotenv import load_dotenv

import chatai
import tools
from tools import parse_time, read_reminders, store_reminder, write_reminders
import support

load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
# Replaced with the application owner's id in on_ready.
ADMINID = 0
KUTT_APIKEY = os.getenv('LINK_API_KEY')
KUTT_URL = os.getenv('LINK_URL')
LOG_CHANNEL = int(os.getenv('LOG_CHANNEL'))

# Resolver used for every DNS lookup made by the bot.
DNS_NAMESERVERS = ["10.2.1.15"]
DNS_PORT = 5350
# Proxy used by the curl command.
HNS_PROXY_URL = "https://proxy.hnsproxy.au"
# One "<user id>,<domain>" entry per line.
SSL_NOTIFY_FILE = "/mnt/sslnotify.txt"
# Size limits enforced by Discord.
MESSAGE_LIMIT = 2000
EMBED_DESCRIPTION_LIMIT = 4096
EMBED_TITLE_LIMIT = 256
# Seconds to wait on outbound HTTP requests / openssl before giving up.
HTTP_TIMEOUT = 10
OPENSSL_TIMEOUT = 30

DOMAIN_PATTERN = r"^([a-z0-9]+(-[a-z0-9]+)*\.)*([a-z0-9]+(-[a-z0-9]+)*)$"
_PEM_CERT_RE = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----", re.DOTALL
)

intents = discord.Intents.default()
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)

activityMessage = "over the server"
statusType = "watching"

# Maps the user-supplied status keyword to the Discord activity type.
_ACTIVITY_TYPES = {
    "watching": discord.ActivityType.watching,
    "playing": discord.ActivityType.playing,
    "listening": discord.ActivityType.listening,
    "competing": discord.ActivityType.competing,
}

# NOTE: slash-command callbacks below are documented with `#` comments rather
# than docstrings so that the command metadata registered with Discord is
# exactly what the decorators specify.


# Helpers
async def _reject_non_admin(ctx, command_name):
    """Log and refuse the interaction unless the caller is the bot owner.

    Returns True when the caller was rejected (a response has been sent and
    the command must stop), False when the caller is allowed to continue.
    """
    if ctx.user.id == ADMINID:
        return False
    await log("User: " + str(ctx.user.name) + " tried to use the " + command_name + " command")
    await ctx.response.send_message("You don't have permission to use this command", ephemeral=True)
    return True


async def _reject_unregistered_guild(ctx):
    """Refuse the interaction unless it comes from a registered ticket server.

    Returns True when a refusal has been sent, False when the command may run.
    """
    if ctx.guild is None:
        await ctx.response.send_message("This command can only be used in a server", ephemeral=True)
        return True
    if not support.is_server_valid(str(ctx.guild.id)):
        await ctx.response.send_message("This server is not registered", ephemeral=True)
        return True
    return False


def _make_resolver():
    """Return a DNS resolver pointed at the configured nameserver and port."""
    resolver = dns.resolver.Resolver()
    resolver.nameservers = list(DNS_NAMESERVERS)
    resolver.port = DNS_PORT
    return resolver


def _extract_body(html):
    """Return the inner HTML of the <body> element, or all of html if absent."""
    _, found, rest = html.partition("<body")
    if not found:
        return html
    rest = rest.split("</body>")[0]
    # Drop the remainder of the opening tag (its attributes and the ">").
    return rest.partition(">")[2]


def _extract_title(html, default):
    """Return the text of the <title> element, or default when there is none."""
    if "<title>" not in html:
        return default
    return html.split("<title>")[1].split("</title>")[0]


def _truncate_description(text, limit=EMBED_DESCRIPTION_LIMIT):
    """Cut text to at most limit characters, ending on a whole line plus "..."."""
    if len(text) <= limit:
        return text
    suffix = "\n..."
    # Reserve room for the suffix so the final string never exceeds the limit.
    clipped = text[:limit - len(suffix)]
    whole_lines = clipped.splitlines()[:-1]
    if whole_lines:
        clipped = "\n".join(whole_lines)
    return clipped + suffix


def _split_message(message, limit=MESSAGE_LIMIT):
    """Split message into chunks of at most limit characters, cutting at newlines."""
    chunks = []
    remaining = message
    while len(remaining) > limit:
        cut = remaining.rfind("\n", 0, limit)
        if cut <= 0:
            # No usable newline: hard cut so the loop always makes progress.
            cut = limit
        chunks.append(remaining[:cut])
        remaining = remaining[cut:]
    if remaining.strip():
        chunks.append(remaining)
    return chunks


def _fetch_certificates(ip, domain):
    """Return the PEM certificates served by ip:443 for SNI name domain.

    Certificates are returned in the order openssl printed them, so the first
    entry is the server's own certificate. Raises subprocess.TimeoutExpired if
    openssl does not finish within OPENSSL_TIMEOUT seconds.
    """
    completed = subprocess.run(
        ["openssl", "s_client", "-showcerts", "-connect", f"{ip}:443", "-servername", domain],
        input=b"\n",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=OPENSSL_TIMEOUT,
    )
    output = completed.stdout.decode("utf-8", errors="replace")
    return [pem + "\n" for pem in _PEM_CERT_RE.findall(output)]


def _tlsa_record(cert_obj):
    """Return the "3 1 1 <hex>" TLSA value for a certificate.

    The hex part is the SHA-256 digest of the DER-encoded SubjectPublicKeyInfo.
    """
    spki = cert_obj.public_key().public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return "3 1 1 " + hashlib.sha256(spki).hexdigest()


def _certificate_domains(cert_obj):
    """Return the DNS names in the certificate (SAN entries, then the CN)."""
    domains = []
    for ext in cert_obj.extensions:
        if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
            domains.extend(ext.value.get_values_for_type(x509.DNSName))
    common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
    if common_name and common_name[0].value not in domains:
        domains.append(common_name[0].value)
    return domains


def _certificate_covers(domain, names):
    """Return True if domain equals one of names or matches a "*." wildcard."""
    domain = domain.lower()
    parent = domain.partition(".")[2]
    for name in names:
        name = name.lower()
        if name == domain:
            return True
        # A wildcard only stands in for the single left-most label.
        if name.startswith("*.") and parent and name[2:] == parent:
            return True
    return False


def _certificate_expiry(cert_obj):
    """Return the certificate's expiry as a timezone-aware UTC datetime."""
    expiry = getattr(cert_obj, "not_valid_after_utc", None)
    if expiry is None:
        # Older cryptography releases only expose the naive UTC attribute.
        expiry = cert_obj.not_valid_after.replace(tzinfo=datetime.timezone.utc)
    return expiry


def _read_ssl_notify_lines():
    """Return the stripped, non-empty lines of the SSL notify file.

    A missing file is treated as an empty subscription list.
    """
    try:
        with open(SSL_NOTIFY_FILE, "r") as file:
            lines = file.readlines()
    except FileNotFoundError:
        return []
    return [line.strip() for line in lines if line.strip()]


def _parse_ssl_notify_line(line):
    """Return (user id, domain) for a notify-file line, or None if malformed."""
    userid, sep, domain = line.partition(",")
    if not sep or not userid or not domain:
        return None
    return userid, domain


def _user_domains(entries, userid):
    """Return the domains in entries that belong to exactly this user id."""
    domains = []
    for entry in entries:
        parsed = _parse_ssl_notify_line(entry)
        if parsed is not None and parsed[0] == userid:
            domains.append(parsed[1])
    return domains


async def _notify_user(userid, text):
    """DM text to the user, logging instead when the account does not exist."""
    try:
        user = await client.fetch_user(int(userid))
    except discord.NotFound:
        await log(f"User {userid} not found")
        return
    await user.send(text)


# Commands
@tree.command(name="ping", description="Check bot connection")
async def ping(ctx):
    # Reply privately so the caller can confirm the bot is responding.
    await ctx.response.send_message("Pong!", ephemeral=True)


@tree.command(name="shortlink", description="Shorten a link")
async def shortlink(ctx, link: str, name: str = None):
    # Owner only. Creates a short link through the Kutt API; `name` is the
    # optional custom slug.
    if await _reject_non_admin(ctx, "shortlink"):
        return

    url = f"{KUTT_URL}/api/v2/links"
    headers = {'X-API-KEY': KUTT_APIKEY}
    data = {'target': link}
    if name is not None:
        data['customurl'] = name

    try:
        x = requests.post(url, data=data, headers=headers, timeout=HTTP_TIMEOUT)
    except requests.exceptions.RequestException as e:
        await ctx.response.send_message("ERROR: " + str(e), ephemeral=True)
        return

    if x.status_code not in (200, 201):
        await ctx.response.send_message("ERROR: " + x.text, ephemeral=True)
        # An interaction accepts a single response, and an error body has no
        # 'link' key, so stop here.
        return

    link = x.json()['link']
    await ctx.response.send_message("Link: " + link, ephemeral=False)


@tree.command(name="botstatus", description="Set the bot status")
async def botstatus(ctx, message: str, statusmethod: str = None):
    # Owner only. Sets the presence text and, optionally, the activity kind
    # (watching / playing / listening / competing).
    if await _reject_non_admin(ctx, "botstatus"):
        return

    global activityMessage, statusType
    activityMessage = message
    # Omitting statusmethod selects the default "watching" activity.
    statusType = (statusmethod or "watching").lower()
    updateStatus()
    await ctx.response.send_message("Status updated", ephemeral=True)


@tree.command(name="dig", description="Dig a dns record")
async def dig(ctx, domain: str, record_type: str = "A"):
    # Looks up `record_type` records for `domain` and posts them.
    record_type = record_type.upper()
    resolver = _make_resolver()
    try:
        response = resolver.resolve(domain, record_type)
    except dns.resolver.NXDOMAIN:
        await ctx.response.send_message(f"Domain {domain} not found.")
        return
    except dns.exception.DNSException as e:
        await ctx.response.send_message(f"An error occurred: {e}")
        return

    records = "".join("\n" + str(record) for record in response)
    await ctx.response.send_message(f"DNS records for {domain} ({record_type}):{records}")


@tree.command(name="curl", description="HTTP request")
async def curl(ctx, url: str):
    # Fetches `url` through the HNS proxy and posts the page body, converted
    # to Markdown, as an embed.
    try:
        # Certificate verification is disabled, as in the original request.
        response = requests.get(
            url,
            proxies={"http": HNS_PROXY_URL, "https": HNS_PROXY_URL},
            verify=False,
            timeout=HTTP_TIMEOUT,
        )
        html = response.text

        body = _extract_body(html)
        # Replace any relative links with absolute links
        body = body.replace('href="/', 'href="' + url + '/')
        parsed = markdownify.markdownify(body, heading_style="ATX")
        # Delete any empty lines
        parsed = "\n".join(s for s in parsed.splitlines() if s.strip())
        parsed = _truncate_description(parsed)

        title = _extract_title(html, url)[:EMBED_TITLE_LIMIT]

        # if url is a tld only replace it with the url https://hns.au
        # (due to Discord not allowing tld only links)
        if url.find(".") == -1:
            url = "https://hns.au"

        embed = discord.Embed(title=title, url=url, description=parsed)
        embed.set_footer(text="Parsed by HNSProxy", icon_url="https://hns.au/assets/img/favicon.png")
        embed.timestamp = discord.utils.utcnow()
        await ctx.response.send_message(embed=embed)
    except Exception as e:
        # Top-level boundary for the command: report whatever went wrong.
        await ctx.response.send_message(f"An error occurred: {e}")


@tree.command(name="ssl", description="Check SSL certificate")
async def ssl(ctx, domain: str, showcert: bool = False, notifymeonexpiry: bool = False):
    # Reports the A records, served certificate, certificate names, expiry and
    # TLSA (DANE) status of `domain`. `showcert` includes the PEM certificate;
    # `notifymeonexpiry` subscribes the caller to the daily expiry check.
    if not domain:
        await ctx.response.send_message("Please provide a domain to check")
        return
    if not re.match(DOMAIN_PATTERN, domain):
        await ctx.response.send_message("Please provide a valid domain to check")
        return

    await ctx.response.send_message(f"Checking SSL certificate for {domain}...")

    resolver = _make_resolver()
    try:
        records = [str(record) for record in resolver.resolve(domain, "A")]
        if not records:
            await ctx.channel.send(f"No A record found for {domain}")
            return
        message = "## A records:\n" + "".join(f"- {record}\n" for record in records)

        # Stay None / False when the server presents no certificate.
        tlsa_server = None
        domain_check = False

        # Only the first A record is probed.
        certificates = _fetch_certificates(records[0], domain)
        if certificates:
            cert = certificates[0]
            if showcert:
                message = message + "\n## Website Certificate:\n```\n" + cert + "\n```\n"

            cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())

            tlsa_server = _tlsa_record(cert_obj)
            message = message + "\n## TLSA Record from webserver:\n`" + tlsa_server + "`\n"

            domains = _certificate_domains(cert_obj)
            if domains:
                message = message + "\n## SSL Domains:\n" + "".join(f"- {cn}\n" for cn in domains)
            domain_check = _certificate_covers(domain, domains)

            expiry_date = _certificate_expiry(cert_obj)
            now = datetime.datetime.now(datetime.timezone.utc)
            if expiry_date < now:
                message = message + "\n## Expiry Date:\n:x: Certificate has expired\n"
            elif expiry_date < now + datetime.timedelta(days=7):
                message = message + "\n## Expiry Date:\n:warning: Certificate expires soon\n"
            else:
                message = message + "\n## Expiry Date:\n:white_check_mark: Certificate is valid\n"
            message = message + expiry_date.strftime("%d %B %Y %H:%M:%S") + "\n"
        else:
            message = message + "\n## Website Certificate:\n:x: No certificate found\n"
            message = message + "\n## TLSA Record from webserver:\n:x: No certificate found\n"

        try:
            tlsa_records = [str(record) for record in resolver.resolve("_443._tcp." + domain, "TLSA")]
        except dns.exception.DNSException:
            message = message + "\n## TLSA Records from DNS:\n:x: No TLSA record found\n"
            message = message + "\n## Result:\n:x: No TLSA record found\n"
        else:
            message = message + "\n## TLSA Records from DNS:\n"
            message = message + "".join(f"- {record}\n" for record in tlsa_records)
            if not tlsa_records:
                message = message + "\n## Result:\n:x: No TLSA record found\n"
            elif tlsa_server is None or tlsa_server not in tlsa_records:
                message = message + "\n## Result:\n:x: TLSA record does not match certificate\n"
            elif domain_check:
                message = message + "\n## Result:\n:white_check_mark: TLSA record matches certificate\n"
            else:
                message = message + "\n## Result:\n:x: TLSA record matches certificate but domain does not match\n"

        # Discord rejects messages over the limit, so send the report in parts.
        for chunk in _split_message(message):
            await ctx.channel.send(chunk)

        if notifymeonexpiry:
            entry = str(ctx.user.id) + "," + domain
            # Skip the write when this subscription already exists.
            if entry not in _read_ssl_notify_lines():
                with open(SSL_NOTIFY_FILE, "a") as file:
                    file.write(entry + "\n")
    except Exception as e:
        # Top-level boundary for the command: report whatever went wrong.
        await ctx.channel.send(f"An error occurred: {e}")


@tree.command(name="invite", description="Invite me to your server")
async def invite(ctx):
    # Sends the bot's OAuth2 invite link privately.
    await ctx.response.send_message("https://discord.com/api/oauth2/authorize?client_id=1006128164218621972&permissions=0&scope=bot", ephemeral=True)


async def log(message):
    """Post message to the log channel, falling back to stdout if that fails."""
    channel = client.get_channel(LOG_CHANNEL)
    if channel is None:
        print(f"Log channel unavailable: {message}", flush=True)
        return
    try:
        await channel.send(str(message))
    except discord.HTTPException as e:
        # Logging must never take down the caller (e.g. a background loop).
        print(f"Failed to write to log channel ({e}): {message}", flush=True)


def updateStatus():
    """Apply activityMessage / statusType as the bot's presence."""
    # Unknown status keywords fall back to "watching".
    activity_type = _ACTIVITY_TYPES.get(statusType, discord.ActivityType.watching)
    activity = discord.Activity(type=activity_type, name=activityMessage)
    client.loop.create_task(client.change_presence(activity=activity))


@tasks.loop(hours=24)
async def checkForSSLExpiry():
    """DM each subscriber whose domain's certificate is missing or expires within 7 days."""
    for entry in _read_ssl_notify_lines():
        parsed = _parse_ssl_notify_line(entry)
        if parsed is None:
            print(f"Skipping malformed SSL notify entry: {entry}", flush=True)
            continue
        userid, domain = parsed
        print(f"Checking SSL certificate for {domain}...", flush=True)

        try:
            records = [str(record) for record in _make_resolver().resolve(domain, "A")]
            if not records:
                print(f"No A record found for {domain}", flush=True)
                continue

            # Only the first A record is probed.
            certificates = _fetch_certificates(records[0], domain)
            if not certificates:
                print(f"No certificate found for {domain}", flush=True)
                await _notify_user(userid, f"No certificate found for {domain}")
                continue

            cert_obj = x509.load_pem_x509_certificate(certificates[0].encode("utf-8"), default_backend())
            soon = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)
            if _certificate_expiry(cert_obj) < soon:
                await _notify_user(userid, f"SSL certificate for {domain} expires soon")
                print(f"SSL certificate for {domain} expires soon", flush=True)
        except Exception as e:
            # One failing entry must not stop the remaining checks or the loop.
            print(e, flush=True)
            await log(str(e))
            continue
    print("SSL check complete", flush=True)


@tree.command(name="ssldomains", description="List domains with SSL certificates")
async def ssldomains(ctx):
    # Lists the domains the caller subscribed to for expiry notifications.
    domains = _user_domains(_read_ssl_notify_lines(), str(ctx.user.id))
    if not domains:
        await ctx.response.send_message("You have no domains in the SSL expiry notification list", ephemeral=True)
        return
    await ctx.response.send_message("Domains in the SSL expiry notification list:\n" + "\n".join(domains), ephemeral=True)


@tree.command(name="sslremove", description="Remove a domain from the SSL expiry notification list")
async def sslremove(ctx, domain: str):
    # Removes the caller's subscription for `domain`; every other entry in the
    # notify file is written back unchanged.
    userid = str(ctx.user.id)
    entries = _read_ssl_notify_lines()
    domains = _user_domains(entries, userid)

    if not domains:
        await ctx.response.send_message("You have no domains in the SSL expiry notification list", ephemeral=True)
        return
    if domain not in domains:
        await ctx.response.send_message("Domain not found in the SSL expiry notification list", ephemeral=True)
        return

    kept = [entry for entry in entries if _parse_ssl_notify_line(entry) != (userid, domain)]
    with open(SSL_NOTIFY_FILE, "w") as file:
        file.writelines(entry + "\n" for entry in kept)

    await ctx.response.send_message("Domain removed from the SSL expiry notification list", ephemeral=True)


@tree.command(name="manualsslcheck", description="Manually check SSL certificate")
async def manualsslcheck(ctx):
    # Owner only. Runs the daily SSL expiry check immediately.
    if await _reject_non_admin(ctx, "manualsslcheck"):
        return

    await ctx.response.send_message("SSL checking", ephemeral=True)
    await checkForSSLExpiry()
    # The interaction already has its response; use a follow-up, which (unlike
    # channel.send) supports ephemeral messages.
    await ctx.followup.send("SSL check complete", ephemeral=True)


@tree.command(name="ai", description="AI Chat")
async def ai(ctx, message: str):
    # Forwards the caller's name and message to chatai and posts its reply.
    prompt = "Name: " + str(ctx.user.name) + "\n"
    prompt = prompt + "Message: " + message + "\n"
    await ctx.response.send_message(chatai.chat(prompt))


@tree.command(name="remindme", description="Remind me")
async def remindme(ctx, when: str, reminder: str, public: bool = False):
    # Stores a reminder due after the duration in `when`. With `public` set,
    # the current channel id is stored alongside the reminder.
    time_delta = parse_time(when)
    if time_delta is None:
        await ctx.response.send_message("Invalid time format. Please use something like `1d 3h` or `4hr`.", ephemeral=True)
        return

    reminder_time = datetime.datetime.now() + time_delta
    if not public:
        store_reminder(ctx.user.id, reminder_time, reminder)
        await ctx.response.send_message("I've set a reminder for you in " + when + ".\n" + reminder, ephemeral=True)
    else:
        store_reminder(ctx.user.id, reminder_time, reminder, True, ctx.channel.id)
        await ctx.response.send_message(embed=tools.embed("Reminder", "I'll send you a reminder in " + when + ".\nReason: " + reminder + "\n\nI'll send you a DM and also post a public message in <#" + str(ctx.channel.id) + ">"))


@tree.command(name="reminders", description="List reminders")
async def reminders(ctx):
    # Lists the caller's own pending reminders.
    userid = str(ctx.user.id)
    user_reminders = [entry for entry in read_reminders() if entry['user_id'] == userid]
    if not user_reminders:
        await ctx.response.send_message("You have no reminders.", ephemeral=True)
        return

    message = "Reminders:\n"
    for entry in user_reminders:
        due = datetime.datetime.strptime(entry['time'], "%Y-%m-%d %H:%M:%S")
        message += f"{tools.timestamp_relative(due)}: {entry['text']}\n"
    await ctx.response.send_message(message, ephemeral=True)


@tree.command(name="timestamp", description="Convert timestamp")
async def timestamp(ctx, when: str):
    # Converts a duration such as "1d 3h" (optionally ending in "ago") into
    # the text produced by tools.timestamp_all_raw.
    when = when.strip()
    time_delta = parse_time(when)
    if time_delta is None:
        await ctx.response.send_message("Invalid time format. Please use something like `1d 3h` or `4hr`. End with `ago` to convert to past time", ephemeral=True)
        return

    now = datetime.datetime.now()
    moment = now - time_delta if when.endswith("ago") else now + time_delta
    await ctx.response.send_message(tools.timestamp_all_raw(moment), ephemeral=True)


#region Tickets
@tree.command(name="ticket", description="Create a ticket")
async def ticket(ctx):
    # Opens a support ticket for the caller in the current server.
    if await _reject_unregistered_guild(ctx):
        return
    await ctx.response.send_message("Creating ticket...", ephemeral=True)
    await support.create_ticket(str(ctx.user.id), str(ctx.guild.id))


# NOTE(review): commands.has_permissions is the ext.commands check and does not
# appear to gate app commands; access is enforced by the ADMINID check below.
@tree.command(name="ticketaddserver", description="Add a server to the ticket system")
@commands.has_permissions(administrator=True)
async def ticketaddserver(ctx, category: str, modrole: discord.Role, closedcategory: str):
    # Owner only. Registers the current server with the ticket system.
    if await _reject_non_admin(ctx, "ticketAddServer"):
        return
    if ctx.guild is None:
        await ctx.response.send_message("This command can only be used in a server", ephemeral=True)
        return

    await ctx.response.send_message("Adding server to ticket system", ephemeral=True)
    result = await support.ticketAddServer(ctx.guild.id, category, modrole.id, closedcategory)
    await ctx.channel.send(result)


@tree.command(name="adduser", description="Add a user to a ticket")
async def adduser(ctx, user: discord.User):
    # Adds `user` to the ticket in the current channel.
    if await _reject_unregistered_guild(ctx):
        return
    result = await support.addMemberToTicket(user.id, str(ctx.channel.id), str(ctx.guild.id))
    await ctx.response.send_message(result, ephemeral=True)


@tree.command(name="removeuser", description="Remove a user from a ticket")
async def removeuser(ctx, user: discord.User):
    # Removes `user` from the ticket in the current channel.
    if await _reject_unregistered_guild(ctx):
        return
    result = await support.removeMemberFromTicket(user.id, str(ctx.channel.id), str(ctx.guild.id))
    await ctx.response.send_message(result, ephemeral=True)


@tree.command(name="closeticket", description="Close a ticket")
async def closeticket(ctx):
    # Closes the ticket in the current channel.
    if await _reject_unregistered_guild(ctx):
        return
    await ctx.response.send_message("Closing ticket", ephemeral=True)
    await support.close_ticket(str(ctx.user.id), str(ctx.channel.id), str(ctx.guild.id))


@tree.command(name="reopenticket", description="Reopen a ticket")
async def reopenticket(ctx):
    # Reopens the ticket in the current channel.
    if await _reject_unregistered_guild(ctx):
        return
    await ctx.response.send_message("Reopening ticket", ephemeral=True)
    await support.reopen_ticket(str(ctx.user.id), str(ctx.channel.id), str(ctx.guild.id))


@tree.command(name="renameticket", description="Rename a ticket")
async def renameticket(ctx, name: str):
    # Renames the ticket in the current channel.
    if await _reject_unregistered_guild(ctx):
        return
    result = await support.rename_ticket(str(ctx.user.id), str(ctx.channel.id), str(ctx.guild.id), name)
    await ctx.response.send_message(result, ephemeral=True)
#endregion


@tasks.loop(seconds=10)
async def check_reminders():
    """Deliver every reminder that is due, then remove it from storage."""
    now = datetime.datetime.now()
    delivered = []
    for reminder in read_reminders():
        reminder_time = datetime.datetime.strptime(reminder['time'], "%Y-%m-%d %H:%M:%S")
        if reminder_time > now:
            continue

        # A failed delivery (closed DMs, deleted channel, ...) is reported but
        # must not stop the loop or block the other reminders.
        try:
            user = await client.fetch_user(int(reminder['user_id']))
            await user.send(embed=tools.embed("Reminder", reminder['text']))
        except discord.HTTPException as e:
            print(f"Failed to DM reminder {reminder}: {e}", flush=True)

        if reminder.get('public'):
            channel = client.get_channel(int(reminder['channel_id']))
            if channel is None:
                print(f"Reminder channel not found for {reminder}", flush=True)
            else:
                try:
                    await channel.send(embed=tools.embed("Reminder", reminder['text']))
                except discord.HTTPException as e:
                    print(f"Failed to post reminder {reminder}: {e}", flush=True)

        print("Reminder sent for " + str(reminder), flush=True)
        delivered.append(reminder)

    if delivered:
        # Re-read before writing so reminders stored while the messages above
        # were being sent are not overwritten.
        write_reminders([entry for entry in read_reminders() if entry not in delivered])


# When the bot is ready
@client.event
async def on_ready():
    """Record the owner id, sync commands, set presence and start the loops."""
    global ADMINID
    ADMINID = client.application.owner.id
    support.set_client(client)
    await tree.sync()
    updateStatus()
    # on_ready can fire again after a reconnect; starting a running loop raises.
    if not check_reminders.is_running():
        check_reminders.start()
    if not checkForSSLExpiry.is_running():
        checkForSSLExpiry.start()


client.run(TOKEN)