315 lines
11 KiB
Python
315 lines
11 KiB
Python
from flask import Blueprint, request, render_template, session
|
|
from tools import json_response, getClientIP
|
|
from datetime import datetime
|
|
|
|
terminal_bp = Blueprint('terminal', __name__)
|
|
|
|
|
|
@terminal_bp.route("/terminal")
|
|
def index():
|
|
return render_template("terminal.html", user=getClientIP(request))
|
|
|
|
|
|
COMMANDS = {
|
|
"help": "Show this help message",
|
|
"about": "About this terminal",
|
|
"clear": "Clear the terminal",
|
|
"echo [text]": "Echo text back",
|
|
"whoami": "Display the current user",
|
|
"ls": "List directory contents",
|
|
"pwd": "Print working directory",
|
|
"date": "Show current date and time with optional format",
|
|
"cd [path]": "Change directory to path",
|
|
"cat [file]": "Display file contents",
|
|
"reset": "Reset the terminal session",
|
|
"exit": "Exit the terminal session",
|
|
}
|
|
|
|
|
|
def get_nodes_in_directory(path: str) -> list[str]:
|
|
"""Simulate getting files in a directory for the terminal."""
|
|
# If path is valid, get files from session
|
|
if session.get("paths", None) is None:
|
|
setup_path_session()
|
|
|
|
# Split path into parts
|
|
parts = path.strip("/").split("/")
|
|
if not parts or parts == [""]:
|
|
return session["paths"]
|
|
|
|
current_level = session["paths"]
|
|
for part in parts:
|
|
found = False
|
|
for item in current_level:
|
|
if item["name"] == part and item["type"] == 0:
|
|
current_level = item.get("children", [])
|
|
found = True
|
|
break
|
|
if not found:
|
|
return []
|
|
|
|
return current_level
|
|
|
|
|
|
def setup_path_session():
|
|
"""Initialize the session path variables if not already set."""
|
|
if "pwd" not in session:
|
|
session["pwd"] = f"/home/{getClientIP(request)}"
|
|
if "paths" not in session:
|
|
session["paths"] = [
|
|
{"name": "home", "type": 0, "children": [
|
|
{"name": str(getClientIP(request)), "type": 0, "children": [
|
|
{"name": "Readme.txt", "type": 1, "content": "This is a README file.", "permissions": 2},
|
|
], "permissions": 2}
|
|
], "permissions": 1},
|
|
{"name": "bin", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "boot", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "dev", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "etc", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "lib", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "lib64", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "mnt", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "nix", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "opt", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "proc", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "root", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "run", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "sbin", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "srv", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "sys", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "tmp", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "usr", "type": 0, "children": [], "permissions": 1},
|
|
{"name": "var", "type": 0, "children": [], "permissions": 1},
|
|
]
|
|
|
|
|
|
def is_valid_path(path: str) -> bool:
|
|
"""Check if the given path is valid in the simulated terminal."""
|
|
if session.get("paths", None) is None:
|
|
setup_path_session()
|
|
|
|
# Split path into parts
|
|
parts = path.strip("/").split("/")
|
|
if not parts or parts == [""]:
|
|
return True # Root path is valid
|
|
|
|
current_level = session["paths"]
|
|
for part in parts:
|
|
found = False
|
|
for item in current_level:
|
|
if item["name"] == part and item["type"] == 0:
|
|
current_level = item.get("children", [])
|
|
found = True
|
|
break
|
|
if not found:
|
|
return False
|
|
|
|
return True
|
|
|
|
def is_valid_file(path: str) -> bool:
|
|
"""Check if the given file exists in the current directory."""
|
|
if session.get("paths", None) is None:
|
|
setup_path_session()
|
|
|
|
# Get path parts
|
|
parts = path.split("/")
|
|
# Get files in the directory
|
|
if is_valid_path("/".join(parts[:-1])):
|
|
files = get_nodes_in_directory("/".join(parts[:-1]))
|
|
for item in files:
|
|
if item["name"] == parts[-1] and item["type"] == 1:
|
|
return True
|
|
return False
|
|
|
|
def get_node(path: str) -> dict:
|
|
"""Get the node (file or directory) at the given path."""
|
|
if session.get("paths", None) is None:
|
|
setup_path_session()
|
|
|
|
parts = path.strip("/").split("/")
|
|
current_level = session["paths"]
|
|
for part in parts:
|
|
for item in current_level:
|
|
if item["name"] == part:
|
|
if part == parts[-1]:
|
|
return item
|
|
else:
|
|
current_level = item.get("children", [])
|
|
break
|
|
return {}
|
|
|
|
def sanitize_path(path: str) -> str:
|
|
"""Sanitize the given path to prevent directory traversal."""
|
|
parts = path.strip("/").split("/")
|
|
sanitized_parts = []
|
|
for part in parts:
|
|
if part == "" or part == ".":
|
|
continue
|
|
elif part == "..":
|
|
if sanitized_parts:
|
|
sanitized_parts.pop()
|
|
else:
|
|
sanitized_parts.append(part)
|
|
return "/" + "/".join(sanitized_parts)
|
|
|
|
|
|
@terminal_bp.route("/terminal/execute/ls", methods=["POST"])
|
|
def ls():
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
args = args.split()
|
|
# Check if -a flag is provided
|
|
# Pop if -a flag from args
|
|
all_files = False
|
|
if "-a" in args:
|
|
all_files = True
|
|
args.remove("-a")
|
|
elif "--all" in args:
|
|
all_files = True
|
|
args.remove("--all")
|
|
|
|
path = session.get("pwd", f"/home/{getClientIP(request)}")
|
|
if args:
|
|
path = args[0]
|
|
if not path.startswith("/"):
|
|
# Relative path
|
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
|
if not is_valid_path(path):
|
|
return json_response(request, {"output": f"ls: cannot access '{path}': No such file or directory"}, 200)
|
|
|
|
files = get_nodes_in_directory(path)
|
|
output = [file["name"] for file in files]
|
|
output = " ".join(output)
|
|
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
|
|
@terminal_bp.route("/terminal/pwd")
|
|
def pwd():
|
|
if "pwd" not in session:
|
|
session["pwd"] = f"/home/{getClientIP(request)}"
|
|
pwd = session["pwd"]
|
|
if pwd == "/home/" + getClientIP(request):
|
|
pwd = "~"
|
|
return json_response(request, {"output": pwd, "raw": session["pwd"]}, 200)
|
|
|
|
|
|
@terminal_bp.route("/terminal/execute/cd", methods=["POST"])
|
|
def cd():
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
args = args.split()
|
|
|
|
if not args:
|
|
# No path provided, go to home
|
|
session["pwd"] = f"/home/{getClientIP(request)}"
|
|
output = ""
|
|
else:
|
|
path = args[0]
|
|
# Simulate changing directory
|
|
if path == "~":
|
|
session["pwd"] = f"/home/{getClientIP(request)}"
|
|
output = ""
|
|
|
|
if not path.startswith("/"):
|
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
|
|
|
if is_valid_path(path):
|
|
session["pwd"] = sanitize_path(path)
|
|
output = ""
|
|
else:
|
|
output = f"bash: cd: {path}: No such file or directory"
|
|
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
@terminal_bp.route("/terminal/execute/cat", methods=["POST"])
|
|
def cat():
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
args = args.split()
|
|
|
|
if not args:
|
|
return json_response(request, {"output": "cat: missing file operand"}, 200)
|
|
|
|
path = args[0]
|
|
if not path.startswith("/"):
|
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
|
|
|
# Check if path is valid
|
|
if not is_valid_file(path):
|
|
return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200)
|
|
|
|
output = get_node(path).get("content", "")
|
|
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
@terminal_bp.route("/terminal/execute/echo", methods=["POST"])
|
|
def echo():
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
return json_response(request, {"output": args}, 200)
|
|
|
|
|
|
@terminal_bp.route("/terminal/execute/date", methods=["POST"])
|
|
def date():
|
|
# See if any args are passed
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
|
|
# Use arguments to format date if needed
|
|
if args:
|
|
# If args if --help or -h, show help message
|
|
if args in ("--help", "-h"):
|
|
output = (
|
|
"Usage: date [FORMAT]\n\n"
|
|
"Display the current date and time.\n\n"
|
|
"FORMAT controls the output. Some common format specifiers:\n"
|
|
" %a Abbreviated weekday name (e.g., 'Mon')\n"
|
|
" %b Abbreviated month name (e.g., 'Jan')\n"
|
|
" %d Day of the month (01 to 31)\n"
|
|
" %H Hour (00 to 23)\n"
|
|
" %M Minute (00 to 59)\n"
|
|
" %S Second (00 to 60)\n"
|
|
" %Y Year with century (e.g., 2024)\n\n"
|
|
"Example: date '%Y-%m-%d %H:%M:%S'"
|
|
)
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
try:
|
|
output = datetime.now().strftime(args)
|
|
except Exception:
|
|
output = "Invalid date format."
|
|
else:
|
|
output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y")
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
|
|
@terminal_bp.route("/terminal/execute/<command>", methods=["POST"])
|
|
def execute_catch(command):
|
|
data = request.get_json() or {}
|
|
args = data.get("args", "")
|
|
|
|
# Basic command processing
|
|
if command == "help":
|
|
output = "Available commands:\n" + \
|
|
"\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items())
|
|
elif command == "about":
|
|
output = "This is a simulated terminal interface created by Nathan Woodburn."
|
|
elif command == "whoami":
|
|
output = getClientIP(request)
|
|
elif command == "pwd":
|
|
# Get pwd from session or simulate
|
|
output = session.get("pwd", f"/home/{getClientIP(request)}")
|
|
else:
|
|
output = f"Command not found: {command}"
|
|
|
|
return json_response(request, {"output": output}, 200)
|
|
|
|
|
|
@terminal_bp.route("/terminal/execute/reset", methods=["POST"])
|
|
def reset():
|
|
# Clear the session data related to terminal
|
|
session.pop("pwd", None)
|
|
session.pop("paths", None)
|
|
output = "Terminal session has been reset."
|
|
return json_response(request, {"output": output}, 200) |