"""HTTP inventory agent exposing Docker containers and nginx config summaries.

Routes:
    GET /health                 -- liveness probe (no authentication).
    GET /api/v1/containers      -- JSON summary of all Docker containers.
    GET /api/v1/nginx-configs   -- JSON summary of nginx config files found
                                   under ``NGINX_CONFIG_DIR``.

Environment variables read by this module: ``AGENT_TOKEN``, ``DOCKER_HOST``,
``NGINX_CONFIG_DIR`` and ``PORT``.
"""

import hmac
import importlib
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Set

from flask import Flask, Request, jsonify, request

app = Flask(__name__)

# File suffixes treated as nginx configuration; "" covers suffix-less files
# (e.g. "default" in sites-enabled style layouts).
_CONFIG_SUFFIXES = frozenset({"", ".conf", ".vhost", ".inc"})

# Upper bound on substitution passes so cyclic `set` definitions terminate.
_MAX_RESOLVE_PASSES = 5

# A "#" at the start of the text or preceded by whitespace begins a comment
# that runs to end of line.  NOTE: this is a heuristic -- a quoted string
# containing " #" would be truncated as well.
_COMMENT_RE = re.compile(r"(?m)(?<!\S)#.*$")

# Directive patterns.  The `(?<![\w$])` guard requires the directive name to
# start a token, so e.g. `proxy_ssl_server_name on;` is not mistaken for
# `server_name on;` and `$server_name` is not mistaken for the directive.
_SERVER_NAME_RE = re.compile(r"(?<![\w$])server_name\s+([^;]+);")
_LISTEN_RE = re.compile(r"(?<![\w$])listen\s+([^;]+);")
_PROXY_PASS_RE = re.compile(r"(?<![\w$])proxy_pass\s+([^;]+);")
_INCLUDE_RE = re.compile(r"(?<![\w$])include\s+([^;]+);")
_SET_RE = re.compile(r"(?<![\w$])set\s+\$([A-Za-z0-9_]+)\s+([^;]+);")
_UPSTREAM_BLOCK_RE = re.compile(
    r"\bupstream\b\s+([^\s{]+)\s*\{(.*?)\}", flags=re.DOTALL
)
_UPSTREAM_SERVER_RE = re.compile(r"(?<![\w$])server\s+([^;]+);")

# `$name` or `${name}` variable reference.
_VARIABLE_RE = re.compile(r"\$(?:\{([A-Za-z0-9_]+)\}|([A-Za-z0-9_]+))")


def _authorized(req: Request) -> bool:
    """Return True when the request carries the expected bearer token.

    If ``AGENT_TOKEN`` is unset or blank, authentication is disabled and every
    request is accepted.  Otherwise the ``Authorization`` header must equal
    ``Bearer <token>`` exactly.
    """
    token = os.getenv("AGENT_TOKEN", "").strip()
    if not token:
        return True
    header = req.headers.get("Authorization", "")
    expected = f"Bearer {token}"
    # Constant-time comparison avoids leaking the token through timing.
    # Compare bytes: compare_digest rejects non-ASCII `str` arguments.
    return hmac.compare_digest(header.encode("utf-8"), expected.encode("utf-8"))


def _docker_client():
    """Create a Docker SDK client for ``DOCKER_HOST``.

    The ``docker`` package is imported lazily so the module can be loaded
    (and the nginx endpoints used) without it being importable up front.
    """
    docker_sdk = importlib.import_module("docker")
    docker_host = os.getenv("DOCKER_HOST", "unix:///var/run/docker.sock")
    return docker_sdk.DockerClient(base_url=docker_host)


def _container_to_json(container: Any) -> Dict[str, Any]:
    """Flatten a Docker SDK container object into a JSON-serialisable dict.

    Returns a dict with keys ``id``, ``name``, ``state``, ``image``, ``ports``,
    ``networks``, ``ip_addresses`` and ``labels``.  Missing or ``None``
    sections of ``container.attrs`` are treated as empty.
    """
    attrs = container.attrs or {}
    network_settings = attrs.get("NetworkSettings") or {}
    networks = network_settings.get("Networks") or {}
    config = attrs.get("Config") or {}

    ip_addresses: List[str] = [
        network["IPAddress"]
        for network in networks.values()
        if network and network.get("IPAddress")
    ]

    # NOTE(review): the Docker SDK resolves `.image` with an extra API lookup
    # that raises when the image has been removed since the container was
    # created.  One such container must not fail the whole listing, so any
    # failure here degrades to the image name recorded in the container config.
    try:
        image_obj = container.image
    except Exception:  # noqa: BLE001 - best-effort enrichment only
        image_obj = None

    if image_obj is not None:
        image_tags = image_obj.tags or []
        image_name = image_tags[0] if image_tags else image_obj.id
    else:
        image_name = config.get("Image") or "unknown"

    return {
        "id": container.id,
        "name": container.name,
        "state": (attrs.get("State") or {}).get("Status", "unknown"),
        "image": image_name,
        "ports": network_settings.get("Ports") or {},
        "networks": list(networks.keys()),
        "ip_addresses": ip_addresses,
        "labels": config.get("Labels") or {},
    }


