woodburn-bot/bot.py

248 lines
9.0 KiB
Python
Raw Normal View History

2023-08-11 15:55:44 +10:00
import os
from dotenv import load_dotenv
import discord
from discord import app_commands
2023-08-11 16:35:36 +10:00
import requests
2023-08-11 17:24:11 +10:00
import dns.resolver
2023-08-12 22:39:36 +10:00
import markdownify
2023-09-27 17:27:06 +10:00
import subprocess
import tempfile
2023-08-11 15:55:44 +10:00
load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
2023-08-12 17:14:55 +10:00
ADMINID = 0
2023-08-12 16:52:01 +10:00
KUTT_APIKEY=os.getenv('LINK_API_KEY')
KUTT_URL=os.getenv('LINK_URL')
2023-08-12 17:05:35 +10:00
LOG_CHANNEL = int(os.getenv('LOG_CHANNEL'))
2023-08-11 15:55:44 +10:00
intents = discord.Intents.default()
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)
2023-08-11 16:35:36 +10:00
activityMessage="over the server"
statusType="watching"
2023-08-12 16:52:01 +10:00
2023-08-11 15:55:44 +10:00
# Commands
@tree.command(name="ping", description="Check bot connection")
async def ping(ctx):
await ctx.response.send_message("Pong!",ephemeral=True)
2023-08-11 16:35:36 +10:00
@tree.command(name="shortlink", description="Shorten a link")
async def shortlink(ctx, link: str, name: str = None):
2023-08-12 16:52:01 +10:00
if (ctx.user.id != ADMINID):
2023-08-11 16:55:18 +10:00
await log("User: " + str(ctx.user.name) + " tried to use the shortlink command")
2023-08-11 16:35:36 +10:00
await ctx.response.send_message("You don't have permission to use this command",ephemeral=True)
else:
2023-08-12 17:05:35 +10:00
url=f"{KUTT_URL}/api/v2/links"
2023-08-12 16:52:01 +10:00
headers = {'X-API-KEY' : KUTT_APIKEY}
2023-08-11 16:35:36 +10:00
data = {'target' : link, 'customurl' : name}
if (name == None):
data = {'target' : link}
x = requests.post(url, data = data, headers = headers)
print(x.text)
if (x.status_code != 200 and x.status_code != 201):
await ctx.response.send_message("ERROR: " + x.text,ephemeral=True)
link=x.json()['link']
await ctx.response.send_message("Link: " + link,ephemeral=False)
@tree.command(name="botstatus", description="Set the bot status")
async def botstatus(ctx, message: str, statusmethod: str = None):
2023-08-12 16:52:01 +10:00
if (ctx.user.id != ADMINID):
2023-08-11 16:55:18 +10:00
await log("User: " + str(ctx.user.name) + " tried to use the botstatus command")
2023-08-11 16:35:36 +10:00
await ctx.response.send_message("You don't have permission to use this command",ephemeral=True)
else:
global activityMessage
activityMessage=message
global statusType
if (statusmethod == None):
statusmethod="watching"
else:
statusType=statusmethod.lower()
updateStatus()
await ctx.response.send_message("Status updated",ephemeral=True)
2023-08-11 15:55:44 +10:00
2023-08-11 17:24:11 +10:00
@tree.command(name="dig", description="Dig a dns record")
async def dig(ctx, domain: str, record_type: str = "A"):
record_type = record_type.upper()
resolver = dns.resolver.Resolver()
resolver.nameservers = ["100.74.29.146"]
2023-09-02 14:58:31 +10:00
resolver.port = 53
2023-08-11 17:24:11 +10:00
try:
# Query the DNS record
2023-08-11 17:30:07 +10:00
response = resolver.resolve(domain, record_type)
2023-08-11 17:24:11 +10:00
print(response)
records = ""
for record in response:
print(record)
records = records + "\n" + str(record)
# Send the result to the Discord channel
await ctx.response.send_message(f"DNS records for {domain} ({record_type}):{records}")
except dns.resolver.NXDOMAIN:
await ctx.response.send_message(f"Domain {domain} not found.")
except dns.exception.DNSException as e:
await ctx.response.send_message(f"An error occurred: {e}")
2023-08-12 15:18:21 +10:00
@tree.command(name="curl", description="HTTP request")
async def curl(ctx, url: str):
try:
proxyURL = "https://proxy.hnsproxy.au"
response = requests.get(url, proxies={"http": proxyURL, "https": proxyURL},verify=False)
output = response.text
2023-08-12 22:39:36 +10:00
# Get BODY only
output = output.split("<body")[1]
output = output.split("</body>")[0]
output = output.split(">", 1)[1]
# Replace any relative links with absolute links
output = output.replace('href="/', 'href="' + url + '/')
parsed = markdownify.markdownify(output, heading_style="ATX")
# Delete any empty lines
parsed = "\n".join([s for s in parsed.splitlines() if s.strip()])
output = response.text
# Get title
if (output.find("<title>") != -1):
title = output.split("<title>")[1]
title = title.split("</title>")[0]
else:
title = url
if (len(parsed) > 4096):
parsed = parsed[:4096]
# Delete any incomplete lines
parsed = "\n".join(parsed.splitlines()[:-1])
parsed = parsed + "\n..."
# if url is a tld only replace it with the url https://hns.au (due to Discord not allowing tld only links)
if (url.find(".") == -1):
url = "https://hns.au"
# Create an embed
embed = discord.Embed(title=title, url=url, description=parsed)
embed.set_footer(text="Parsed by HNSProxy",icon_url="https://hns.au/assets/img/favicon.png")
embed.timestamp = discord.utils.utcnow()
await ctx.response.send_message(embed=embed)
2023-08-12 15:18:21 +10:00
except requests.exceptions.RequestException as e:
await ctx.response.send_message(f"An error occurred: {e}")
2023-08-12 22:39:36 +10:00
except Exception as e:
await ctx.response.send_message(f"An error occurred: {e}")
2023-09-27 17:27:06 +10:00
@tree.command(name="ssl", description="Check SSL certificate")
async def ssl(ctx, domain: str):
message = ""
resolver = dns.resolver.Resolver()
resolver.nameservers = ["100.74.29.146"]
resolver.port = 53
try:
# Query the DNS record
response = resolver.resolve(domain, "A")
records = []
message = "## A records:\n"
for record in response:
records.append(str(record))
message = message + "- " +str(record) + "\n"
if records.count < 1:
await ctx.response.send_message(f"No A record found for {domain}")
return
# Get the first A record
ip = records[0]
# Run the openssl s_client command
2023-09-27 17:31:22 +10:00
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
2023-09-27 17:27:06 +10:00
2023-09-27 17:31:22 +10:00
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
s_client_output, _ = s_client_process.communicate(input=b"\n")
2023-09-27 17:27:06 +10:00
certificates = []
current_cert = ""
for line in s_client_output.split(b"\n"):
current_cert += line.decode("utf-8") + "\n"
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
certificates.append(current_cert)
current_cert = ""
if certificates:
cert = certificates[0]
message = message + "\n## Website Certificate:\n`" + cert + "`\n"
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_cert_file:
temp_cert_file.write(cert)
temp_cert_file.seek(0) # Move back to the beginning of the temporary file
tlsa_command = [
"openssl",
"x509",
"-in",
temp_cert_file.name,
"-pubkey",
"-noout",
"|",
"openssl",
"pkey",
"-pubin",
"-outform",
"der",
"|",
"openssl",
"dgst",
"-sha256",
"-binary",
"|",
"xxd",
"-p",
"-u",
"-c",
"32",
]
tlsa_process = subprocess.Popen(" ".join(tlsa_command), shell=True, stdout=subprocess.PIPE)
tlsa_output, _ = tlsa_process.communicate()
message = message + "\n## TLSA Record from webserver: `3 1 1 " + tlsa_output.decode("utf-8") + "`\n"
await ctx.response.send_message(message)
else:
ctx.response.send_message(f"No certificate found for {domain}")
return
# Catch all exceptions
except Exception as e:
await ctx.response.send_message(f"An error occurred: {e}")
2023-08-11 17:24:11 +10:00
2023-08-12 15:38:32 +10:00
@tree.command(name="invite", description="Invite me to your server")
async def invite(ctx):
await ctx.response.send_message("https://discord.com/api/oauth2/authorize?client_id=1006128164218621972&permissions=0&scope=bot",ephemeral=True)
2023-08-11 17:24:11 +10:00
2023-08-11 16:55:18 +10:00
async def log(message):
2023-08-12 16:52:01 +10:00
channel=client.get_channel(LOG_CHANNEL)
2023-08-11 16:55:18 +10:00
await channel.send(message)
2023-08-11 15:55:44 +10:00
def updateStatus():
2023-08-11 16:35:36 +10:00
global activityMessage
global statusType
if (statusType == "watching"):
activity=discord.Activity(type=discord.ActivityType.watching, name=activityMessage)
elif (statusType == "playing"):
activity=discord.Activity(type=discord.ActivityType.playing, name=activityMessage)
elif (statusType == "listening"):
activity=discord.Activity(type=discord.ActivityType.listening, name=activityMessage)
elif (statusType == "competing"):
activity=discord.Activity(type=discord.ActivityType.competing, name=activityMessage)
else:
activity=discord.Activity(type=discord.ActivityType.watching, name=activityMessage)
2023-08-11 15:55:44 +10:00
client.loop.create_task(client.change_presence(activity=activity))
# When the bot is ready
@client.event
async def on_ready():
2023-08-12 17:14:55 +10:00
global ADMINID
ADMINID = client.application.owner.id
await log("Bot is starting...")
2023-08-11 15:55:44 +10:00
await tree.sync()
updateStatus()
2023-08-12 17:14:55 +10:00
await log("Bot is ready")
2023-08-11 15:55:44 +10:00
client.run(TOKEN)