Nathan Woodburn
32c6f0fb09
All checks were successful
Build Docker / Build Docker (push) Successful in 16s
310 lines
12 KiB
Python
310 lines
12 KiB
Python
import os
|
|
from dotenv import load_dotenv
|
|
import discord
|
|
from discord import app_commands
|
|
import requests
|
|
import dns.resolver
|
|
import markdownify
|
|
import subprocess
|
|
import tempfile
|
|
import re
|
|
import binascii
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
# Load variables from a .env file (if present) into the process environment.
load_dotenv()

# Bot token handed to client.run() at the bottom of the file.
TOKEN = os.getenv("DISCORD_TOKEN")

# Placeholder; on_ready() replaces it with the application owner's user ID.
ADMINID = 0

# API key and base URL of the link shortener used by the shortlink command.
KUTT_APIKEY = os.getenv("LINK_API_KEY")
KUTT_URL = os.getenv("LINK_URL")

# ID of the channel log() posts to. int() raises here if LOG_CHANNEL is unset.
LOG_CHANNEL = int(os.getenv("LOG_CHANNEL"))

intents = discord.Intents.default()
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)

# Presence text and type applied by updateStatus(); changed by the botstatus command.
activityMessage = "over the server"
statusType = "watching"
|
|
|
|
|
|
# Commands
|
|
# ping: connectivity check. Replies "Pong!" in a message only the caller sees.
@tree.command(name="ping", description="Check bot connection")
async def ping(ctx):
    reply = "Pong!"
    await ctx.response.send_message(reply, ephemeral=True)
