diff --git a/blueprints/terminal.py b/blueprints/terminal.py new file mode 100644 index 0000000..ef2f42d --- /dev/null +++ b/blueprints/terminal.py @@ -0,0 +1,665 @@ +from flask import Blueprint, request, render_template, session +from tools import json_response, getClientIP +from datetime import datetime + +terminal_bp = Blueprint('terminal', __name__) + + +@terminal_bp.route("/terminal") +def index(): + return render_template("terminal.html", user=getClientIP(request)) + + +COMMANDS = { + "help": "Show this help message", + "about": "About this terminal", + "clear": "Clear the terminal", + "echo [text]": "Echo text back", + "whoami": "Display the current user", + "ls": "List directory contents", + "pwd": "Print working directory", + "date": "Show current date and time with optional format", + "cd [path]": "Change directory to path", + "cat [file]": "Display file contents", + "rm [file]": "Remove a file", + "tree [path]": "Display directory tree", + "touch [file]": "Create a new empty file", + "nano [file]": "Edit a file", + "reset": "Reset the terminal session", + "exit": "Exit the terminal session", +} + +BOOT_FILES = [ + "amd-ucode.img", + "EFI", + "initramfs-linux-fallback.img", + "initramfs-linux.img", + "initramfs-linux-lts-fallback.img", + "initramfs-linux-lts.img", + "intel-ucode.img", + "loader", + "vmlinuz-linux", + "vmlinuz-linux-lts" +] + + +def get_nodes_in_directory(path: str) -> list[str]: + """Simulate getting files in a directory for the terminal.""" + # If path is valid, get files from session + if session.get("paths", None) is None: + setup_path_session() + + # Split path into parts + parts = path.strip("/").split("/") + if not parts or parts == [""]: + return session["paths"] + + current_level = session["paths"] + for part in parts: + found = False + for item in current_level: + if item["name"] == part and item["type"] == 0: + current_level = item.get("children", []) + found = True + break + if not found: + return [] + + return current_level + + +def setup_path_session(): + """Initialize the session path variables if not already set.""" + if "pwd" not in session: + session["pwd"] = f"/home/{getClientIP(request)}" + if "paths" not in session: + binaries = [] + for cmd in COMMANDS.keys(): + binaries.append({"name": cmd.split()[0], "type": 2, "content": "", "permissions": 1}) + boot_files = [] + for bin_file in BOOT_FILES: + boot_files.append({"name": bin_file, "type": 1, "content": "", "permissions": 1}) + + session["paths"] = [ + {"name": "home", "type": 0, "children": [ + {"name": str(getClientIP(request)), "type": 0, "children": [ + {"name": "Readme.txt", "type": 1, "content": "This is a README file.", "permissions": 2}, + ], "permissions": 2} + ], "permissions": 1}, + {"name": "bin", "type": 0, "children": binaries, "permissions": 1}, + {"name": "boot", "type": 0, "children": boot_files, "permissions": 1}, + {"name": "dev", "type": 0, "children": [], "permissions": 1}, + {"name": "etc", "type": 0, "children": [], "permissions": 1}, + {"name": "lib", "type": 0, "children": [], "permissions": 1}, + {"name": "lib64", "type": 0, "children": [], "permissions": 1}, + {"name": "opt", "type": 0, "children": [], "permissions": 1}, + {"name": "proc", "type": 0, "children": [], "permissions": 1}, + {"name": "root", "type": 0, "children": [], "permissions": 1}, + {"name": "run", "type": 0, "children": [], "permissions": 1}, + {"name": "sbin", "type": 0, "children": [], "permissions": 1}, + {"name": "srv", "type": 0, "children": [], "permissions": 1}, + {"name": "sys", "type": 0, "children": [], "permissions": 1}, + {"name": "tmp", "type": 0, "children": [], "permissions": 1}, + {"name": "usr", "type": 0, "children": [], "permissions": 1}, + {"name": "var", "type": 0, "children": [], "permissions": 1}, + ] + + +def is_valid_path(path: str) -> bool: + """Check if the given path is valid in the simulated terminal.""" + if session.get("paths", None) is None: + setup_path_session() + + # Split path into parts + parts = path.strip("/").split("/") + if not parts or parts == [""]: + return True # Root path is valid + + current_level = session["paths"] + for part in parts: + found = False + for item in current_level: + if item["name"] == part and item["type"] == 0: + current_level = item.get("children", []) + found = True + break + if not found: + return False + + return True + +def is_valid_file(path: str) -> bool: + """Check if the given file exists in the current directory.""" + if session.get("paths", None) is None: + setup_path_session() + + # Get path parts + parts = path.split("/") + # Get files in the directory + if is_valid_path("/".join(parts[:-1])): + files = get_nodes_in_directory("/".join(parts[:-1])) + for item in files: + if item["name"] == parts[-1] and item["type"] == 1: + return True + return False + +def is_valid_binary(path: str) -> bool: + """Check if the given file exists in the current directory.""" + if session.get("paths", None) is None: + setup_path_session() + + # Get path parts + parts = path.split("/") + # Get files in the directory + if is_valid_path("/".join(parts[:-1])): + files = get_nodes_in_directory("/".join(parts[:-1])) + for item in files: + if item["name"] == parts[-1] and item["type"] == 2: + return True + return False + +def get_node(path: str) -> dict: + """Get the node (file or directory) at the given path.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + current_level = session["paths"] + for part in parts: + for item in current_level: + if item["name"] == part: + if part == parts[-1]: + return item + else: + current_level = item.get("children", []) + break + return {} + +def build_tree(path: str, prefix: str = "") -> str: + output = "" + files = get_nodes_in_directory(path) + for i, item in enumerate(files): + connector = "└── " if i == len(files) - 1 else "├── " + output += f"{prefix}{connector}{item['name']}\n" + if item["type"] == 0: # Directory + extension = " " if i == len(files) - 1 else "│ " + output += build_tree(sanitize_path(path + "/" + item["name"]), prefix + extension) + return output + +def sanitize_path(path: str) -> str: + """Sanitize the given path to prevent directory traversal.""" + parts = path.strip("/").split("/") + sanitized_parts = [] + for part in parts: + if part == "" or part == ".": + continue + elif part == "..": + if sanitized_parts: + sanitized_parts.pop() + else: + sanitized_parts.append(part) + return "/" + "/".join(sanitized_parts) + +def remove_node(path: str) -> bool: + """Remove the node (file or directory) at the given path.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + current_level = session["paths"] + for i, part in enumerate(parts): + for j, item in enumerate(current_level): + if item["name"] == part: + if i == len(parts) - 1: + # Remove the item + del current_level[j] + + # Update the session paths + session["paths"] = session["paths"] + + return True + else: + current_level = item.get("children", []) + break + return False + + +@terminal_bp.route("/terminal/execute/ls", methods=["POST"]) +def ls(): + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + # Check if -a flag is provided + # Pop if -a flag from args + all_files = False + if "-a" in args: + all_files = True + args.remove("-a") + elif "--all" in args: + all_files = True + args.remove("--all") + + long_format = False + if "-l" in args: + long_format = True + args.remove("-l") + elif "--long" in args: + long_format = True + args.remove("--long") + + for arg in args: + if arg.startswith("-") and not arg.startswith("--"): + if "l" in arg: + long_format = True + if "a" in arg: + all_files = True + args.remove(arg) + + + ip = getClientIP(request) + path = session.get("pwd", f"/home/{ip}") + if args: + path = args[0] + if not path.startswith("/"): + # Relative path + path = sanitize_path(session.get("pwd", f"/home/{ip}") + "/" + path) + if not is_valid_path(path): + # If it is a file or binary, return it + if is_valid_file(path) or is_valid_binary(path): + if long_format: + node = get_node(path) + permissions = node.get("permissions", 0) + perm_str = "" + perm_str += "r" if permissions > 0 else "-" + perm_str += "w" if permissions > 1 else "-" + perm_str += "x" if (node.get("type", 0) == 2 and permissions > 1) else "-" + user = f"{ip}" if path.startswith(f"/home/{ip}") else "root" + output = f"{perm_str} {user} {user} {path.split('/')[-1]}" + return json_response(request, {"output": output}, 200) + return json_response(request, {"output": path.split("/")[-1]}, 200) + + return json_response(request, {"output": f"ls: cannot access '{path}': No such file or directory"}, 200) + + files = get_nodes_in_directory(path) + output = [] + if long_format: + for f in files: + permissions = f.get("permissions", 0) + perm_str = "" + perm_str += "r" if permissions > 0 else "-" + perm_str += "w" if permissions > 1 else "-" + perm_str += "x" if (f.get("type", 0) == 2 and permissions >= 1) else "-" + user = f"{ip}" if path.startswith(f"/home/{ip}") else "root" + output.append(f"{perm_str} {user} {user} {f['name']}") + else: + output = [f["name"] for f in files] + if all_files: + output.insert(0, ".") + output.insert(1, "..") + else: + output = [file for file in output if not file.startswith(".")] + if long_format: + output = "\n".join(output) + else: + output = " ".join(output) + + return json_response(request, {"output": output}, 200) + + +@terminal_bp.route("/terminal/pwd") +def pwd(): + if "pwd" not in session: + session["pwd"] = f"/home/{getClientIP(request)}" + pwd = session["pwd"] + if pwd == "/home/" + getClientIP(request): + pwd = "~" + return json_response(request, {"output": pwd, "raw": session["pwd"]}, 200) + + +@terminal_bp.route("/terminal/execute/cd", methods=["POST"]) +def cd(): + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + # No path provided, go to home + session["pwd"] = f"/home/{getClientIP(request)}" + output = "" + else: + path = args[0] + # Simulate changing directory + if path == "~": + session["pwd"] = f"/home/{getClientIP(request)}" + output = "" + + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + if is_valid_path(path): + session["pwd"] = sanitize_path(path) + output = "" + else: + output = f"bash: cd: {path}: No such file or directory" + + return json_response(request, {"output": output}, 200) + +def can_write_to_path(path: str) -> bool: + """Check if the user can write to the given path (must be in their home directory).""" + user_home = f"/home/{getClientIP(request)}" + normalized_path = sanitize_path(path) + return normalized_path.startswith(user_home) + +def create_file(path: str, content: str = "") -> bool: + """Create a new file at the given path.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + filename = parts[-1] + dir_path = "/".join(parts[:-1]) + + # Get the directory + if not is_valid_path("/" + dir_path): + return False + + # Get nodes in directory + dir_parts = dir_path.strip("/").split("/") + current_level = session["paths"] + + for part in dir_parts: + if part == "": + continue + for item in current_level: + if item["name"] == part and item["type"] == 0: + current_level = item.get("children", []) + break + + # Check if file already exists + for item in current_level: + if item["name"] == filename: + # Update existing file + item["content"] = content + session.modified = True + return True + + # Create new file + new_file = { + "name": filename, + "type": 1, + "content": content, + "permissions": 2 + } + current_level.append(new_file) + session.modified = True + return True + +def update_file_content(path: str, content: str) -> bool: + """Update the content of an existing file.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + current_level = session["paths"] + + for i, part in enumerate(parts): + for item in current_level: + if item["name"] == part: + if i == len(parts) - 1: + # Update the file content + if item["type"] == 1: + item["content"] = content + session.modified = True + return True + return False + else: + current_level = item.get("children", []) + break + return False + +@terminal_bp.route("/terminal/execute/touch", methods=["POST"]) +def touch(): + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + return json_response(request, {"output": "touch: missing file operand"}, 200) + + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"touch: cannot touch '{path}': Permission denied"}, 200) + + # Check if file already exists + if is_valid_file(path): + return json_response(request, {"output": f"touch: '{path}': File already exists"}, 200) + + # Create the file + if create_file(path, ""): + output = "" + else: + output = f"touch: cannot create '{path}': No such file or directory" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"touch: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/nano", methods=["POST"]) +def nano(): + try: + data = request.get_json() or {} + args = data.get("args", "") + + # Parse args - format: filename CONTENT content here + parts = args.split(" CONTENT ", 1) + if len(parts) != 2: + return json_response(request, {"output": "Usage: nano [file] CONTENT [text]\nExample: nano test.txt CONTENT Hello World"}, 200) + + path = parts[0].strip() + content = parts[1] + + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200) + + # Check if file exists + if is_valid_file(path): + # Update existing file + node = get_node(path) + if node.get("permissions", 0) < 2: + return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200) + + if update_file_content(path, content): + output = "" + else: + output = f"nano: failed to update '{path}'" + else: + # Create new file + if create_file(path, content): + output = f"File '{path}' created successfully" + else: + output = f"nano: cannot create '{path}': No such file or directory" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"nano: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/cat", methods=["POST"]) +def cat(): + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + return json_response(request, {"output": "cat: missing file operand"}, 200) + + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if path is valid + if not is_valid_file(path): + # Check if it is a binary + if is_valid_binary(path): + return json_response(request, {"output": f"cat: {path}: Binary file"}, 200) + return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200) + + output = get_node(path).get("content", "") + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"cat: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/echo", methods=["POST"]) +def echo(): + try: + data = request.get_json() or {} + args = data.get("args", "") + return json_response(request, {"output": args}, 200) + except Exception as e: + return json_response(request, {"output": f"echo: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/date", methods=["POST"]) +def date(): + try: + # See if any args are passed + data = request.get_json() or {} + args = data.get("args", "") + + # Use arguments to format date if needed + if args: + # If args if --help or -h, show help message + if args in ("--help", "-h"): + output = ( + "Usage: date [FORMAT]\n\n" + "Display the current date and time.\n\n" + "FORMAT controls the output. Some common format specifiers:\n" + " %a Abbreviated weekday name (e.g., 'Mon')\n" + " %b Abbreviated month name (e.g., 'Jan')\n" + " %d Day of the month (01 to 31)\n" + " %H Hour (00 to 23)\n" + " %M Minute (00 to 59)\n" + " %S Second (00 to 60)\n" + " %Y Year with century (e.g., 2024)\n\n" + "Example: date '%Y-%m-%d %H:%M:%S'" + ) + return json_response(request, {"output": output}, 200) + + try: + output = datetime.now().strftime(args) + except Exception: + output = "Invalid date format." + else: + output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y") + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"date: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/rm", methods=["POST"]) +def rm(): + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + return json_response(request, {"output": "rm: missing operand"}, 200) + + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200) + + # Check if path is valid + if not is_valid_file(path) and not is_valid_binary(path): + return json_response(request, {"output": f"rm: cannot remove '{path}': No such file"}, 200) + + # Get the node + node = get_node(path) + # Only let user delete files if the permission is >1 (writable) + if node.get("permissions", 0) < 2: + return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200) + + # Only let the user remove files + if node.get("type", 1) != 1: + return json_response(request, {"output": f"rm: cannot remove '{path}': Is a directory"}, 200) + + # Remove the file from session paths + if remove_node(path): + output = f"Removed '{path}'" + else: + output = f"Failed to remove '{path}'" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"rm: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/tree", methods=["POST"]) +def tree(): + try: + data = request.get_json() or {} + args = data.get("args", "") + path = session.get("pwd", f"/home/{getClientIP(request)}") + if args: + path = args + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + if not is_valid_path(path): + return json_response(request, {"output": f"tree: cannot access '{path}': No such file or directory"}, 200) + + output = build_tree(path).rstrip() + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"tree: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/", methods=["POST"]) +def execute_catch(command): + try: + # Basic command processing + if command == "help": + output = "Available commands:\n" + \ + "\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items()) + elif command == "about": + output = "This is a simulated terminal interface created by Nathan Woodburn." + + elif command == "whoami": + output = getClientIP(request) + elif command == "pwd": + # Get pwd from session or simulate + output = session.get("pwd", f"/home/{getClientIP(request)}") + else: + output = f"Command not found: {command}" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"{command}: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/reset", methods=["POST"]) +def reset(): + try: + # Clear the session data related to terminal + session.pop("pwd", None) + session.pop("paths", None) + output = "Terminal session has been reset." + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"reset: {str(e)}"}, 200) + +# Add error handler at the end +@terminal_bp.errorhandler(Exception) +def handle_terminal_error(error): + """Handle all exceptions in terminal blueprint.""" + error_message = f"Terminal error: {str(error)}" + return json_response(request, {"output": error_message}, 500) \ No newline at end of file diff --git a/curl.py b/curl.py index 526f1f4..078a50a 100644 --- a/curl.py +++ b/curl.py @@ -77,6 +77,35 @@ def get_projects(): return projects +@lru_cache(maxsize=32) +def valid_curl_path(path: str) -> bool: + """Check if the given path corresponds to a valid curl/ascii response.""" + path = clean_path(path) + + # Special cases + special_paths = [ + "index", + "projects", + "donate", + "donate/more", + "tools" + ] + if path in special_paths: + return True + + # Check for donate/ pattern + if path.startswith("donate/"): + coin = path.split("/")[1] + address = getAddress(coin) + if address != "": + return True + + # Check if .ascii template exists + if os.path.exists(f"templates/{path}.ascii"): + return True + + return False + def curl_response(request): # Check if .ascii exists path = clean_path(request.path) @@ -119,8 +148,4 @@ def curl_response(request): if os.path.exists(f"templates/{path}.ascii"): return render_template(f"{path}.ascii",header=get_header()), 200, {'Content-Type': 'text/plain; charset=utf-8'} - # Fallback to html if it exists - if os.path.exists(f"templates/{path}.html"): - return render_template(f"{path}.html") - return error_response(request) \ No newline at end of file diff --git a/server.py b/server.py index a581350..71a365d 100644 --- a/server.py +++ b/server.py @@ -26,10 +26,12 @@ from blueprints.api import api_bp from blueprints.podcast import podcast_bp from blueprints.acme import acme_bp from blueprints.spotify import spotify_bp +from blueprints.terminal import terminal_bp from tools import isCurl, isCrawler, getAddress, getFilePath, error_response, getClientIP, json_response, getHandshakeScript, get_tools_data -from curl import curl_response +from curl import curl_response, valid_curl_path app = Flask(__name__) +app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey") CORS(app) # Register blueprints @@ -40,6 +42,7 @@ app.register_blueprint(api_bp, url_prefix='/api/v1') app.register_blueprint(podcast_bp) app.register_blueprint(acme_bp) app.register_blueprint(spotify_bp, url_prefix='/spotify') +app.register_blueprint(terminal_bp) dotenv.load_dotenv() @@ -704,7 +707,7 @@ def catch_all(path: str): return error_response(request, message="Restricted route", code=403) # If curl request, return curl response - if isCurl(request): + if isCurl(request) and valid_curl_path(path): return curl_response(request) if path in REDIRECT_ROUTES: diff --git a/templates/terminal.html b/templates/terminal.html new file mode 100644 index 0000000..dfeae0e --- /dev/null +++ b/templates/terminal.html @@ -0,0 +1,472 @@ + + + + + + Terminal | Nathan.Woodburn/ + + + + +
+
Nathan.Woodburn/
+
Type 'help' for available commands
+
+
+
┌──({{user}}@NW)-[~]
+
+ └─$  + +
+
+
+ + +
+
nano -
+ +
Save modified buffer? (Y/N)
+ +
+ + + +