bot: Add ssl notifications
All checks were successful
Build Docker / Build Docker (push) Successful in 18s
All checks were successful
Build Docker / Build Docker (push) Successful in 18s
This commit is contained in:
parent
c782e17028
commit
32817c8c32
@ -3,4 +3,5 @@ COPY requirements.txt /app/
|
||||
WORKDIR /app
|
||||
RUN pip install -r requirements.txt
|
||||
COPY . .
|
||||
VOLUME [ "/mnt" ]
|
||||
CMD ["python3", "bot.py"]
|
88
bot.py
88
bot.py
@ -12,6 +12,8 @@ import binascii
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import datetime
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
|
||||
|
||||
load_dotenv()
|
||||
@ -131,7 +133,7 @@ async def curl(ctx, url: str):
|
||||
await ctx.response.send_message(f"An error occurred: {e}")
|
||||
|
||||
@tree.command(name="ssl", description="Check SSL certificate")
|
||||
async def ssl(ctx, domain: str, showcert: bool = False):
|
||||
async def ssl(ctx, domain: str, showcert: bool = False, notifymeonexpiry: bool = False):
|
||||
# Verify that the domain is valid
|
||||
if not domain:
|
||||
await ctx.response.send_message("Please provide a domain to check")
|
||||
@ -281,7 +283,11 @@ async def ssl(ctx, domain: str, showcert: bool = False):
|
||||
|
||||
else:
|
||||
await ctx.channel.send(message)
|
||||
|
||||
|
||||
if (notifymeonexpiry):
|
||||
with open("/mnt/sslnotify.txt", "a") as file:
|
||||
file.write(str(ctx.user.id) + "," + domain + "\n")
|
||||
|
||||
# Catch all exceptions
|
||||
except Exception as e:
|
||||
await ctx.channel.send(f"An error occurred: {e}")
|
||||
@ -310,6 +316,77 @@ def updateStatus():
|
||||
activity=discord.Activity(type=discord.ActivityType.watching, name=activityMessage)
|
||||
client.loop.create_task(client.change_presence(activity=activity))
|
||||
|
||||
def checkForSSLExpiry():
|
||||
with open("/mnt/sslnotify.txt", "r") as file:
|
||||
lines = file.readlines()
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
userid, domain = line.split(",")
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ["100.74.29.146"]
|
||||
resolver.port = 53
|
||||
try:
|
||||
# Query the DNS record
|
||||
response = resolver.resolve(domain, "A")
|
||||
records = []
|
||||
for record in response:
|
||||
records.append(str(record))
|
||||
if not records:
|
||||
continue
|
||||
# Get the first A record
|
||||
ip = records[0]
|
||||
# Run the openssl s_client command
|
||||
s_client_command = ["openssl","s_client","-showcerts","-connect",f"{ip}:443","-servername",domain,]
|
||||
s_client_process = subprocess.Popen(s_client_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
|
||||
s_client_output, _ = s_client_process.communicate(input=b"\n")
|
||||
certificates = []
|
||||
current_cert = ""
|
||||
for line in s_client_output.split(b"\n"):
|
||||
current_cert += line.decode("utf-8") + "\n"
|
||||
if "-----END CERTIFICATE-----" in line.decode("utf-8"):
|
||||
certificates.append(current_cert)
|
||||
current_cert = ""
|
||||
# Remove anything before -----BEGIN CERTIFICATE-----
|
||||
certificates = [cert[cert.find("-----BEGIN CERTIFICATE-----"):] for cert in certificates]
|
||||
if certificates:
|
||||
cert = certificates[0]
|
||||
# Get expiry date
|
||||
cert_obj = x509.load_pem_x509_certificate(cert.encode("utf-8"), default_backend())
|
||||
expiry_date = cert_obj.not_valid_after
|
||||
if expiry_date < datetime.datetime.now() + datetime.timedelta(days=7):
|
||||
user = client.get_user(int(userid))
|
||||
if user:
|
||||
user.send(f"SSL certificate for {domain} expires soon")
|
||||
except:
|
||||
continue
|
||||
|
||||
@tree.command(name="ssldomains", description="List domains with SSL certificates")
|
||||
async def ssldomains(ctx):
|
||||
# Get user id
|
||||
userid = str(ctx.user.id)
|
||||
# Get all domains for user
|
||||
domains = []
|
||||
with open("/mnt/sslnotify.txt", "r") as file:
|
||||
lines = file.readlines()
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if line.startswith(userid):
|
||||
_, domain = line.split(",")
|
||||
domains.append(domain)
|
||||
if not domains:
|
||||
await ctx.response.send_message("You have no domains in the SSL expiry notification list",ephemeral=True)
|
||||
return
|
||||
|
||||
await ctx.response.send_message("Domains in the SSL expiry notification list:\n" + "\n".join(domains),ephemeral=True)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# When the bot is ready
|
||||
@client.event
|
||||
async def on_ready():
|
||||
@ -318,4 +395,9 @@ async def on_ready():
|
||||
await tree.sync()
|
||||
updateStatus()
|
||||
|
||||
client.run(TOKEN)
|
||||
client.run(TOKEN)
|
||||
|
||||
# Every 12 hours check for SSL expiry
|
||||
scheduler = AsyncIOScheduler()
|
||||
scheduler.add_job(checkForSSLExpiry, 'interval', hours=12)
|
||||
scheduler.start()
|
Loading…
Reference in New Issue
Block a user