def _nginx_root() -> Path:
    """Return the directory scanned for nginx configs (``NGINX_CONFIG_DIR``)."""
    return Path(os.getenv("NGINX_CONFIG_DIR", "/mnt/nginx"))


def _parse_nginx_file(content: str) -> Dict[str, Any]:
    """Extract routing-related directives from one nginx config file.

    This is a regex-based scan, not a real nginx parser: it ignores block
    nesting and simply collects every occurrence of the directives below.

    Returns a dict with sorted, de-duplicated lists under ``server_names``,
    ``listens``, ``proxy_pass``, ``upstreams``, ``upstream_servers``,
    ``inferred_targets`` and ``includes``, plus ``set_variables`` mapping each
    ``set $name value;`` name to its unquoted value (last definition wins).
    """
    # Drop comments first so commented-out directives are not reported.
    text = _COMMENT_RE.sub("", content)

    server_name_values = _SERVER_NAME_RE.findall(text)
    listens = _LISTEN_RE.findall(text)
    proxy_pass_targets = _PROXY_PASS_RE.findall(text)
    includes = _INCLUDE_RE.findall(text)

    upstream_blocks: List[str] = []
    upstream_servers: List[str] = []
    for block in _UPSTREAM_BLOCK_RE.finditer(text):
        upstream_blocks.append(block.group(1).strip())
        upstream_servers.extend(
            server.strip() for server in _UPSTREAM_SERVER_RE.findall(block.group(2))
        )

    set_variables: Dict[str, str] = {}
    for var_name, raw_value in _SET_RE.findall(text):
        candidate = raw_value.strip().strip("\"'")
        set_variables[var_name] = candidate
        # A value containing ":" (URL scheme or host:port) is treated as a
        # potential proxy target.
        if ":" in candidate:
            proxy_pass_targets.append(candidate)

    # `server_name` accepts several whitespace-separated names per directive.
    server_names = [name for value in server_name_values for name in value.split()]

    # Targets implied by conventional variable names:
    # $forward_scheme/$forward_host/$forward_port and $server/$port.
    inferred_targets: List[str] = []
    forward_scheme = set_variables.get("forward_scheme", "http").strip() or "http"
    forward_host = set_variables.get("forward_host", "").strip()
    forward_port = set_variables.get("forward_port", "").strip()
    if forward_host:
        if forward_port:
            inferred_targets.append(f"{forward_scheme}://{forward_host}:{forward_port}")
        else:
            inferred_targets.append(f"{forward_scheme}://{forward_host}")

    server_var = set_variables.get("server", "").strip()
    port_var = set_variables.get("port", "").strip()
    # Values that are themselves variable references cannot be used literally.
    if server_var and not server_var.startswith("$"):
        if port_var and not port_var.startswith("$"):
            inferred_targets.append(f"{forward_scheme}://{server_var}:{port_var}")
        else:
            inferred_targets.append(f"{forward_scheme}://{server_var}")

    return {
        "server_names": sorted(set(server_names)),
        "listens": sorted({value.strip() for value in listens if value.strip()}),
        "proxy_pass": sorted(
            {value.strip() for value in proxy_pass_targets if value.strip()}
        ),
        "upstreams": sorted({value for value in upstream_blocks if value}),
        "upstream_servers": sorted({value for value in upstream_servers if value}),
        "inferred_targets": sorted({value for value in inferred_targets if value}),
        "includes": sorted({value.strip() for value in includes if value.strip()}),
        "set_variables": set_variables,
    }


def _expand_globbed_includes(root: Path, include_value: str) -> List[Path]:
    """Expand an nginx ``include`` value into the matching files under *root*.

    Absolute include paths are re-rooted under *root* (leading slashes are
    dropped).  Returns a sorted list of existing files; an empty list when the
    value is blank, resolves to *root* itself, or is not a usable glob pattern.
    """
    candidate = include_value.strip().strip("\"'")
    if not candidate:
        return []

    try:
        relative = root.joinpath(candidate.lstrip("/")).relative_to(root)
    except ValueError:
        # The joined path did not stay under root (e.g. drive-qualified value).
        return []

    pattern = str(relative)
    if pattern in ("", "."):
        # Nothing left to glob for; some Python versions raise on this pattern.
        return []

    try:
        matches = {path for path in root.glob(pattern) if path.is_file()}
    except (ValueError, NotImplementedError):
        # Malformed glob (e.g. "**" mixed into a path component).
        return []
    return sorted(matches)


