generated from nathanwoodburn/python-webserver-template
291 lines
9.2 KiB
Python
291 lines
9.2 KiB
Python
import importlib
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Set
|
|
|
|
from flask import Flask, Request, jsonify, request
|
|
|
|
app = Flask(__name__)
|
|
|
|
|
|
def _authorized(req: "Request") -> bool:
    """Return whether *req* carries the bearer token this agent expects.

    The expected token is read from the ``AGENT_TOKEN`` environment variable
    on every call. When that variable is unset or blank, authentication is
    disabled and every request is accepted.

    Args:
        req: Incoming request; only its ``Authorization`` header is read.

    Returns:
        ``True`` when no token is configured or when the header equals
        ``Bearer <token>`` exactly, ``False`` otherwise.
    """
    import hmac

    token = os.getenv("AGENT_TOKEN", "").strip()
    if not token:
        return True

    supplied = req.headers.get("Authorization", "")
    expected = f"Bearer {token}"
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compare bytes: compare_digest() raises TypeError for
    # str arguments that contain non-ASCII characters.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
|
|
|
|
|
|
def _docker_client():
    """Create a Docker SDK client for the configured daemon endpoint.

    The ``docker`` package is loaded on demand through ``importlib`` instead
    of a module-level import. The endpoint is taken from the ``DOCKER_HOST``
    environment variable and falls back to the local Unix socket.
    """
    base_url = os.getenv("DOCKER_HOST", "unix:///var/run/docker.sock")
    sdk = importlib.import_module("docker")
    return sdk.DockerClient(base_url=base_url)
|
|
|
|
|
|
def _container_to_json(container: Any) -> Dict[str, Any]:
    """Summarise a container object as a JSON-serialisable dict.

    Args:
        container: Object exposing ``id``, ``name``, ``image`` and an
            ``attrs`` mapping (the inspect payload). ``image`` may be
            ``None``; otherwise it must expose ``tags`` and ``id``.

    Returns:
        A dict with the keys ``id``, ``name``, ``state``, ``image``,
        ``ports``, ``networks``, ``ip_addresses`` and ``labels``. Missing or
        null sections of ``attrs`` fall back to empty containers, and a
        missing state falls back to ``"unknown"``.
    """
    attrs = container.attrs or {}
    # ``dict.get(key, {})`` only applies the default when the key is absent.
    # An explicit null would come back as None and break the chained lookups,
    # so every section is normalised with ``or {}`` instead.
    network_settings = attrs.get("NetworkSettings") or {}
    networks = network_settings.get("Networks") or {}
    config = attrs.get("Config") or {}
    state_info = attrs.get("State") or {}

    # Skip null endpoint entries and endpoints without an assigned address.
    ip_addresses: List[str] = [
        endpoint["IPAddress"]
        for endpoint in networks.values()
        if endpoint and endpoint.get("IPAddress")
    ]

    # NOTE(review): with the Docker SDK, reading ``container.image`` may issue
    # an extra lookup per container -- confirm and consider Config.Image.
    image_obj = container.image
    image_name = "unknown"
    if image_obj is not None:
        image_tags = image_obj.tags or []
        # Prefer the first tag; untagged images are identified by their id.
        image_name = image_tags[0] if image_tags else image_obj.id

    return {
        "id": container.id,
        "name": container.name,
        "state": state_info.get("Status") or "unknown",
        "image": image_name,
        "ports": network_settings.get("Ports") or {},
        "networks": list(networks),
        "ip_addresses": ip_addresses,
        "labels": config.get("Labels") or {},
    }
|
|
|
|
|
|
def _nginx_root() -> Path:
    """Return the directory that is scanned for nginx configuration files.

    The location comes from the ``NGINX_CONFIG_DIR`` environment variable and
    defaults to ``/mnt/nginx``.
    """
    configured = os.getenv("NGINX_CONFIG_DIR", "/mnt/nginx")
    return Path(configured)
