import logging import os from urllib.parse import urlparse from datetime import datetime import dotenv import requests from flask import Flask, Request, jsonify, make_response, render_template, request, send_file, send_from_directory from collectors import InventoryCollectorOrchestrator from config import load_config from database import InventoryStore from services import CollectionScheduler, should_autostart_scheduler dotenv.load_dotenv() logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(name)s: %(message)s") LOGGER = logging.getLogger(__name__) app = Flask(__name__) config = load_config() store = InventoryStore(config.database_path) store.init() orchestrator = InventoryCollectorOrchestrator(config, store) scheduler = CollectionScheduler(config, orchestrator) if should_autostart_scheduler(): scheduler.start() if os.getenv("INITIAL_COLLECT_ON_STARTUP", "true").strip().lower() in {"1", "true", "yes", "on"}: LOGGER.info("Running initial inventory collection") orchestrator.collect_once() def find(name, path): for root, dirs, files in os.walk(path): if name in files: return os.path.join(root, name) # Assets routes @app.route("/assets/") def send_assets(path): if path.endswith(".json"): return send_from_directory( "templates/assets", path, mimetype="application/json" ) if os.path.isfile("templates/assets/" + path): return send_from_directory("templates/assets", path) # Try looking in one of the directories filename: str = path.split("/")[-1] if ( filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".svg") ): if os.path.isfile("templates/assets/img/" + filename): return send_from_directory("templates/assets/img", filename) if os.path.isfile("templates/assets/img/favicon/" + filename): return send_from_directory("templates/assets/img/favicon", filename) return render_template("404.html"), 404 # region Special routes @app.route("/favicon.png") def faviconPNG(): return send_from_directory("templates/assets/img", "favicon.png") @app.route("/.well-known/") def wellknown(path): # Try to proxy to https://nathan.woodburn.au/.well-known/ req = requests.get(f"https://nathan.woodburn.au/.well-known/{path}") return make_response( req.content, 200, {"Content-Type": req.headers["Content-Type"]} ) # endregion # region Main routes @app.route("/") def index(): current_datetime = datetime.now().strftime("%d %b %Y %I:%M %p") return render_template("index.html", datetime=current_datetime, app_name=config.app_name) @app.route("/") def catch_all(path: str): if os.path.isfile("templates/" + path): return render_template(path) # Try with .html if os.path.isfile("templates/" + path + ".html"): return render_template(path + ".html") if os.path.isfile("templates/" + path.strip("/") + ".html"): return render_template(path.strip("/") + ".html") # Try to find a file matching if path.count("/") < 1: # Try to find a file matching filename = find(path, "templates") if filename: return send_file(filename) return render_template("404.html"), 404 # endregion # region API routes def _authorized(req: Request) -> bool: if not config.admin_token: return True return req.headers.get("X-Admin-Token", "") == config.admin_token @app.route("/api/v1/summary", methods=["GET"]) def api_summary(): payload = { "app_name": config.app_name, "summary": store.summary(), "last_run": store.last_run(), "timestamp": datetime.now().isoformat(), } return jsonify(payload) @app.route("/api/v1/topology", methods=["GET"]) def api_topology(): return jsonify(store.topology()) @app.route("/api/v1/assets", methods=["GET"]) def api_assets(): assets = store.list_assets() assets = _link_assets_to_proxmox(assets) source = request.args.get("source") status = request.args.get("status") search = (request.args.get("search") or "").strip().lower() filtered = [] for asset in assets: if source and asset.get("source") != source: continue if status and (asset.get("status") or "") != status: continue if search: haystack = " ".join( [ asset.get("name") or "", asset.get("hostname") or "", asset.get("asset_type") or "", asset.get("source") or "", asset.get("subnet") or "", asset.get("public_ip") or "", ] ).lower() if search not in haystack: continue filtered.append(asset) return jsonify({"count": len(filtered), "assets": filtered}) def _extract_target_hosts(asset: dict) -> list[str]: metadata = asset.get("metadata") or {} raw_values: list[str] = [] for field in ("proxy_pass_resolved", "inferred_targets", "proxy_pass", "upstream_servers"): values = metadata.get(field) or [] if isinstance(values, list): raw_values.extend([str(value) for value in values]) if asset.get("hostname"): raw_values.append(str(asset.get("hostname"))) hosts: list[str] = [] for raw in raw_values: parts = [part.strip() for part in raw.split(",") if part.strip()] for part in parts: parsed_host = "" if part.startswith("http://") or part.startswith("https://"): parsed_host = urlparse(part).hostname or "" else: candidate = part.split("/", 1)[0] if ":" in candidate: candidate = candidate.split(":", 1)[0] parsed_host = candidate.strip() if parsed_host: hosts.append(parsed_host.lower()) return hosts def _link_assets_to_proxmox(assets: list[dict]) -> list[dict]: proxmox_assets = [ asset for asset in assets if asset.get("source") == "proxmox" and asset.get("asset_type") in {"vm", "lxc"} ] by_ip: dict[str, list[dict]] = {} by_name: dict[str, list[dict]] = {} for asset in proxmox_assets: for ip in asset.get("ip_addresses") or []: by_ip.setdefault(str(ip).lower(), []).append(asset) for key in (asset.get("name"), asset.get("hostname")): if key: by_name.setdefault(str(key).lower(), []).append(asset) for asset in assets: if asset.get("source") == "proxmox": continue hosts = _extract_target_hosts(asset) linked = [] seen = set() for host in hosts: matches = by_ip.get(host, []) + by_name.get(host, []) for match in matches: match_key = f"{match.get('source')}:{match.get('asset_type')}:{match.get('external_id')}" if match_key in seen: continue seen.add(match_key) linked.append( { "source": match.get("source"), "asset_type": match.get("asset_type"), "external_id": match.get("external_id"), "name": match.get("name"), "hostname": match.get("hostname"), "ip_addresses": match.get("ip_addresses") or [], } ) metadata = dict(asset.get("metadata") or {}) metadata["linked_proxmox_assets"] = linked asset["metadata"] = metadata return assets @app.route("/api/v1/sources", methods=["GET"]) def api_sources(): return jsonify({"sources": store.source_health()}) @app.route("/api/v1/nginx/routes", methods=["GET"]) def api_nginx_routes(): assets = store.list_assets() nginx_assets = [ asset for asset in assets if asset.get("source") == "nginx" and asset.get("asset_type") == "nginx_site" ] routes = [] for asset in nginx_assets: metadata = asset.get("metadata") or {} inferred_targets = metadata.get("inferred_targets") or [] proxy_targets_resolved = metadata.get("proxy_pass_resolved") or [] proxy_targets = metadata.get("proxy_pass") or [] upstreams = metadata.get("upstreams") or [] upstream_servers = metadata.get("upstream_servers") or [] listens = metadata.get("listens") or [] route_targets = ( proxy_targets_resolved if proxy_targets_resolved else ( inferred_targets if inferred_targets else (proxy_targets if proxy_targets else (upstream_servers if upstream_servers else upstreams)) ) ) routes.append( { "server_name": asset.get("name") or asset.get("hostname") or "unknown", "target": ", ".join(route_targets) if route_targets else "-", "listen": ", ".join(listens) if listens else "-", "source_host": asset.get("node") or "-", "config_path": metadata.get("path") or "-", "status": asset.get("status") or "unknown", } ) routes.sort(key=lambda item: (item["server_name"], item["source_host"])) return jsonify({"count": len(routes), "routes": routes}) @app.route("/api/v1/health", methods=["GET"]) def api_health(): last_run = store.last_run() healthy = bool(last_run) and last_run.get("status") in {"ok", "running"} return jsonify( { "healthy": healthy, "last_run": last_run, "scheduler_enabled": config.scheduler_enabled, "poll_interval_seconds": config.poll_interval_seconds, } ) @app.route("/api/v1/collect/trigger", methods=["POST"]) def api_collect_trigger(): if not _authorized(request): return jsonify({"error": "Unauthorized"}), 401 report = orchestrator.collect_once() status_code = 200 if report.status == "running": status_code = 409 elif report.status == "error": status_code = 500 return ( jsonify( { "run_id": report.run_id, "status": report.status, "results": [ { "source": result.source, "status": result.status, "asset_count": len(result.assets), "error": result.error, } for result in report.results ], } ), status_code, ) @app.route("/api/v1/data", methods=["GET"]) def api_data(): payload = orchestrator.current_data() payload["header"] = config.app_name payload["content"] = "Inventory snapshot" payload["timestamp"] = datetime.now().isoformat() return jsonify(payload) # endregion # region Error Catching # 404 catch all @app.errorhandler(404) def not_found(e): return render_template("404.html"), 404 # endregion if __name__ == "__main__": app.run(debug=True, port=5000, host="127.0.0.1")