def _resolve_value(raw: str, variables: Dict[str, str]) -> str:
    """Substitute ``$name`` / ``${name}`` references in *raw* using *variables*.

    Whole variable names are matched, so ``$server`` never rewrites part of
    ``$server_port``.  References to unknown variables are left untouched.
    Substitution repeats (up to ``_MAX_RESOLVE_PASSES`` times) so variables
    defined in terms of other variables are expanded too.
    """
    value = raw.strip().strip("\"'")

    def _substitute(match: Any) -> str:
        name = match.group(1) or match.group(2)
        return variables.get(name, match.group(0))

    for _ in range(_MAX_RESOLVE_PASSES):
        expanded = _VARIABLE_RE.sub(_substitute, value)
        if expanded == value:
            break
        value = expanded
    return value


def _collect_proxy_targets(
    file_path: Path,
    parsed_map: Dict[Path, Dict[str, Any]],
    root: Path,
    inherited_variables: Dict[str, str],
    visited: Set[Path],
) -> List[str]:
    """Collect resolved ``proxy_pass`` targets for a file and its includes.

    Args:
        file_path: Config file to start from.
        parsed_map: Parse results keyed by file path.
        root: Config root used to expand ``include`` patterns.
        inherited_variables: ``set`` variables visible from including files.
        visited: Files already processed; mutated to guard against include
            cycles.

    Returns:
        Resolved target strings (may contain duplicates), this file's first,
        then those of its includes in depth-first order.
    """
    if file_path in visited:
        return []
    visited.add(file_path)

    parsed = parsed_map.get(file_path)
    if not parsed:
        return []

    # Variables set in this file shadow those inherited from the includer.
    variables = dict(inherited_variables)
    variables.update(parsed.get("set_variables", {}))

    targets: List[str] = []
    for proxy_value in parsed.get("proxy_pass", []):
        resolved = _resolve_value(proxy_value, variables)
        if resolved:
            targets.append(resolved)

    for include_value in parsed.get("includes", []):
        for include_file in _expand_globbed_includes(root, include_value):
            targets.extend(
                _collect_proxy_targets(include_file, parsed_map, root, variables, visited)
            )
    return targets


def _scan_nginx_configs() -> List[Dict[str, Any]]:
    """Parse every nginx config file under the config root.

    Files are considered configs when they have no suffix or one of
    ``.conf`` / ``.vhost`` / ``.inc``.  Unreadable files are skipped.

    Returns:
        One record per file (sorted by path) with the file's path relative to
        the root, its parsed directives, and ``proxy_pass_resolved`` -- the
        variable-expanded proxy targets including those from included files.
        Empty when the root is not an existing directory.
    """
    root = _nginx_root()
    if not root.is_dir():
        return []

    # Single tree walk; suffix filtering replaces one rglob per pattern.
    discovered_files: Set[Path] = {
        path
        for path in root.rglob("*")
        if path.suffix in _CONFIG_SUFFIXES and path.is_file()
    }

    parsed_map: Dict[Path, Dict[str, Any]] = {}
    for config_file in sorted(discovered_files):
        try:
            content = config_file.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Best effort: an unreadable file must not abort the scan.
            continue
        parsed_map[config_file] = _parse_nginx_file(content)

    records: List[Dict[str, Any]] = []
    for config_file in sorted(parsed_map):
        parsed = parsed_map[config_file]
        # A fresh `visited` set per file: each record is resolved independently.
        resolved_targets = sorted(
            {
                target
                for target in _collect_proxy_targets(
                    config_file, parsed_map, root, {}, set()
                )
                if target
            }
        )
        records.append(
            {
                "path": str(config_file.relative_to(root)),
                "server_names": parsed["server_names"],
                "listens": parsed["listens"],
                "proxy_pass": parsed["proxy_pass"],
                "proxy_pass_resolved": resolved_targets,
                "upstreams": parsed["upstreams"],
                "upstream_servers": parsed["upstream_servers"],
                "inferred_targets": parsed["inferred_targets"],
            }
        )
    return records


@app.route("/health", methods=["GET"])
def health() -> Any:
    """Liveness probe; returns service name and the current UTC timestamp."""
    return jsonify(
        {
            "ok": True,
            "service": "docker-inventory-agent",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )


@app.route("/api/v1/containers", methods=["GET"])
def containers() -> Any:
    """Return all containers (running or not) as JSON; 401 if unauthorized."""
    if not _authorized(request):
        return jsonify({"error": "Unauthorized"}), 401

    client = _docker_client()
    try:
        data = [
            _container_to_json(container)
            for container in client.containers.list(all=True)
        ]
    finally:
        # Always release the client's connection, even if listing fails.
        client.close()
    return jsonify({"containers": data})


@app.route("/api/v1/nginx-configs", methods=["GET"])
def nginx_configs() -> Any:
    """Return the nginx config scan results as JSON; 401 if unauthorized."""
    if not _authorized(request):
        return jsonify({"error": "Unauthorized"}), 401

    root = _nginx_root()
    configs = _scan_nginx_configs()
    return jsonify(
        {
            "config_root": str(root),
            "exists": root.is_dir(),
            "count": len(configs),
            "configs": configs,
        }
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "9090")))