import ipaddress import re from typing import Dict, List import requests from config import AppConfig from .base import BaseCollector, CollectionResult class ProxmoxCollector(BaseCollector): source_name = "proxmox" def __init__(self, config: AppConfig): self.config = config def collect(self) -> CollectionResult: if not self.config.proxmox_enabled: return CollectionResult(source=self.source_name, assets=[], status="disabled") if not self.config.proxmox_endpoints: return CollectionResult(source=self.source_name, assets=[], status="skipped", error="No PROXMOX_ENDPOINTS configured") assets: List[Dict] = [] errors: List[str] = [] for endpoint in self.config.proxmox_endpoints: try: assets.extend(self._collect_endpoint(endpoint)) except Exception as exc: errors.append(f"{endpoint}: {exc}") if errors and not assets: return CollectionResult(source=self.source_name, assets=[], status="error", error=" | ".join(errors)) if errors: return CollectionResult(source=self.source_name, assets=assets, status="degraded", error=" | ".join(errors)) return CollectionResult(source=self.source_name, assets=assets, status="ok") def _collect_endpoint(self, endpoint: str) -> List[Dict]: endpoint = endpoint.rstrip("/") headers = {"Accept": "application/json"} cookies = None if self.config.proxmox_token_id and self.config.proxmox_token_secret: headers["Authorization"] = ( f"PVEAPIToken={self.config.proxmox_token_id}={self.config.proxmox_token_secret}" ) elif self.config.proxmox_user and self.config.proxmox_password: token_resp = requests.post( f"{endpoint}/api2/json/access/ticket", data={"username": self.config.proxmox_user, "password": self.config.proxmox_password}, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) token_resp.raise_for_status() payload = token_resp.json().get("data", {}) cookies = {"PVEAuthCookie": payload.get("ticket", "")} csrf = payload.get("CSRFPreventionToken") if csrf: headers["CSRFPreventionToken"] = csrf nodes_resp = requests.get( f"{endpoint}/api2/json/nodes", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) nodes_resp.raise_for_status() nodes = nodes_resp.json().get("data", []) assets: List[Dict] = [] for node in nodes: node_name = node.get("node", "unknown-node") qemu_resp = requests.get( f"{endpoint}/api2/json/nodes/{node_name}/qemu", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) qemu_resp.raise_for_status() for vm in qemu_resp.json().get("data", []): vm_id = str(vm.get("vmid", "")) vm_ips = self._collect_qemu_ips(endpoint, node_name, vm_id, headers, cookies) assets.append( { "asset_type": "vm", "external_id": str(vm.get("vmid", vm.get("name", "unknown-vm"))), "name": vm.get("name") or f"vm-{vm.get('vmid', 'unknown')}", "hostname": vm.get("name"), "status": vm.get("status", "unknown"), "ip_addresses": vm_ips, "node": node_name, "cpu": vm.get("cpus"), "memory_mb": (vm.get("maxmem", 0) or 0) / (1024 * 1024), "disk_gb": (vm.get("maxdisk", 0) or 0) / (1024 * 1024 * 1024), "metadata": { "endpoint": endpoint, "uptime_seconds": vm.get("uptime"), }, } ) lxc_resp = requests.get( f"{endpoint}/api2/json/nodes/{node_name}/lxc", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) lxc_resp.raise_for_status() for lxc in lxc_resp.json().get("data", []): lxc_id = str(lxc.get("vmid", "")) lxc_ips = self._collect_lxc_ips(endpoint, node_name, lxc_id, headers, cookies) assets.append( { "asset_type": "lxc", "external_id": str(lxc.get("vmid", lxc.get("name", "unknown-lxc"))), "name": lxc.get("name") or f"lxc-{lxc.get('vmid', 'unknown')}", "hostname": lxc.get("name"), "status": lxc.get("status", "unknown"), "ip_addresses": lxc_ips, "node": node_name, "cpu": lxc.get("cpus"), "memory_mb": (lxc.get("maxmem", 0) or 0) / (1024 * 1024), "disk_gb": (lxc.get("maxdisk", 0) or 0) / (1024 * 1024 * 1024), "metadata": { "endpoint": endpoint, "uptime_seconds": lxc.get("uptime"), }, } ) return assets def _collect_qemu_ips(self, endpoint: str, node_name: str, vm_id: str, headers: Dict, cookies: Dict | None) -> List[str]: ips: List[str] = [] # Guest agent provides the most accurate runtime IP list when enabled. try: agent_resp = requests.get( f"{endpoint}/api2/json/nodes/{node_name}/qemu/{vm_id}/agent/network-get-interfaces", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) if agent_resp.ok: data = agent_resp.json().get("data", {}) interfaces = data.get("result", []) if isinstance(data, dict) else [] for interface in interfaces: for addr in interface.get("ip-addresses", []) or []: value = addr.get("ip-address") if value: ips.append(value) except Exception: pass ips.extend(self._collect_config_ips(endpoint, node_name, "qemu", vm_id, headers, cookies)) return self._normalize_ips(ips) def _collect_lxc_ips(self, endpoint: str, node_name: str, vm_id: str, headers: Dict, cookies: Dict | None) -> List[str]: ips: List[str] = [] # Runtime interfaces capture DHCP-assigned addresses that are not present in static config. try: iface_resp = requests.get( f"{endpoint}/api2/json/nodes/{node_name}/lxc/{vm_id}/interfaces", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) if iface_resp.ok: interfaces = iface_resp.json().get("data", []) if isinstance(interfaces, list): for interface in interfaces: inet_values = interface.get("inet") if isinstance(inet_values, list): ips.extend(inet_values) except Exception: pass ips.extend(self._collect_config_ips(endpoint, node_name, "lxc", vm_id, headers, cookies)) return self._normalize_ips(ips) def _collect_config_ips( self, endpoint: str, node_name: str, vm_type: str, vm_id: str, headers: Dict, cookies: Dict | None, ) -> List[str]: try: config_resp = requests.get( f"{endpoint}/api2/json/nodes/{node_name}/{vm_type}/{vm_id}/config", headers=headers, cookies=cookies, timeout=self.config.request_timeout_seconds, verify=self.config.proxmox_verify_tls, ) if not config_resp.ok: return [] config = config_resp.json().get("data", {}) except Exception: return [] values = [] for key, value in config.items(): if not isinstance(value, str): continue if key.startswith("net") or key in {"ipconfig0", "ipconfig1", "ipconfig2", "ipconfig3"}: values.append(value) ips: List[str] = [] for value in values: ips.extend(re.findall(r"\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b", value)) return ips @staticmethod def _normalize_ips(values: List[str]) -> List[str]: normalized: List[str] = [] seen = set() for value in values: candidate = value.strip() if "/" in candidate: candidate = candidate.split("/", 1)[0] try: ip_obj = ipaddress.ip_address(candidate) except ValueError: continue if ip_obj.is_loopback: continue text = str(ip_obj) if text not in seen: seen.add(text) normalized.append(text) return normalized