|
|
|
|
|
|
def _parse_nginx_file(content: str) -> Dict[str, Any]:
    """Extract routing-related directives from the text of one nginx config.

    This is a regex scan, not a real nginx parser: directives are matched
    anywhere in the text without regard to block nesting. ``#`` comments are
    removed first so that commented-out directives are not reported.

    Args:
        content: Raw text of a configuration file.

    Returns:
        A dict with sorted, de-duplicated lists under ``server_names``,
        ``listens``, ``proxy_pass``, ``upstreams``, ``upstream_servers``,
        ``inferred_targets`` and ``includes``, plus ``set_variables`` mapping
        each ``set $name value;`` name to its unquoted value (the last
        assignment wins).
    """
    # A comment starts at a ``#`` that opens a line or follows whitespace.
    # Requiring that context keeps things like URL fragments intact.
    content = re.sub(r"(?m)(^|\s)#.*$", r"\1", content)

    server_name_values = re.findall(r"server_name\s+([^;]+);", content)
    listens = re.findall(r"listen\s+([^;]+);", content)
    proxy_pass_targets = re.findall(r"proxy_pass\s+([^;]+);", content)
    includes = re.findall(r"include\s+([^;]+);", content)
    set_matches = re.findall(r"set\s+\$([A-Za-z0-9_]+)\s+([^;]+);", content)

    upstream_blocks: List[str] = []
    upstream_servers: List[str] = []
    for match in re.finditer(r"\bupstream\b\s+([^\s{]+)\s*\{(.*?)\}", content, flags=re.DOTALL):
        upstream_blocks.append(match.group(1).strip())
        upstream_servers.extend(
            server.strip() for server in re.findall(r"server\s+([^;]+);", match.group(2))
        )

    set_variables: Dict[str, str] = {}
    for var_name, value in set_matches:
        candidate = value.strip().strip("\"'")
        set_variables[var_name] = candidate
        # Heuristic: a value containing ":" (a URL or host:port pair) is also
        # reported as a proxy target.
        if ":" in candidate:
            proxy_pass_targets.append(candidate)

    # One ``server_name`` directive may list several whitespace-separated names.
    server_names = {name for value in server_name_values for name in value.split()}

    forward_host = set_variables.get("forward_host", "").strip()
    forward_port = set_variables.get("forward_port", "").strip()
    forward_scheme = set_variables.get("forward_scheme", "http").strip() or "http"

    inferred_targets: List[str] = []
    if forward_host:
        if forward_port:
            inferred_targets.append(f"{forward_scheme}://{forward_host}:{forward_port}")
        else:
            inferred_targets.append(f"{forward_scheme}://{forward_host}")

    # ``$server`` / ``$port`` only yield a target when they hold literal
    # values rather than references to other variables.
    server_var = set_variables.get("server", "").strip()
    port_var = set_variables.get("port", "").strip()
    if server_var and not server_var.startswith("$"):
        if port_var and not port_var.startswith("$"):
            inferred_targets.append(f"{forward_scheme}://{server_var}:{port_var}")
        else:
            inferred_targets.append(f"{forward_scheme}://{server_var}")

    return {
        "server_names": sorted(server_names),
        "listens": sorted({value.strip() for value in listens if value.strip()}),
        "proxy_pass": sorted({value.strip() for value in proxy_pass_targets if value.strip()}),
        "upstreams": sorted({value for value in upstream_blocks if value}),
        "upstream_servers": sorted({value for value in upstream_servers if value}),
        "inferred_targets": sorted(set(inferred_targets)),
        "includes": sorted({value.strip() for value in includes if value.strip()}),
        "set_variables": set_variables,
    }
|
|
|
|
|
|
def _expand_globbed_includes(root: Path, include_value: str) -> List[Path]:
    """Resolve an nginx ``include`` argument to the files it matches under *root*.

    Args:
        root: Directory that the include is resolved against.
        include_value: Raw include argument; surrounding whitespace and quotes
            are ignored and glob wildcards are honoured.

    Returns:
        A sorted list of the regular files matched. The list is empty when the
        value is blank, refers to *root* itself, or is not a usable glob
        pattern.
    """
    candidate = include_value.strip().strip("\"'")
    if not candidate:
        return []

    # Absolute include paths are re-rooted under *root*: stripping the leading
    # slashes makes them relative (and is a no-op for relative values).
    include_path = root.joinpath(candidate.lstrip("/"))
    try:
        pattern = str(include_path.relative_to(root))
        # "/" or "." collapse to the root directory itself. That can never be
        # a file, and ``Path.glob(".")`` raises on some Python versions.
        if pattern in ("", "."):
            return []
        matches = {path for path in root.glob(pattern) if path.is_file()}
    except ValueError:
        # Malformed pattern: treat it as matching nothing so that one odd
        # include cannot abort the whole scan.
        return []
    return sorted(matches)
|
|
|
|
|
|
def _resolve_value(raw: str, variables: Dict[str, str]) -> str:
    """Expand ``$name`` / ``${name}`` references in *raw* using *variables*.

    Args:
        raw: Directive value; surrounding whitespace and quotes are removed
            before expansion.
        variables: Known variable values keyed by name (without the ``$``).

    Returns:
        The value with every known reference substituted. References to
        unknown variables are left untouched. Expansion is repeated so that
        variables defined in terms of other variables resolve, up to five
        passes.
    """
    value = raw.strip().strip("\"'")

    def _substitute(match: "re.Match[str]") -> str:
        name = match.group(1) or match.group(2)
        # Unknown variables keep their original spelling.
        return variables.get(name, match.group(0))

    # Match whole variable names only. Plain substring replacement would
    # rewrite ``$server`` inside ``$server_name`` and depend on dict order.
    reference = r"\$(?:\{([A-Za-z0-9_]+)\}|([A-Za-z0-9_]+))"

    # Bounded number of passes so self-referential definitions terminate.
    for _ in range(5):
        expanded = re.sub(reference, _substitute, value)
        if expanded == value:
            break
        value = expanded

    return value
