feat: Add a ton of terminal features
This commit is contained in:
@@ -21,6 +21,10 @@ COMMANDS = {
|
|||||||
"date": "Show current date and time with optional format",
|
"date": "Show current date and time with optional format",
|
||||||
"cd [path]": "Change directory to path",
|
"cd [path]": "Change directory to path",
|
||||||
"cat [file]": "Display file contents",
|
"cat [file]": "Display file contents",
|
||||||
|
"rm [file]": "Remove a file",
|
||||||
|
"tree [path]": "Display directory tree",
|
||||||
|
"touch [file]": "Create a new empty file",
|
||||||
|
"nano [file]": "Edit a file (write content)",
|
||||||
"reset": "Reset the terminal session",
|
"reset": "Reset the terminal session",
|
||||||
"exit": "Exit the terminal session",
|
"exit": "Exit the terminal session",
|
||||||
}
|
}
|
||||||
@@ -56,13 +60,17 @@ def setup_path_session():
|
|||||||
if "pwd" not in session:
|
if "pwd" not in session:
|
||||||
session["pwd"] = f"/home/{getClientIP(request)}"
|
session["pwd"] = f"/home/{getClientIP(request)}"
|
||||||
if "paths" not in session:
|
if "paths" not in session:
|
||||||
|
binaries = []
|
||||||
|
for cmd in COMMANDS.keys():
|
||||||
|
binaries.append({"name": cmd.split()[0], "type": 2, "content": "", "permissions": 1})
|
||||||
|
|
||||||
session["paths"] = [
|
session["paths"] = [
|
||||||
{"name": "home", "type": 0, "children": [
|
{"name": "home", "type": 0, "children": [
|
||||||
{"name": str(getClientIP(request)), "type": 0, "children": [
|
{"name": str(getClientIP(request)), "type": 0, "children": [
|
||||||
{"name": "Readme.txt", "type": 1, "content": "This is a README file.", "permissions": 2},
|
{"name": "Readme.txt", "type": 1, "content": "This is a README file.", "permissions": 2},
|
||||||
], "permissions": 2}
|
], "permissions": 2}
|
||||||
], "permissions": 1},
|
], "permissions": 1},
|
||||||
{"name": "bin", "type": 0, "children": [], "permissions": 1},
|
{"name": "bin", "type": 0, "children": binaries, "permissions": 1},
|
||||||
{"name": "boot", "type": 0, "children": [], "permissions": 1},
|
{"name": "boot", "type": 0, "children": [], "permissions": 1},
|
||||||
{"name": "dev", "type": 0, "children": [], "permissions": 1},
|
{"name": "dev", "type": 0, "children": [], "permissions": 1},
|
||||||
{"name": "etc", "type": 0, "children": [], "permissions": 1},
|
{"name": "etc", "type": 0, "children": [], "permissions": 1},
|
||||||
@@ -121,6 +129,22 @@ def is_valid_file(path: str) -> bool:
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def is_valid_binary(path: str) -> bool:
|
||||||
|
"""Check if the given file exists in the current directory."""
|
||||||
|
if session.get("paths", None) is None:
|
||||||
|
setup_path_session()
|
||||||
|
|
||||||
|
# Get path parts
|
||||||
|
parts = path.split("/")
|
||||||
|
# Get files in the directory
|
||||||
|
if is_valid_path("/".join(parts[:-1])):
|
||||||
|
files = get_nodes_in_directory("/".join(parts[:-1]))
|
||||||
|
for item in files:
|
||||||
|
print(item)
|
||||||
|
if item["name"] == parts[-1] and item["type"] == 2:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def get_node(path: str) -> dict:
|
def get_node(path: str) -> dict:
|
||||||
"""Get the node (file or directory) at the given path."""
|
"""Get the node (file or directory) at the given path."""
|
||||||
if session.get("paths", None) is None:
|
if session.get("paths", None) is None:
|
||||||
@@ -138,6 +162,17 @@ def get_node(path: str) -> dict:
|
|||||||
break
|
break
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
def build_tree(path: str, prefix: str = "") -> str:
|
||||||
|
output = ""
|
||||||
|
files = get_nodes_in_directory(path)
|
||||||
|
for i, item in enumerate(files):
|
||||||
|
connector = "└── " if i == len(files) - 1 else "├── "
|
||||||
|
output += f"{prefix}{connector}{item['name']}\n"
|
||||||
|
if item["type"] == 0: # Directory
|
||||||
|
extension = " " if i == len(files) - 1 else "│ "
|
||||||
|
output += build_tree(sanitize_path(path + "/" + item["name"]), prefix + extension)
|
||||||
|
return output
|
||||||
|
|
||||||
def sanitize_path(path: str) -> str:
|
def sanitize_path(path: str) -> str:
|
||||||
"""Sanitize the given path to prevent directory traversal."""
|
"""Sanitize the given path to prevent directory traversal."""
|
||||||
parts = path.strip("/").split("/")
|
parts = path.strip("/").split("/")
|
||||||
@@ -152,6 +187,29 @@ def sanitize_path(path: str) -> str:
|
|||||||
sanitized_parts.append(part)
|
sanitized_parts.append(part)
|
||||||
return "/" + "/".join(sanitized_parts)
|
return "/" + "/".join(sanitized_parts)
|
||||||
|
|
||||||
|
def remove_node(path: str) -> bool:
|
||||||
|
"""Remove the node (file or directory) at the given path."""
|
||||||
|
if session.get("paths", None) is None:
|
||||||
|
setup_path_session()
|
||||||
|
|
||||||
|
parts = path.strip("/").split("/")
|
||||||
|
current_level = session["paths"]
|
||||||
|
for i, part in enumerate(parts):
|
||||||
|
for j, item in enumerate(current_level):
|
||||||
|
if item["name"] == part:
|
||||||
|
if i == len(parts) - 1:
|
||||||
|
# Remove the item
|
||||||
|
del current_level[j]
|
||||||
|
|
||||||
|
# Update the session paths
|
||||||
|
session["paths"] = session["paths"]
|
||||||
|
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
current_level = item.get("children", [])
|
||||||
|
break
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/ls", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/ls", methods=["POST"])
|
||||||
def ls():
|
def ls():
|
||||||
@@ -178,7 +236,13 @@ def ls():
|
|||||||
return json_response(request, {"output": f"ls: cannot access '{path}': No such file or directory"}, 200)
|
return json_response(request, {"output": f"ls: cannot access '{path}': No such file or directory"}, 200)
|
||||||
|
|
||||||
files = get_nodes_in_directory(path)
|
files = get_nodes_in_directory(path)
|
||||||
|
print(files)
|
||||||
output = [file["name"] for file in files]
|
output = [file["name"] for file in files]
|
||||||
|
if all_files:
|
||||||
|
output.insert(0, ".")
|
||||||
|
output.insert(1, "..")
|
||||||
|
else:
|
||||||
|
output = [file for file in output if not file.startswith(".")]
|
||||||
output = " ".join(output)
|
output = " ".join(output)
|
||||||
|
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
@@ -222,8 +286,157 @@ def cd():
|
|||||||
|
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
|
|
||||||
|
def can_write_to_path(path: str) -> bool:
|
||||||
|
"""Check if the user can write to the given path (must be in their home directory)."""
|
||||||
|
user_home = f"/home/{getClientIP(request)}"
|
||||||
|
normalized_path = sanitize_path(path)
|
||||||
|
return normalized_path.startswith(user_home)
|
||||||
|
|
||||||
|
def create_file(path: str, content: str = "") -> bool:
|
||||||
|
"""Create a new file at the given path."""
|
||||||
|
if session.get("paths", None) is None:
|
||||||
|
setup_path_session()
|
||||||
|
|
||||||
|
parts = path.strip("/").split("/")
|
||||||
|
filename = parts[-1]
|
||||||
|
dir_path = "/".join(parts[:-1])
|
||||||
|
|
||||||
|
# Get the directory
|
||||||
|
if not is_valid_path("/" + dir_path):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get nodes in directory
|
||||||
|
dir_parts = dir_path.strip("/").split("/")
|
||||||
|
current_level = session["paths"]
|
||||||
|
|
||||||
|
for part in dir_parts:
|
||||||
|
if part == "":
|
||||||
|
continue
|
||||||
|
for item in current_level:
|
||||||
|
if item["name"] == part and item["type"] == 0:
|
||||||
|
current_level = item.get("children", [])
|
||||||
|
break
|
||||||
|
|
||||||
|
# Check if file already exists
|
||||||
|
for item in current_level:
|
||||||
|
if item["name"] == filename:
|
||||||
|
# Update existing file
|
||||||
|
item["content"] = content
|
||||||
|
session.modified = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Create new file
|
||||||
|
new_file = {
|
||||||
|
"name": filename,
|
||||||
|
"type": 1,
|
||||||
|
"content": content,
|
||||||
|
"permissions": 2
|
||||||
|
}
|
||||||
|
current_level.append(new_file)
|
||||||
|
session.modified = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
def update_file_content(path: str, content: str) -> bool:
|
||||||
|
"""Update the content of an existing file."""
|
||||||
|
if session.get("paths", None) is None:
|
||||||
|
setup_path_session()
|
||||||
|
|
||||||
|
parts = path.strip("/").split("/")
|
||||||
|
current_level = session["paths"]
|
||||||
|
|
||||||
|
for i, part in enumerate(parts):
|
||||||
|
for item in current_level:
|
||||||
|
if item["name"] == part:
|
||||||
|
if i == len(parts) - 1:
|
||||||
|
# Update the file content
|
||||||
|
if item["type"] == 1:
|
||||||
|
item["content"] = content
|
||||||
|
session.modified = True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
current_level = item.get("children", [])
|
||||||
|
break
|
||||||
|
return False
|
||||||
|
|
||||||
|
@terminal_bp.route("/terminal/execute/touch", methods=["POST"])
|
||||||
|
def touch():
|
||||||
|
try:
|
||||||
|
data = request.get_json() or {}
|
||||||
|
args = data.get("args", "")
|
||||||
|
args = args.split()
|
||||||
|
|
||||||
|
if not args:
|
||||||
|
return json_response(request, {"output": "touch: missing file operand"}, 200)
|
||||||
|
|
||||||
|
path = args[0]
|
||||||
|
if not path.startswith("/"):
|
||||||
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
||||||
|
|
||||||
|
# Check if user can write to this path
|
||||||
|
if not can_write_to_path(path):
|
||||||
|
return json_response(request, {"output": f"touch: cannot touch '{path}': Permission denied"}, 200)
|
||||||
|
|
||||||
|
# Check if file already exists
|
||||||
|
if is_valid_file(path):
|
||||||
|
return json_response(request, {"output": f"touch: '{path}': File already exists"}, 200)
|
||||||
|
|
||||||
|
# Create the file
|
||||||
|
if create_file(path, ""):
|
||||||
|
output = ""
|
||||||
|
else:
|
||||||
|
output = f"touch: cannot create '{path}': No such file or directory"
|
||||||
|
|
||||||
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"touch: {str(e)}"}, 200)
|
||||||
|
|
||||||
|
@terminal_bp.route("/terminal/execute/nano", methods=["POST"])
|
||||||
|
def nano():
|
||||||
|
try:
|
||||||
|
data = request.get_json() or {}
|
||||||
|
args = data.get("args", "")
|
||||||
|
|
||||||
|
# Parse args - format: filename CONTENT content here
|
||||||
|
parts = args.split(" CONTENT ", 1)
|
||||||
|
if len(parts) != 2:
|
||||||
|
return json_response(request, {"output": "Usage: nano [file] CONTENT [text]\nExample: nano test.txt CONTENT Hello World"}, 200)
|
||||||
|
|
||||||
|
path = parts[0].strip()
|
||||||
|
content = parts[1]
|
||||||
|
|
||||||
|
if not path.startswith("/"):
|
||||||
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
||||||
|
|
||||||
|
# Check if user can write to this path
|
||||||
|
if not can_write_to_path(path):
|
||||||
|
return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200)
|
||||||
|
|
||||||
|
# Check if file exists
|
||||||
|
if is_valid_file(path):
|
||||||
|
# Update existing file
|
||||||
|
node = get_node(path)
|
||||||
|
if node.get("permissions", 0) < 2:
|
||||||
|
return json_response(request, {"output": f"nano: cannot write to '{path}': Permission denied"}, 200)
|
||||||
|
|
||||||
|
if update_file_content(path, content):
|
||||||
|
output = f"File '{path}' updated successfully"
|
||||||
|
else:
|
||||||
|
output = f"nano: failed to update '{path}'"
|
||||||
|
else:
|
||||||
|
# Create new file
|
||||||
|
if create_file(path, content):
|
||||||
|
output = f"File '{path}' created successfully"
|
||||||
|
else:
|
||||||
|
output = f"nano: cannot create '{path}': No such file or directory"
|
||||||
|
|
||||||
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"nano: {str(e)}"}, 200)
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/cat", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/cat", methods=["POST"])
|
||||||
def cat():
|
def cat():
|
||||||
|
try:
|
||||||
data = request.get_json() or {}
|
data = request.get_json() or {}
|
||||||
args = data.get("args", "")
|
args = data.get("args", "")
|
||||||
args = args.split()
|
args = args.split()
|
||||||
@@ -237,21 +450,29 @@ def cat():
|
|||||||
|
|
||||||
# Check if path is valid
|
# Check if path is valid
|
||||||
if not is_valid_file(path):
|
if not is_valid_file(path):
|
||||||
|
# Check if it is a binary
|
||||||
|
if is_valid_binary(path):
|
||||||
|
return json_response(request, {"output": f"cat: {path}: Binary file"}, 200)
|
||||||
return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200)
|
return json_response(request, {"output": f"cat: {path}: No such file or directory"}, 200)
|
||||||
|
|
||||||
output = get_node(path).get("content", "")
|
output = get_node(path).get("content", "")
|
||||||
|
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"cat: {str(e)}"}, 200)
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/echo", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/echo", methods=["POST"])
|
||||||
def echo():
|
def echo():
|
||||||
|
try:
|
||||||
data = request.get_json() or {}
|
data = request.get_json() or {}
|
||||||
args = data.get("args", "")
|
args = data.get("args", "")
|
||||||
return json_response(request, {"output": args}, 200)
|
return json_response(request, {"output": args}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"echo: {str(e)}"}, 200)
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/date", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/date", methods=["POST"])
|
||||||
def date():
|
def date():
|
||||||
|
try:
|
||||||
# See if any args are passed
|
# See if any args are passed
|
||||||
data = request.get_json() or {}
|
data = request.get_json() or {}
|
||||||
args = data.get("args", "")
|
args = data.get("args", "")
|
||||||
@@ -282,10 +503,72 @@ def date():
|
|||||||
else:
|
else:
|
||||||
output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y")
|
output = datetime.now().strftime("%a %b %d %H:%M:%S %Z %Y")
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"date: {str(e)}"}, 200)
|
||||||
|
|
||||||
|
@terminal_bp.route("/terminal/execute/rm", methods=["POST"])
|
||||||
|
def rm():
|
||||||
|
try:
|
||||||
|
data = request.get_json() or {}
|
||||||
|
args = data.get("args", "")
|
||||||
|
args = args.split()
|
||||||
|
|
||||||
|
if not args:
|
||||||
|
return json_response(request, {"output": "rm: missing operand"}, 200)
|
||||||
|
|
||||||
|
path = args[0]
|
||||||
|
if not path.startswith("/"):
|
||||||
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
||||||
|
|
||||||
|
# Check if user can write to this path
|
||||||
|
if not can_write_to_path(path):
|
||||||
|
return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200)
|
||||||
|
|
||||||
|
# Check if path is valid
|
||||||
|
if not is_valid_file(path) and not is_valid_binary(path):
|
||||||
|
return json_response(request, {"output": f"rm: cannot remove '{path}': No such file"}, 200)
|
||||||
|
|
||||||
|
# Get the node
|
||||||
|
node = get_node(path)
|
||||||
|
# Only let user delete files if the permission is >1 (writable)
|
||||||
|
if node.get("permissions", 0) < 2:
|
||||||
|
return json_response(request, {"output": f"rm: cannot remove '{path}': Permission denied"}, 200)
|
||||||
|
|
||||||
|
# Only let the user remove files
|
||||||
|
if node.get("type", 1) != 1:
|
||||||
|
return json_response(request, {"output": f"rm: cannot remove '{path}': Is a directory"}, 200)
|
||||||
|
|
||||||
|
# Remove the file from session paths
|
||||||
|
if remove_node(path):
|
||||||
|
output = f"Removed '{path}'"
|
||||||
|
else:
|
||||||
|
output = f"Failed to remove '{path}'"
|
||||||
|
|
||||||
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"rm: {str(e)}"}, 200)
|
||||||
|
|
||||||
|
@terminal_bp.route("/terminal/execute/tree", methods=["POST"])
|
||||||
|
def tree():
|
||||||
|
try:
|
||||||
|
data = request.get_json() or {}
|
||||||
|
args = data.get("args", "")
|
||||||
|
path = session.get("pwd", f"/home/{getClientIP(request)}")
|
||||||
|
if args:
|
||||||
|
path = args
|
||||||
|
if not path.startswith("/"):
|
||||||
|
path = sanitize_path(session.get("pwd", f"/home/{getClientIP(request)}") + "/" + path)
|
||||||
|
if not is_valid_path(path):
|
||||||
|
return json_response(request, {"output": f"tree: cannot access '{path}': No such file or directory"}, 200)
|
||||||
|
|
||||||
|
output = build_tree(path).rstrip()
|
||||||
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"tree: {str(e)}"}, 200)
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/<command>", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/<command>", methods=["POST"])
|
||||||
def execute_catch(command):
|
def execute_catch(command):
|
||||||
|
try:
|
||||||
data = request.get_json() or {}
|
data = request.get_json() or {}
|
||||||
args = data.get("args", "")
|
args = data.get("args", "")
|
||||||
|
|
||||||
@@ -295,6 +578,7 @@ def execute_catch(command):
|
|||||||
"\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items())
|
"\n".join(f" {cmd}: {desc}" for cmd, desc in COMMANDS.items())
|
||||||
elif command == "about":
|
elif command == "about":
|
||||||
output = "This is a simulated terminal interface created by Nathan Woodburn."
|
output = "This is a simulated terminal interface created by Nathan Woodburn."
|
||||||
|
|
||||||
elif command == "whoami":
|
elif command == "whoami":
|
||||||
output = getClientIP(request)
|
output = getClientIP(request)
|
||||||
elif command == "pwd":
|
elif command == "pwd":
|
||||||
@@ -304,12 +588,23 @@ def execute_catch(command):
|
|||||||
output = f"Command not found: {command}"
|
output = f"Command not found: {command}"
|
||||||
|
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"{command}: {str(e)}"}, 200)
|
||||||
|
|
||||||
@terminal_bp.route("/terminal/execute/reset", methods=["POST"])
|
@terminal_bp.route("/terminal/execute/reset", methods=["POST"])
|
||||||
def reset():
|
def reset():
|
||||||
|
try:
|
||||||
# Clear the session data related to terminal
|
# Clear the session data related to terminal
|
||||||
session.pop("pwd", None)
|
session.pop("pwd", None)
|
||||||
session.pop("paths", None)
|
session.pop("paths", None)
|
||||||
output = "Terminal session has been reset."
|
output = "Terminal session has been reset."
|
||||||
return json_response(request, {"output": output}, 200)
|
return json_response(request, {"output": output}, 200)
|
||||||
|
except Exception as e:
|
||||||
|
return json_response(request, {"output": f"reset: {str(e)}"}, 200)
|
||||||
|
|
||||||
|
# Add error handler at the end
|
||||||
|
@terminal_bp.errorhandler(Exception)
|
||||||
|
def handle_terminal_error(error):
|
||||||
|
"""Handle all exceptions in terminal blueprint."""
|
||||||
|
error_message = f"Terminal error: {str(error)}"
|
||||||
|
return json_response(request, {"output": error_message}, 500)
|
||||||
@@ -3,7 +3,7 @@
|
|||||||
<head>
|
<head>
|
||||||
<meta charset="UTF-8">
|
<meta charset="UTF-8">
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
<title>Terminal</title>
|
<title>Terminal | Nathan.Woodburn/</title>
|
||||||
<style>
|
<style>
|
||||||
* { margin: 0; padding: 0; box-sizing: border-box; }
|
* { margin: 0; padding: 0; box-sizing: border-box; }
|
||||||
body {
|
body {
|
||||||
|
|||||||
Reference in New Issue
Block a user