Files
inventory/collectors/proxmox.py
Nathan Woodburn 0ce79935d7
All checks were successful
Check Code Quality / RuffCheck (push) Successful in 1m4s
Build Docker / BuildImage (push) Successful in 1m26s
feat: Initial code
2026-03-26 23:07:05 +11:00

244 lines
9.7 KiB
Python

import ipaddress
import re
from typing import Dict, List
import requests
from config import AppConfig
from .base import BaseCollector, CollectionResult
class ProxmoxCollector(BaseCollector):
    """Collect QEMU VM and LXC container assets from Proxmox VE endpoints.

    Every endpoint listed in ``config.proxmox_endpoints`` is queried through
    the ``/api2/json`` REST API. Authentication uses an API token when both
    token id and secret are configured, otherwise a username/password ticket.
    """

    source_name = "proxmox"

    def __init__(self, config: AppConfig):
        """Store the application configuration used for all API requests."""
        self.config = config

    def collect(self) -> CollectionResult:
        """Collect assets from every configured endpoint.

        Returns:
            A ``CollectionResult`` whose status is:

            * ``"disabled"`` when the collector is switched off,
            * ``"skipped"`` when no endpoints are configured,
            * ``"error"`` when at least one endpoint failed and no asset was
              collected at all,
            * ``"degraded"`` when some endpoints failed but assets exist,
            * ``"ok"`` otherwise.
        """
        if not self.config.proxmox_enabled:
            return CollectionResult(source=self.source_name, assets=[], status="disabled")
        if not self.config.proxmox_endpoints:
            return CollectionResult(
                source=self.source_name,
                assets=[],
                status="skipped",
                error="No PROXMOX_ENDPOINTS configured",
            )

        assets: List[Dict] = []
        errors: List[str] = []
        for endpoint in self.config.proxmox_endpoints:
            # One failing endpoint must not prevent the others from being
            # collected, so every failure is recorded and reported afterwards.
            try:
                assets.extend(self._collect_endpoint(endpoint))
            except Exception as exc:
                errors.append(f"{endpoint}: {exc}")

        if errors and not assets:
            return CollectionResult(
                source=self.source_name,
                assets=[],
                status="error",
                error=" | ".join(errors),
            )
        if errors:
            return CollectionResult(
                source=self.source_name,
                assets=assets,
                status="degraded",
                error=" | ".join(errors),
            )
        return CollectionResult(source=self.source_name, assets=assets, status="ok")

    def _get(self, url: str, headers: Dict, cookies: Dict | None) -> requests.Response:
        """Issue a GET request with the configured timeout and TLS settings."""
        return requests.get(
            url,
            headers=headers,
            cookies=cookies,
            timeout=self.config.request_timeout_seconds,
            verify=self.config.proxmox_verify_tls,
        )

    def _authenticate(self, endpoint: str) -> tuple[Dict, Dict | None]:
        """Build the request headers and cookies for *endpoint*.

        An API token takes precedence; otherwise a ticket is requested with
        the configured username and password. When neither is configured the
        requests are sent without credentials.

        Raises:
            requests.HTTPError: if the ticket request is rejected.
        """
        headers: Dict = {"Accept": "application/json"}
        cookies: Dict | None = None

        if self.config.proxmox_token_id and self.config.proxmox_token_secret:
            headers["Authorization"] = (
                f"PVEAPIToken={self.config.proxmox_token_id}={self.config.proxmox_token_secret}"
            )
        elif self.config.proxmox_user and self.config.proxmox_password:
            token_resp = requests.post(
                f"{endpoint}/api2/json/access/ticket",
                data={
                    "username": self.config.proxmox_user,
                    "password": self.config.proxmox_password,
                },
                timeout=self.config.request_timeout_seconds,
                verify=self.config.proxmox_verify_tls,
            )
            token_resp.raise_for_status()
            # ``or {}`` also covers an explicit JSON ``null`` for "data".
            payload = token_resp.json().get("data") or {}
            cookies = {"PVEAuthCookie": payload.get("ticket", "")}
            csrf = payload.get("CSRFPreventionToken")
            if csrf:
                headers["CSRFPreventionToken"] = csrf

        return headers, cookies

    def _collect_endpoint(self, endpoint: str) -> List[Dict]:
        """Return asset dicts for every VM and container on one endpoint.

        For each node the QEMU guests are listed first, then the LXC guests.

        Raises:
            requests.HTTPError: if authentication, the node listing or a
                per-node guest listing returns an error status.
        """
        endpoint = endpoint.rstrip("/")
        headers, cookies = self._authenticate(endpoint)

        nodes_resp = self._get(f"{endpoint}/api2/json/nodes", headers, cookies)
        nodes_resp.raise_for_status()
        nodes = nodes_resp.json().get("data") or []

        # (asset_type stored on the asset, API path segment, IP collector)
        guest_kinds = (
            ("vm", "qemu", self._collect_qemu_ips),
            ("lxc", "lxc", self._collect_lxc_ips),
        )

        assets: List[Dict] = []
        for node in nodes:
            node_name = node.get("node", "unknown-node")
            for asset_type, api_kind, collect_ips in guest_kinds:
                guests_resp = self._get(
                    f"{endpoint}/api2/json/nodes/{node_name}/{api_kind}",
                    headers,
                    cookies,
                )
                guests_resp.raise_for_status()
                for guest in guests_resp.json().get("data") or []:
                    guest_id = str(guest.get("vmid", ""))
                    # Without an id the per-guest URLs would be malformed,
                    # so skip the IP lookups instead of issuing them.
                    ips = (
                        collect_ips(endpoint, node_name, guest_id, headers, cookies)
                        if guest_id
                        else []
                    )
                    assets.append(
                        self._build_asset(asset_type, guest, node_name, endpoint, ips)
                    )
        return assets

    @staticmethod
    def _build_asset(
        asset_type: str,
        guest: Dict,
        node_name: str,
        endpoint: str,
        ips: List[str],
    ) -> Dict:
        """Translate one guest entry of a node listing into an asset dict."""
        return {
            "asset_type": asset_type,
            "external_id": str(guest.get("vmid", guest.get("name", f"unknown-{asset_type}"))),
            "name": guest.get("name") or f"{asset_type}-{guest.get('vmid', 'unknown')}",
            "hostname": guest.get("name"),
            "status": guest.get("status", "unknown"),
            "ip_addresses": ips,
            "node": node_name,
            "cpu": guest.get("cpus"),
            # The API values are divided down from bytes to MiB / GiB.
            "memory_mb": (guest.get("maxmem", 0) or 0) / (1024 * 1024),
            "disk_gb": (guest.get("maxdisk", 0) or 0) / (1024 * 1024 * 1024),
            "metadata": {
                "endpoint": endpoint,
                "uptime_seconds": guest.get("uptime"),
            },
        }

    def _collect_qemu_ips(
        self,
        endpoint: str,
        node_name: str,
        vm_id: str,
        headers: Dict,
        cookies: Dict | None,
    ) -> List[str]:
        """Return the de-duplicated IPs of a QEMU VM.

        Addresses reported by the guest agent come first, followed by the
        addresses found in the VM configuration. The agent lookup is
        best-effort: any failure simply contributes no addresses.
        """
        ips: List[str] = []
        # Guest agent provides the most accurate runtime IP list when enabled.
        try:
            agent_resp = self._get(
                f"{endpoint}/api2/json/nodes/{node_name}/qemu/{vm_id}/agent/network-get-interfaces",
                headers,
                cookies,
            )
            if agent_resp.ok:
                data = agent_resp.json().get("data", {})
                interfaces = data.get("result", []) if isinstance(data, dict) else []
                for interface in interfaces or []:
                    for addr in interface.get("ip-addresses", []) or []:
                        value = addr.get("ip-address")
                        if value:
                            ips.append(value)
        except Exception:
            # Deliberately best-effort: an unreachable or absent agent, or an
            # unexpected payload shape, must not fail the collection.
            pass

        ips.extend(self._collect_config_ips(endpoint, node_name, "qemu", vm_id, headers, cookies))
        return self._normalize_ips(ips)

    def _collect_lxc_ips(
        self,
        endpoint: str,
        node_name: str,
        vm_id: str,
        headers: Dict,
        cookies: Dict | None,
    ) -> List[str]:
        """Return the de-duplicated IPs of an LXC container.

        Addresses from the runtime interface listing come first, followed by
        the addresses found in the container configuration. The runtime
        lookup is best-effort: any failure simply contributes no addresses.
        """
        ips: List[str] = []
        # Runtime interfaces capture DHCP-assigned addresses that are not
        # present in static config.
        try:
            iface_resp = self._get(
                f"{endpoint}/api2/json/nodes/{node_name}/lxc/{vm_id}/interfaces",
                headers,
                cookies,
            )
            if iface_resp.ok:
                interfaces = iface_resp.json().get("data", [])
                if isinstance(interfaces, list):
                    for interface in interfaces:
                        if not isinstance(interface, dict):
                            continue
                        # "inet"/"inet6" are documented as "addr/prefix"
                        # strings; a list is still accepted for compatibility
                        # with the previous handling.
                        for key in ("inet", "inet6"):
                            value = interface.get(key)
                            if isinstance(value, str):
                                ips.append(value)
                            elif isinstance(value, list):
                                ips.extend(value)
                        # NOTE(review): newer Proxmox releases appear to also
                        # expose a guest-agent-style "ip-addresses" list;
                        # it is read when present - confirm against the API.
                        for addr in interface.get("ip-addresses") or []:
                            if isinstance(addr, dict) and addr.get("ip-address"):
                                ips.append(addr["ip-address"])
        except Exception:
            # Deliberately best-effort: a stopped container or an unexpected
            # payload shape must not fail the collection.
            pass

        ips.extend(self._collect_config_ips(endpoint, node_name, "lxc", vm_id, headers, cookies))
        return self._normalize_ips(ips)

    def _collect_config_ips(
        self,
        endpoint: str,
        node_name: str,
        vm_type: str,
        vm_id: str,
        headers: Dict,
        cookies: Dict | None,
    ) -> List[str]:
        """Return the static addresses declared in a guest's configuration.

        Only ``net*`` and ``ipconfig*`` entries are inspected, and only their
        ``ip=`` / ``ip6=`` options are used, so gateway addresses are not
        mistaken for addresses of the guest. Any request or decoding failure
        yields an empty list.

        Args:
            vm_type: API path segment of the guest kind (``"qemu"`` or ``"lxc"``).
        """
        try:
            config_resp = self._get(
                f"{endpoint}/api2/json/nodes/{node_name}/{vm_type}/{vm_id}/config",
                headers,
                cookies,
            )
            if not config_resp.ok:
                return []
            config = config_resp.json().get("data", {})
        except Exception:
            return []

        # A ``null`` or otherwise non-mapping payload carries no addresses.
        if not isinstance(config, dict):
            return []

        ips: List[str] = []
        for key, value in config.items():
            if not isinstance(value, str):
                continue
            if key.startswith(("net", "ipconfig")):
                ips.extend(self._parse_config_ips(value))
        return ips

    @staticmethod
    def _parse_config_ips(value: str) -> List[str]:
        """Extract the ``ip=``/``ip6=`` addresses from a config option string.

        *value* is a comma-separated ``key=value`` list such as
        ``"name=eth0,gw=10.0.0.1,ip=10.0.0.5/24"``. Non-address settings
        (for example ``ip=dhcp``) are ignored. Addresses are returned as
        written, including any ``/prefix`` suffix.
        """
        found: List[str] = []
        for option in value.split(","):
            name, sep, setting = option.partition("=")
            if not sep or name.strip() not in {"ip", "ip6"}:
                continue
            setting = setting.strip()
            try:
                ipaddress.ip_interface(setting)
            except ValueError:
                continue
            found.append(setting)
        return found

    @staticmethod
    def _normalize_ips(values: List[str]) -> List[str]:
        """Return unique, valid, non-loopback IPs in first-seen order.

        Any ``/prefix`` suffix is removed, entries that are not strings or do
        not parse as an IP address are dropped, and each address is rendered
        in the canonical form produced by the ``ipaddress`` module.
        """
        normalized: List[str] = []
        seen = set()
        for value in values:
            # API payloads are untyped; ignore anything that is not text.
            if not isinstance(value, str):
                continue
            candidate = value.strip().split("/", 1)[0]
            try:
                ip_obj = ipaddress.ip_address(candidate)
            except ValueError:
                continue
            if ip_obj.is_loopback:
                continue
            text = str(ip_obj)
            if text not in seen:
                seen.add(text)
                normalized.append(text)
        return normalized