diff --git a/blueprints/terminal.py b/blueprints/terminal.py index c17f81c..9574edb 100644 --- a/blueprints/terminal.py +++ b/blueprints/terminal.py @@ -21,6 +21,10 @@ COMMANDS = { "date": "Show current date and time with optional format", "cd [path]": "Change directory to path", "cat [file]": "Display file contents", + "rm [file]": "Remove a file", + "tree [path]": "Display directory tree", + "touch [file]": "Create a new empty file", + "nano [file]": "Edit a file (write content)", "reset": "Reset the terminal session", "exit": "Exit the terminal session", } @@ -56,13 +60,17 @@ def setup_path_session(): if "pwd" not in session: session["pwd"] = f"/home/{getClientIP(request)}" if "paths" not in session: + binaries = [] + for cmd in COMMANDS.keys(): + binaries.append({"name": cmd.split()[0], "type": 2, "content": "", "permissions": 1}) + session["paths"] = [ {"name": "home", "type": 0, "children": [ {"name": str(getClientIP(request)), "type": 0, "children": [ {"name": "Readme.txt", "type": 1, "content": "This is a README file.", "permissions": 2}, ], "permissions": 2} ], "permissions": 1}, - {"name": "bin", "type": 0, "children": [], "permissions": 1}, + {"name": "bin", "type": 0, "children": binaries, "permissions": 1}, {"name": "boot", "type": 0, "children": [], "permissions": 1}, {"name": "dev", "type": 0, "children": [], "permissions": 1}, {"name": "etc", "type": 0, "children": [], "permissions": 1}, @@ -120,6 +128,22 @@ def is_valid_file(path: str) -> bool: if item["name"] == parts[-1] and item["type"] == 1: return True return False + +def is_valid_binary(path: str) -> bool: + """Check if the given file exists in the current directory.""" + if session.get("paths", None) is None: + setup_path_session() + + # Get path parts + parts = path.split("/") + # Get files in the directory + if is_valid_path("/".join(parts[:-1])): + files = get_nodes_in_directory("/".join(parts[:-1])) + for item in files: + print(item) + if item["name"] == parts[-1] and item["type"] == 2: + return True + return False def get_node(path: str) -> dict: """Get the node (file or directory) at the given path.""" @@ -138,6 +162,17 @@ def get_node(path: str) -> dict: break return {} +def build_tree(path: str, prefix: str = "") -> str: + output = "" + files = get_nodes_in_directory(path) + for i, item in enumerate(files): + connector = "└── " if i == len(files) - 1 else "├── " + output += f"{prefix}{connector}{item['name']}\n" + if item["type"] == 0: # Directory + extension = " " if i == len(files) - 1 else "│ " + output += build_tree(sanitize_path(path + "/" + item["name"]), prefix + extension) + return output + def sanitize_path(path: str) -> str: """Sanitize the given path to prevent directory traversal.""" parts = path.strip("/").split("/") @@ -152,6 +187,29 @@ def sanitize_path(path: str) -> str: sanitized_parts.append(part) return "/" + "/".join(sanitized_parts) +def remove_node(path: str) -> bool: + """Remove the node (file or directory) at the given path.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + current_level = session["paths"] + for i, part in enumerate(parts): + for j, item in enumerate(current_level): + if item["name"] == part: + if i == len(parts) - 1: + # Remove the item + del current_level[j] + + # Update the session paths + session["paths"] = session["paths"] + + return True + else: + current_level = item.get("children", []) + break + return False + @terminal_bp.route("/terminal/execute/ls", methods=["POST"]) def ls(): @@ -178,7 +236,13 @@ def ls(): return json_response(request, {"output": f"ls: cannot access '{path}': No such file or directory"}, 200) files = get_nodes_in_directory(path) + print(files) output = [file["name"] for file in files] + if all_files: + output.insert(0, ".") + output.insert(1, "..") + else: + output = [file for file in output if not file.startswith(".")] output = " ".join(output) return json_response(request, {"output": output}, 200) @@ -222,94 +286,325 @@ def cd(): return json_response(request, {"output": output}, 200) +def can_write_to_path(path: str) -> bool: + """Check if the user can write to the given path (must be in their home directory).""" + user_home = f"/home/{getClientIP(request)}" + normalized_path = sanitize_path(path) + return normalized_path.startswith(user_home) + +def create_file(path: str, content: str = "") -> bool: + """Create a new file at the given path.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + filename = parts[-1] + dir_path = "/".join(parts[:-1]) + + # Get the directory + if not is_valid_path("/" + dir_path): + return False + + # Get nodes in directory + dir_parts = dir_path.strip("/").split("/") + current_level = session["paths"] + + for part in dir_parts: + if part == "": + continue + for item in current_level: + if item["name"] == part and item["type"] == 0: + current_level = item.get("children", []) + break + + # Check if file already exists + for item in current_level: + if item["name"] == filename: + # Update existing file + item["content"] = content + session.modified = True + return True + + # Create new file + new_file = { + "name": filename, + "type": 1, + "content": content, + "permissions": 2 + } + current_level.append(new_file) + session.modified = True + return True + +def update_file_content(path: str, content: str) -> bool: + """Update the content of an existing file.""" + if session.get("paths", None) is None: + setup_path_session() + + parts = path.strip("/").split("/") + current_level = session["paths"] + + for i, part in enumerate(parts): + for item in current_level: + if item["name"] == part: + if i == len(parts) - 1: + # Update the file content + if item["type"] == 1: + item["content"] = content + session.modified = True + return True + return False + else: + current_level = item.get("children", []) + break + return False + +@terminal_bp.route("/terminal/execute/touch", methods=["POST"]) +def touch(): + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + return json_response(request, {"output": "touch: missing file operand"}, 200) + + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"touch: cannot touch '{path}': Permission denied"}, 200) + + # Check if file already exists + if is_valid_file(path): + return json_response(request, {"output": f"touch: '{path}': File already exists"}, 200) + + # Create the file + if create_file(path, ""): + output = "" + else: + output = f"touch: cannot create '{path}': No such file or directory" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"touch: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/nano", methods=["POST"]) +def nano(): + try: + data = request.get_json() or {} + args = data.get("args", "") + + # Parse args - format: filename CONTENT content here + parts = args.split(" CONTENT ", 1) + if len(parts) != 2: + return json_response(request, {"output": "Usage: nano [file] CONTENT [text]\nExample: nano test.txt CONTENT Hello World"}, 200) + + path = parts[0].strip() + content = parts[1] + + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200) + + # Check if file exists + if is_valid_file(path): + # Update existing file + node = get_node(path) + if node.get("permissions", 0) < 2: + return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200) + + if update_file_content(path, content): + output = f"File '{path}' updated successfully" + else: + output = f"nano: failed to update '{path}'" + else: + # Create new file + if create_file(path, content): + output = f"File '{path}' created successfully" + else: + output = f"nano: cannot create '{path}': No such file or directory" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"nano: {str(e)}"}, 200) + @terminal_bp.route("/terminal/execute/cat", methods=["POST"]) def cat(): - data = request.get_json() or {} - args = data.get("args", "") - args = args.split() + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() - if not args: - return json_response(request, {"output": "cat: missing file operand"}, 200) + if not args: + return json_response(request, {"output": "cat: missing file operand"}, 200) - path = args[0] - if not path.startswith("/"): - path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) - # Check if path is valid - if not is_valid_file(path): - return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200) + # Check if path is valid + if not is_valid_file(path): + # Check if it is a binary + if is_valid_binary(path): + return json_response(request, {"output": f"cat: {path}: Binary file"}, 200) + return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200) - output = get_node(path).get("content", "") + output = get_node(path).get("content", "") - return json_response(request, {"output": output}, 200) + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"cat: {str(e)}"}, 200) @terminal_bp.route("/terminal/execute/echo", methods=["POST"]) def echo(): - data = request.get_json() or {} - args = data.get("args", "") - return json_response(request, {"output": args}, 200) - + try: + data = request.get_json() or {} + args = data.get("args", "") + return json_response(request, {"output": args}, 200) + except Exception as e: + return json_response(request, {"output": f"echo: {str(e)}"}, 200) @terminal_bp.route("/terminal/execute/date", methods=["POST"]) def date(): - # See if any args are passed - data = request.get_json() or {} - args = data.get("args", "") + try: + # See if any args are passed + data = request.get_json() or {} + args = data.get("args", "") - # Use arguments to format date if needed - if args: - # If args if --help or -h, show help message - if args in ("--help", "-h"): - output = ( - "Usage: date [FORMAT]\n\n" - "Display the current date and time.\n\n" - "FORMAT controls the output. Some common format specifiers:\n" - " %a Abbreviated weekday name (e.g., 'Mon')\n" - " %b Abbreviated month name (e.g., 'Jan')\n" - " %d Day of the month (01 to 31)\n" - " %H Hour (00 to 23)\n" - " %M Minute (00 to 59)\n" - " %S Second (00 to 60)\n" - " %Y Year with century (e.g., 2024)\n\n" - "Example: date '%Y-%m-%d %H:%M:%S'" - ) - return json_response(request, {"output": output}, 200) + # Use arguments to format date if needed + if args: + # If args if --help or -h, show help message + if args in ("--help", "-h"): + output = ( + "Usage: date [FORMAT]\n\n" + "Display the current date and time.\n\n" + "FORMAT controls the output. Some common format specifiers:\n" + " %a Abbreviated weekday name (e.g., 'Mon')\n" + " %b Abbreviated month name (e.g., 'Jan')\n" + " %d Day of the month (01 to 31)\n" + " %H Hour (00 to 23)\n" + " %M Minute (00 to 59)\n" + " %S Second (00 to 60)\n" + " %Y Year with century (e.g., 2024)\n\n" + "Example: date '%Y-%m-%d %H:%M:%S'" + ) + return json_response(request, {"output": output}, 200) - try: - output = datetime.now().strftime(args) - except Exception: - output = "Invalid date format." - else: - output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y") - return json_response(request, {"output": output}, 200) + try: + output = datetime.now().strftime(args) + except Exception: + output = "Invalid date format." + else: + output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y") + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"date: {str(e)}"}, 200) +@terminal_bp.route("/terminal/execute/rm", methods=["POST"]) +def rm(): + try: + data = request.get_json() or {} + args = data.get("args", "") + args = args.split() + + if not args: + return json_response(request, {"output": "rm: missing operand"}, 200) + + path = args[0] + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + + # Check if user can write to this path + if not can_write_to_path(path): + return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200) + + # Check if path is valid + if not is_valid_file(path) and not is_valid_binary(path): + return json_response(request, {"output": f"rm: cannot remove '{path}': No such file"}, 200) + + # Get the node + node = get_node(path) + # Only let user delete files if the permission is >1 (writable) + if node.get("permissions", 0) < 2: + return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200) + + # Only let the user remove files + if node.get("type", 1) != 1: + return json_response(request, {"output": f"rm: cannot remove '{path}': Is a directory"}, 200) + + # Remove the file from session paths + if remove_node(path): + output = f"Removed '{path}'" + else: + output = f"Failed to remove '{path}'" + + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"rm: {str(e)}"}, 200) + +@terminal_bp.route("/terminal/execute/tree", methods=["POST"]) +def tree(): + try: + data = request.get_json() or {} + args = data.get("args", "") + path = session.get("pwd", f"/home/{getClientIP(request)}") + if args: + path = args + if not path.startswith("/"): + path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path) + if not is_valid_path(path): + return json_response(request, {"output": f"tree: cannot access '{path}': No such file or directory"}, 200) + + output = build_tree(path).rstrip() + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"tree: {str(e)}"}, 200) @terminal_bp.route("/terminal/execute/", methods=["POST"]) def execute_catch(command): - data = request.get_json() or {} - args = data.get("args", "") + try: + data = request.get_json() or {} + args = data.get("args", "") - # Basic command processing - if command == "help": - output = "Available commands:\n" + \ - "\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items()) - elif command == "about": - output = "This is a simulated terminal interface created by Nathan Woodburn." - elif command == "whoami": - output = getClientIP(request) - elif command == "pwd": - # Get pwd from session or simulate - output = session.get("pwd", f"/home/{getClientIP(request)}") - else: - output = f"Command not found: {command}" + # Basic command processing + if command == "help": + output = "Available commands:\n" + \ + "\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items()) + elif command == "about": + output = "This is a simulated terminal interface created by Nathan Woodburn." - return json_response(request, {"output": output}, 200) + elif command == "whoami": + output = getClientIP(request) + elif command == "pwd": + # Get pwd from session or simulate + output = session.get("pwd", f"/home/{getClientIP(request)}") + else: + output = f"Command not found: {command}" + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"{command}: {str(e)}"}, 200) @terminal_bp.route("/terminal/execute/reset", methods=["POST"]) def reset(): - # Clear the session data related to terminal - session.pop("pwd", None) - session.pop("paths", None) - output = "Terminal session has been reset." - return json_response(request, {"output": output}, 200) \ No newline at end of file + try: + # Clear the session data related to terminal + session.pop("pwd", None) + session.pop("paths", None) + output = "Terminal session has been reset." + return json_response(request, {"output": output}, 200) + except Exception as e: + return json_response(request, {"output": f"reset: {str(e)}"}, 200) + +# Add error handler at the end +@terminal_bp.errorhandler(Exception) +def handle_terminal_error(error): + """Handle all exceptions in terminal blueprint.""" + error_message = f"Terminal error: {str(error)}" + return json_response(request, {"output": error_message}, 500) \ No newline at end of file diff --git a/templates/terminal.html b/templates/terminal.html index df3c24c..fe45d3d 100644 --- a/templates/terminal.html +++ b/templates/terminal.html @@ -3,7 +3,7 @@ - Terminal + Terminal | Nathan.Woodburn/