|
|
|
|
# shortlink: create a short link through the link-shortener API (ADMINID only).
#
#   link -- target URL to shorten
#   name -- optional custom slug; left out of the request when not supplied
#
# Any other user gets a refusal and the attempt is written to the log channel.
@tree.command(name="shortlink", description="Shorten a link")
async def shortlink(ctx, link: str, name: str = None):
    if ctx.user.id != ADMINID:
        await log("User: " + str(ctx.user.name) + " tried to use the shortlink command")
        await ctx.response.send_message("You don't have permission to use this command", ephemeral=True)
        return

    payload = {'target': link}
    if name is not None:
        payload['customurl'] = name

    try:
        # Bounded wait so a stalled API cannot hang the handler indefinitely.
        reply = requests.post(
            f"{KUTT_URL}/api/v2/links",
            data=payload,
            headers={'X-API-KEY': KUTT_APIKEY},
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        await ctx.response.send_message(f"ERROR: {e}", ephemeral=True)
        return

    if reply.status_code not in (200, 201):
        # Stop here: the interaction has now been answered, and an error
        # payload may not contain a 'link' entry to read.
        await ctx.response.send_message("ERROR: " + reply.text, ephemeral=True)
        return

    await ctx.response.send_message("Link: " + reply.json()['link'], ephemeral=False)
|
|
|
|
# botstatus: change the bot's presence (ADMINID only).
#
#   message      -- text shown as the activity name
#   statusmethod -- activity type ("watching", "playing", "listening",
#                   "competing"); defaults to "watching" when omitted
#
# Any other user gets a refusal and the attempt is written to the log channel.
@tree.command(name="botstatus", description="Set the bot status")
async def botstatus(ctx, message: str, statusmethod: str = None):
    global activityMessage, statusType

    if ctx.user.id != ADMINID:
        await log("User: " + str(ctx.user.name) + " tried to use the botstatus command")
        await ctx.response.send_message("You don't have permission to use this command", ephemeral=True)
        return

    activityMessage = message
    # The default must land in the module-level statusType that updateStatus()
    # reads; assigning it to the local parameter would leave the old type active.
    statusType = "watching" if statusmethod is None else statusmethod.lower()

    updateStatus()
    await ctx.response.send_message("Status updated", ephemeral=True)
|
|
|
|
# dig: resolve a DNS record through the fixed nameserver below and post the answers.
#
#   domain      -- name to query
#   record_type -- record type, upper-cased before use (default "A")
@tree.command(name="dig", description="Dig a dns record")
async def dig(ctx, domain: str, record_type: str = "A"):
    record_type = record_type.upper()

    lookup = dns.resolver.Resolver()
    lookup.nameservers = ["100.74.29.146"]
    lookup.port = 53

    try:
        answer = lookup.resolve(domain, record_type)
        # One answer per line, each preceded by a newline so the list starts
        # directly under the heading.
        listing = "".join("\n" + str(entry) for entry in answer)
        await ctx.response.send_message(f"DNS records for {domain} ({record_type}):{listing}")
    except dns.resolver.NXDOMAIN:
        await ctx.response.send_message(f"Domain {domain} not found.")
    except dns.exception.DNSException as e:
        await ctx.response.send_message(f"An error occurred: {e}")
|
|
|
|
def _html_body(document):
    """Return the inner HTML of the first <body> element, or the whole document if there is none."""
    _, opener, rest = document.partition("<body")
    if not opener:
        return document
    # Skip the remainder of the opening tag (attributes and the closing '>').
    _, _, rest = rest.partition(">")
    return rest.split("</body>")[0]


def _html_title(document, fallback):
    """Return the text between <title> and </title>, or *fallback* when no plain <title> tag exists."""
    _, opener, rest = document.partition("<title>")
    if not opener:
        return fallback
    return rest.split("</title>")[0]


def _clip_markdown(text, limit=4096, marker="\n..."):
    """Shorten *text* to at most *limit* characters, marker included.

    The cut is made at the last line break that fits so no partial line is
    shown; text without a usable line break is cut at the character budget.
    """
    if len(text) <= limit:
        return text
    clipped = text[:limit - len(marker)]
    cut = clipped.rfind("\n")
    if cut > 0:
        clipped = clipped[:cut]
    return clipped + marker


# curl: fetch a page through the HNS proxy and post it as a Markdown embed.
#
#   url -- address to fetch; also used as the embed link and as the title
#          when the page has no <title>
#
# Any failure (request error, conversion error, Discord rejecting the embed)
# is reported back as "An error occurred: ...".
@tree.command(name="curl", description="HTTP request")
async def curl(ctx, url: str):
    try:
        proxyURL = "https://proxy.hnsproxy.au"
        # NOTE(review): certificate verification is disabled; presumably the
        # proxied sites use certificates not signed by a public CA -- confirm.
        # The timeout keeps a dead host from hanging the handler indefinitely.
        response = requests.get(
            url,
            proxies={"http": proxyURL, "https": proxyURL},
            verify=False,
            timeout=10,
        )
        page = response.text

        # Point root-relative links at the requested URL.
        body = _html_body(page).replace('href="/', 'href="' + url + '/')
        parsed = markdownify.markdownify(body, heading_style="ATX")
        # Drop empty lines, then fit the text into the 4096-character budget.
        parsed = "\n".join(line for line in parsed.splitlines() if line.strip())
        parsed = _clip_markdown(parsed)

        # 256 is Discord's documented embed title limit.
        title = _html_title(page, url).strip()[:256] or url

        # If url is a TLD only, link to https://hns.au instead
        # (Discord does not allow TLD-only links).
        if "." not in url:
            url = "https://hns.au"

        embed = discord.Embed(title=title, url=url, description=parsed)
        embed.set_footer(text="Parsed by HNSProxy", icon_url="https://hns.au/assets/img/favicon.png")
        embed.timestamp = discord.utils.utcnow()
        await ctx.response.send_message(embed=embed)
    except Exception as e:
        # requests.exceptions.RequestException is covered here too; it was
        # handled with exactly the same message.
        await ctx.response.send_message(f"An error occurred: {e}")
|
|
|
|
def _pem_certificates(s_client_output):
    """Return every PEM certificate block found in raw `openssl s_client -showcerts` output.

    Each returned string runs from the BEGIN line to the END line and ends
    with a newline.
    """
    text = s_client_output.decode("utf-8", errors="replace")
    blocks = re.findall(
        r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----", text, re.DOTALL
    )
    return [block + "\n" for block in blocks]


def _tlsa_from_certificate(cert_obj):
    """Return the "3 1 1 <hex>" TLSA value for a certificate.

    The hex part is the SHA-256 digest of the certificate's public key in DER
    SubjectPublicKeyInfo form -- the same bytes the pipeline
    `openssl x509 -pubkey | openssl pkey -pubin -outform der | openssl dgst -sha256`
    hashes, computed in-process so no shell or temporary file is needed.
    """
    import hashlib
    from cryptography.hazmat.primitives import serialization

    spki = cert_obj.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return "3 1 1 " + hashlib.sha256(spki).hexdigest()


def _certificate_names(cert_obj):
    """Return the DNS names on a certificate: SAN dNSName entries, then the subject CN if not already listed."""
    names = []
    for ext in cert_obj.extensions:
        if ext.oid == x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
            names.extend(ext.value.get_values_for_type(x509.DNSName))
    common_name = cert_obj.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
    if common_name and common_name[0].value not in names:
        names.append(common_name[0].value)
    return names


def _split_message(message, limit=2000):
    """Split *message* into non-blank chunks of at most *limit* characters.

    Chunks break at the last newline that fits; a run with no newline is cut
    at *limit*. Newlines at a break point are dropped.
    """
    chunks = []
    rest = message
    while len(rest) > limit:
        cut = rest.rfind("\n", 0, limit + 1)
        if cut <= 0:
            cut = limit
        chunks.append(rest[:cut])
        rest = rest[cut:].lstrip("\n")
    chunks.append(rest)
    return [chunk for chunk in chunks if chunk.strip()]


# ssl: report a domain's A records, the certificate its web server presents,
# the TLSA value derived from that certificate, the TLSA records published in
# DNS, and whether they agree.
#
#   domain -- lower-case hostname to check (validated by the regex below)
#
# The first reply goes through the interaction; the report itself is posted to
# the channel, split into as many messages as the 2000-character limit needs.
@tree.command(name="ssl", description="Check SSL certificate")
async def ssl(ctx, domain: str):
    # Verify that the domain is valid
    if not domain:
        await ctx.response.send_message("Please provide a domain to check")
        return

    if not re.match(r"^([a-z0-9]+(-[a-z0-9]+)*\.)*([a-z0-9]+(-[a-z0-9]+)*)$", domain):
        await ctx.response.send_message("Please provide a valid domain to check")
        return

    await ctx.response.send_message(f"Checking SSL certificate for {domain}...")

    resolver = dns.resolver.Resolver()
    resolver.nameservers = ["100.74.29.146"]
    resolver.port = 53

    try:
        # Query the DNS record
        records = [str(record) for record in resolver.resolve(domain, "A")]
        if not records:
            await ctx.channel.send(f"No A record found for {domain}")
            return
        message = "## A records:\n" + "".join(f"- {record}\n" for record in records)

        # Connect to the first A record, sending the domain as SNI. The
        # timeout stops an unreachable host from blocking the handler; on
        # expiry subprocess.run kills openssl and raises, which is reported
        # by the handler at the bottom.
        ip = records[0]
        s_client = subprocess.run(
            ["openssl", "s_client", "-showcerts", "-connect", f"{ip}:443", "-servername", domain],
            input=b"\n",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=15,
        )
        certificates = _pem_certificates(s_client.stdout)

        # None means "no certificate retrieved"; checked before comparing below.
        tlsa_server = None
        domain_check = False

        if certificates:
            # The first certificate in the output is treated as the site's own.
            cert = certificates[0]
            message = message + "\n## Website Certificate:\n```\n" + cert + "\n```\n"

            cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
            tlsa_server = _tlsa_from_certificate(cert_obj)
            message = message + "\n## TLSA Record from webserver:\n`" + tlsa_server + "`\n"

            domains = _certificate_names(cert_obj)
            if domains:
                message = message + "\n## SSL Domains:\n" + "".join(f"- {cn}\n" for cn in domains)
            domain_check = domain in domains
        else:
            message = message + "\n## Website Certificate:\n:x: No certificate found\n"
            message = message + "\n## TLSA Record from webserver:\n:x: No certificate found\n"

        # Check for TLSA record. Only DNS failures count as "no record";
        # anything else is a real error and goes to the handler below.
        try:
            tlsa_records = [
                str(record) for record in resolver.resolve("_443._tcp." + domain, "TLSA")
            ]
        except dns.exception.DNSException:
            tlsa_records = []

        if not tlsa_records:
            message = message + "\n## TLSA Records from DNS:\n:x: No TLSA record found\n"
            message = message + "\n## Result:\n:x: No TLSA record found\n"
        else:
            message = message + "\n## TLSA Records from DNS:\n"
            message = message + "".join(f"- {record}\n" for record in tlsa_records)

            if tlsa_server is None:
                message = message + "\n## Result:\n:x: No certificate found to compare with the TLSA record\n"
            elif tlsa_server in tlsa_records:
                # Any published record may match, not only the first one.
                if domain_check:
                    message = message + "\n## Result:\n:white_check_mark: TLSA record matches certificate\n"
                else:
                    message = message + "\n## Result:\n:x: TLSA record matches certificate but domain does not match\n"
            else:
                message = message + "\n## Result:\n:x: TLSA record does not match certificate\n"

        for chunk in _split_message(message):
            await ctx.channel.send(chunk)

    # Catch all exceptions
    except Exception as e:
        await ctx.channel.send(f"An error occurred: {e}")
|
|
|
|
|
|
# invite: reply, visibly only to the caller, with the bot's OAuth2 invite link.
@tree.command(name="invite", description="Invite me to your server")
async def invite(ctx):
    invite_url = "https://discord.com/api/oauth2/authorize?client_id=1006128164218621972&permissions=0&scope=bot"
    await ctx.response.send_message(invite_url, ephemeral=True)
|
|
|
|
async def log(message):
    """Post *message* to the channel whose ID is LOG_CHANNEL.

    client.get_channel() returns None when it has no channel for that ID. In
    that case the message is printed instead, so a missing log channel does
    not abort the command or startup step that called log().
    """
    channel = client.get_channel(LOG_CHANNEL)
    if channel is None:
        print(f"Log channel {LOG_CHANNEL} not available; message: {message}")
        return
    await channel.send(message)
|
|
|
|
def updateStatus():
    """Schedule a presence change built from the module-level activityMessage and statusType.

    statusType selects the activity type; any value other than "watching",
    "playing", "listening" or "competing" falls back to watching. The change
    is scheduled as a task on the client's event loop, not awaited here.
    """
    kinds = {
        "watching": discord.ActivityType.watching,
        "playing": discord.ActivityType.playing,
        "listening": discord.ActivityType.listening,
        "competing": discord.ActivityType.competing,
    }
    kind = kinds.get(statusType, discord.ActivityType.watching)
    activity = discord.Activity(type=kind, name=activityMessage)
    client.loop.create_task(client.change_presence(activity=activity))
|
|
|
|
# When the bot is ready: record the application owner as the admin, sync the
# slash commands, apply the presence, and log both ends of the sequence.
@client.event
async def on_ready():
    global ADMINID

    owner = client.application.owner
    ADMINID = owner.id

    await log("Bot is starting...")
    await tree.sync()
    updateStatus()
    await log("Bot is ready")
|
|
|
|
# Start the client with the token read from DISCORD_TOKEN.
client.run(TOKEN)