import threading from dataclasses import dataclass from typing import Dict, List from config import AppConfig from database import InventoryStore from .base import CollectionResult from .coolify import CoolifyCollector from .docker_hosts import DockerHostsCollector from .nginx_from_agent import NginxFromAgentCollector from .proxmox import ProxmoxCollector @dataclass class RunReport: run_id: int status: str results: List[CollectionResult] class InventoryCollectorOrchestrator: def __init__(self, config: AppConfig, store: InventoryStore): self.config = config self.store = store self._run_lock = threading.Lock() self.collectors = [ ProxmoxCollector(config), DockerHostsCollector(config), CoolifyCollector(config), NginxFromAgentCollector(config), ] self.store.seed_sources( { "proxmox": config.proxmox_enabled, "docker": config.docker_enabled, "coolify": config.coolify_enabled, "nginx": bool(config.docker_agent_endpoints), } ) def collect_once(self) -> RunReport: if not self._run_lock.acquire(blocking=False): return RunReport(run_id=-1, status="running", results=[]) try: run_id = self.store.run_start() results: List[CollectionResult] = [] errors: List[str] = [] for collector in self.collectors: result = collector.collect() results.append(result) self.store.set_source_status(result.source, result.status, result.error) if result.assets: self.store.upsert_assets(result.source, result.assets) if result.status == "error": errors.append(f"{result.source}: {result.error}") overall_status = "error" if errors else "ok" self.store.run_finish(run_id=run_id, status=overall_status, error_summary=" | ".join(errors)) return RunReport(run_id=run_id, status=overall_status, results=results) finally: self._run_lock.release() def current_data(self) -> Dict: return { "summary": self.store.summary(), "topology": self.store.topology(), "assets": self.store.list_assets(), "sources": self.store.source_health(), "last_run": self.store.last_run(), }