|
|
|
|
|
|
def _collect_proxy_targets(
    file_path: Path,
    parsed_map: Dict[Path, Dict[str, Any]],
    root: Path,
    inherited_variables: Dict[str, str],
    visited: Set[Path],
) -> List[str]:
    """Gather resolved ``proxy_pass`` targets from a file and its includes.

    Args:
        file_path: Config file to start from.
        parsed_map: Parsed results keyed by file path.
        root: Directory that include patterns are resolved against.
        inherited_variables: Variables handed down by the including file; they
            are overridden by this file's own ``set_variables``.
        visited: Paths already processed. The set is mutated so that include
            cycles and repeated includes are walked only once.

    Returns:
        Resolved targets in discovery order (duplicates possible). Empty when
        the file was already visited or has no usable parse result.
    """
    if file_path in visited:
        return []
    visited.add(file_path)

    entry = parsed_map.get(file_path)
    if not entry:
        return []

    # This file's own assignments take precedence over inherited ones.
    scope = {**inherited_variables, **entry.get("set_variables", {})}

    expanded_values = (_resolve_value(raw, scope) for raw in entry.get("proxy_pass", []))
    collected: List[str] = [expanded for expanded in expanded_values if expanded]

    for pattern in entry.get("includes", []):
        for included in _expand_globbed_includes(root, pattern):
            collected += _collect_proxy_targets(included, parsed_map, root, scope, visited)

    return collected
|
|
|
|
|
|
def _scan_nginx_configs() -> List[Dict[str, Any]]:
    """Parse every candidate nginx config file under the configured root.

    A file is a candidate when its suffix is ``.conf``, ``.vhost`` or
    ``.inc``, or when its name has no suffix at all. Files that cannot be read
    are skipped.

    Returns:
        One record per parsed file, ordered by path. Each record holds the
        path relative to the root, the directive lists produced by
        ``_parse_nginx_file`` and ``proxy_pass_resolved``: the sorted,
        de-duplicated targets collected from the file and its includes. The
        list is empty when the root is not an existing directory.
    """
    root = _nginx_root()
    # is_dir() is already False for a path that does not exist.
    if not root.is_dir():
        return []

    config_suffixes = {".conf", ".vhost", ".inc", ""}

    # Walk the tree once and filter by suffix instead of running a separate
    # recursive glob for every pattern.
    parsed_map: Dict[Path, Dict[str, Any]] = {}
    for candidate in sorted(root.rglob("*")):
        if candidate.suffix not in config_suffixes or not candidate.is_file():
            continue
        try:
            content = candidate.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Best effort: unreadable files are left out of the report.
            continue
        parsed_map[candidate] = _parse_nginx_file(content)

    records: List[Dict[str, Any]] = []
    # parsed_map was filled in sorted path order, so iteration is sorted too.
    for config_file, parsed in parsed_map.items():
        # Each file gets a fresh ``visited`` set, so every file is resolved
        # independently of the others.
        collected = _collect_proxy_targets(
            config_file,
            parsed_map,
            root,
            parsed.get("set_variables", {}),
            set(),
        )
        records.append(
            {
                "path": str(config_file.relative_to(root)),
                "server_names": parsed["server_names"],
                "listens": parsed["listens"],
                "proxy_pass": parsed["proxy_pass"],
                "proxy_pass_resolved": sorted({target for target in collected if target}),
                "upstreams": parsed["upstreams"],
                "upstream_servers": parsed["upstream_servers"],
                "inferred_targets": parsed["inferred_targets"],
            }
        )

    return records
|
|
|
|
|
|
@app.route("/health", methods=["GET"])
def health() -> Any:
    """Liveness probe: report the service name and the current UTC time."""
    now_utc = datetime.now(timezone.utc)
    payload = {
        "ok": True,
        "service": "docker-inventory-agent",
        "timestamp": now_utc.isoformat(),
    }
    return jsonify(payload)
|
|
|
|
|
|
@app.route("/api/v1/containers", methods=["GET"])
def containers() -> Any:
    """List all containers, running or not, as JSON.

    Responds with 401 and an error body when the request is not authorized.
    The Docker client is closed even if listing or serialising fails.
    """
    if not _authorized(request):
        return jsonify({"error": "Unauthorized"}), 401

    docker_client = _docker_client()
    try:
        inventory = list(map(_container_to_json, docker_client.containers.list(all=True)))
    finally:
        docker_client.close()

    return jsonify({"containers": inventory})
|
|
|
|
|
|
@app.route("/api/v1/nginx-configs", methods=["GET"])
def nginx_configs() -> Any:
    """Report the nginx config files found under the configured root as JSON.

    Responds with 401 and an error body when the request is not authorized.
    Otherwise the body holds the root path, whether it is an existing
    directory, the number of records and the records themselves.
    """
    if not _authorized(request):
        return jsonify({"error": "Unauthorized"}), 401

    config_root = _nginx_root()
    scanned = _scan_nginx_configs()
    body = {
        "config_root": str(config_root),
        # is_dir() is False for a missing path, so it covers exists() as well.
        "exists": config_root.is_dir(),
        "count": len(scanned),
        "configs": scanned,
    }
    return jsonify(body)
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve on all interfaces. The port is read from the
    # PORT environment variable and defaults to 9090.
    listen_port = int(os.getenv("PORT", "9090"))
    app.run(host="0.0.0.0", port=listen_port)
|