generated from nathanwoodburn/python-webserver-template
244 lines
9.7 KiB
Python
244 lines
9.7 KiB
Python
import ipaddress
|
|
import re
|
|
from typing import Dict, List
|
|
|
|
import requests
|
|
|
|
from config import AppConfig
|
|
from .base import BaseCollector, CollectionResult
|
|
|
|
|
|
class ProxmoxCollector(BaseCollector):
    """Collect QEMU VM and LXC container inventory from Proxmox VE endpoints.

    Each configured endpoint is queried for its nodes; every node is then
    queried for its QEMU guests and LXC containers. IP addresses are gathered
    best-effort from runtime data (guest agent / container interfaces) and
    from the guest's static configuration.
    """

    source_name = "proxmox"

    def __init__(self, config: AppConfig):
        """Keep a reference to the application configuration."""
        self.config = config

    def collect(self) -> CollectionResult:
        """Collect assets from every configured Proxmox endpoint.

        Returns:
            A ``CollectionResult`` whose ``status`` is:

            * ``"disabled"`` when ``proxmox_enabled`` is falsy,
            * ``"skipped"`` when no endpoints are configured,
            * ``"error"`` when at least one endpoint failed and nothing was collected,
            * ``"degraded"`` when some endpoints failed but assets were collected,
            * ``"ok"`` otherwise.
        """
        if not self.config.proxmox_enabled:
            return CollectionResult(source=self.source_name, assets=[], status="disabled")
        if not self.config.proxmox_endpoints:
            return CollectionResult(
                source=self.source_name,
                assets=[],
                status="skipped",
                error="No PROXMOX_ENDPOINTS configured",
            )

        assets: List[Dict] = []
        errors: List[str] = []
        for endpoint in self.config.proxmox_endpoints:
            try:
                assets.extend(self._collect_endpoint(endpoint))
            except Exception as exc:
                # Boundary handler: a failing endpoint is reported, not fatal,
                # so the remaining endpoints are still collected.
                errors.append(f"{endpoint}: {exc}")

        if not errors:
            return CollectionResult(source=self.source_name, assets=assets, status="ok")
        # With errors present, an empty asset list means total failure.
        status = "degraded" if assets else "error"
        return CollectionResult(
            source=self.source_name,
            assets=assets,
            status=status,
            error=" | ".join(errors),
        )

    def _collect_endpoint(self, endpoint: str) -> List[Dict]:
        """Return asset dicts for every VM and container on one endpoint.

        Args:
            endpoint: Base URL of the Proxmox API host; a trailing ``/`` is ignored.

        Raises:
            Exception: Any transport, HTTP-status or decoding error raised while
                authenticating or listing nodes/guests propagates to the caller.
        """
        endpoint = endpoint.rstrip("/")
        headers, cookies = self._authenticate(endpoint)

        # (asset_type stored on the asset, API path segment, IP collector)
        guest_kinds = (
            ("vm", "qemu", self._collect_qemu_ips),
            ("lxc", "lxc", self._collect_lxc_ips),
        )

        assets: List[Dict] = []
        # `or []` also covers a response whose "data" is present but null.
        for node in self._api_get(endpoint, "nodes", headers, cookies) or []:
            node_name = node.get("node", "unknown-node")
            for asset_type, api_segment, collect_ips in guest_kinds:
                guests = self._api_get(endpoint, f"nodes/{node_name}/{api_segment}", headers, cookies)
                for guest in guests or []:
                    guest_id = str(guest.get("vmid", ""))
                    # Without an id the per-guest URLs would be malformed, so
                    # skip IP discovery instead of issuing doomed requests.
                    ips = collect_ips(endpoint, node_name, guest_id, headers, cookies) if guest_id else []
                    assets.append(self._build_asset(asset_type, guest, node_name, endpoint, ips))
        return assets

    def _authenticate(self, endpoint: str) -> tuple[Dict, Dict | None]:
        """Build the request headers and cookies for the configured credentials.

        An API token takes precedence over user/password. With user/password a
        ticket is requested from ``/access/ticket`` and sent as a cookie. When
        neither credential pair is configured, requests go out unauthenticated.

        Raises:
            requests.HTTPError: If the ticket request returns an error status.
        """
        cfg = self.config
        headers: Dict = {"Accept": "application/json"}
        cookies: Dict | None = None

        if cfg.proxmox_token_id and cfg.proxmox_token_secret:
            headers["Authorization"] = f"PVEAPIToken={cfg.proxmox_token_id}={cfg.proxmox_token_secret}"
        elif cfg.proxmox_user and cfg.proxmox_password:
            ticket_resp = requests.post(
                f"{endpoint}/api2/json/access/ticket",
                data={"username": cfg.proxmox_user, "password": cfg.proxmox_password},
                timeout=cfg.request_timeout_seconds,
                verify=cfg.proxmox_verify_tls,
            )
            ticket_resp.raise_for_status()
            payload = ticket_resp.json().get("data") or {}
            cookies = {"PVEAuthCookie": payload.get("ticket", "")}
            csrf = payload.get("CSRFPreventionToken")
            if csrf:
                headers["CSRFPreventionToken"] = csrf
        return headers, cookies

    def _get(self, url: str, headers: Dict, cookies: Dict | None) -> requests.Response:
        """Issue a GET with the configured timeout and TLS verification setting."""
        return requests.get(
            url,
            headers=headers,
            cookies=cookies,
            timeout=self.config.request_timeout_seconds,
            verify=self.config.proxmox_verify_tls,
        )

    def _api_get(self, endpoint: str, path: str, headers: Dict, cookies: Dict | None):
        """GET ``{endpoint}/api2/json/{path}`` and return its ``data`` payload.

        Raises:
            requests.HTTPError: If the response carries an error status.
        """
        resp = self._get(f"{endpoint}/api2/json/{path}", headers, cookies)
        resp.raise_for_status()
        return resp.json().get("data")

    def _api_get_optional(self, endpoint: str, path: str, headers: Dict, cookies: Dict | None):
        """Best-effort variant of ``_api_get``: return ``None`` instead of raising.

        Used for per-guest lookups, where a failure must not abort the whole
        endpoint collection.
        """
        try:
            resp = self._get(f"{endpoint}/api2/json/{path}", headers, cookies)
            if not resp.ok:
                return None
            payload = resp.json()
        except (requests.RequestException, ValueError):
            # Transport failure or a body that is not valid JSON.
            return None
        return payload.get("data") if isinstance(payload, dict) else None

    @staticmethod
    def _build_asset(asset_type: str, guest: Dict, node_name: str, endpoint: str, ip_addresses: List[str]) -> Dict:
        """Translate one guest entry from a node listing into an asset dict.

        ``asset_type`` ("vm" or "lxc") is also used to build the fallback
        name and external id when the guest entry lacks them.
        """
        bytes_per_mb = 1024 * 1024
        bytes_per_gb = 1024 * 1024 * 1024
        return {
            "asset_type": asset_type,
            "external_id": str(guest.get("vmid", guest.get("name", f"unknown-{asset_type}"))),
            "name": guest.get("name") or f"{asset_type}-{guest.get('vmid', 'unknown')}",
            "hostname": guest.get("name"),
            "status": guest.get("status", "unknown"),
            "ip_addresses": ip_addresses,
            "node": node_name,
            "cpu": guest.get("cpus"),
            # `or 0` guards against an explicit null in the listing.
            "memory_mb": (guest.get("maxmem", 0) or 0) / bytes_per_mb,
            "disk_gb": (guest.get("maxdisk", 0) or 0) / bytes_per_gb,
            "metadata": {
                "endpoint": endpoint,
                "uptime_seconds": guest.get("uptime"),
            },
        }

    def _collect_qemu_ips(self, endpoint: str, node_name: str, vm_id: str, headers: Dict, cookies: Dict | None) -> List[str]:
        """Return de-duplicated IPs for a QEMU VM (guest agent first, then config)."""
        # Guest agent provides the most accurate runtime IP list when enabled.
        agent_data = self._api_get_optional(
            endpoint,
            f"nodes/{node_name}/qemu/{vm_id}/agent/network-get-interfaces",
            headers,
            cookies,
        )
        ips = self._agent_interface_ips(agent_data)
        ips.extend(self._collect_config_ips(endpoint, node_name, "qemu", vm_id, headers, cookies))
        return self._normalize_ips(ips)

    def _collect_lxc_ips(self, endpoint: str, node_name: str, vm_id: str, headers: Dict, cookies: Dict | None) -> List[str]:
        """Return de-duplicated IPs for an LXC container (runtime first, then config)."""
        # Runtime interfaces capture DHCP-assigned addresses that are not present in static config.
        runtime_data = self._api_get_optional(
            endpoint,
            f"nodes/{node_name}/lxc/{vm_id}/interfaces",
            headers,
            cookies,
        )
        ips = self._lxc_interface_ips(runtime_data)
        ips.extend(self._collect_config_ips(endpoint, node_name, "lxc", vm_id, headers, cookies))
        return self._normalize_ips(ips)

    def _collect_config_ips(
        self,
        endpoint: str,
        node_name: str,
        vm_type: str,
        vm_id: str,
        headers: Dict,
        cookies: Dict | None,
    ) -> List[str]:
        """Return the addresses statically assigned in a guest's configuration.

        Args:
            vm_type: API path segment of the guest kind ("qemu" or "lxc").

        Returns:
            Address strings (possibly with a ``/prefix``); empty when the
            configuration cannot be fetched or is not a mapping.
        """
        config = self._api_get_optional(
            endpoint,
            f"nodes/{node_name}/{vm_type}/{vm_id}/config",
            headers,
            cookies,
        )
        if not isinstance(config, dict):
            return []
        return self._extract_config_ips(config)

    @staticmethod
    def _agent_interface_ips(data) -> List[str]:
        """Extract ``ip-address`` values from a guest-agent interface report.

        Anything that does not have the expected
        ``{"result": [{"ip-addresses": [{"ip-address": ...}]}]}`` shape is ignored.
        """
        found: List[str] = []
        interfaces = data.get("result") if isinstance(data, dict) else None
        if not isinstance(interfaces, list):
            return found
        for interface in interfaces:
            addresses = interface.get("ip-addresses") if isinstance(interface, dict) else None
            if not isinstance(addresses, list):
                continue
            for entry in addresses:
                value = entry.get("ip-address") if isinstance(entry, dict) else None
                if value:
                    found.append(value)
        return found

    @staticmethod
    def _lxc_interface_ips(interfaces) -> List[str]:
        """Extract ``inet`` / ``inet6`` values from a container interface listing.

        NOTE(review): the Proxmox API is understood to report each field as a
        single "addr/prefix" string; a list of strings is accepted as well so
        either shape works -- confirm against the API viewer.
        """
        found: List[str] = []
        if not isinstance(interfaces, list):
            return found
        for interface in interfaces:
            if not isinstance(interface, dict):
                continue
            for field in ("inet", "inet6"):
                value = interface.get(field)
                if isinstance(value, str):
                    found.append(value)
                elif isinstance(value, list):
                    found.extend(item for item in value if isinstance(item, str))
        return found

    @staticmethod
    def _extract_config_ips(config: Dict) -> List[str]:
        """Pull guest addresses out of ``net*`` / ``ipconfig*`` config entries.

        The values are comma-separated ``key=value`` option lists. Only the
        ``ip`` and ``ip6`` options are kept, so gateway addresses (``gw`` /
        ``gw6``) are not mistaken for guest addresses. Non-address settings
        such as ``dhcp``, ``manual`` or ``auto`` are dropped.
        """
        address_options = {"ip", "ip6"}
        found: List[str] = []
        for key, value in config.items():
            if not isinstance(key, str) or not isinstance(value, str):
                continue
            if not key.startswith(("net", "ipconfig")):
                continue
            for option in value.split(","):
                name, separator, setting = option.partition("=")
                if not separator or name.strip().lower() not in address_options:
                    continue
                candidate = setting.strip()
                try:
                    # Accepts both "addr" and "addr/prefix"; rejects keywords.
                    ipaddress.ip_interface(candidate)
                except ValueError:
                    continue
                found.append(candidate)
        return found

    @staticmethod
    def _normalize_ips(values: List[str]) -> List[str]:
        """Return unique, canonical, non-loopback addresses in first-seen order.

        A ``/prefix`` suffix is stripped; entries that are not strings or do
        not parse as an IP address are skipped.
        """
        normalized: List[str] = []
        seen = set()
        for value in values:
            if not isinstance(value, str):
                continue
            candidate = value.strip().split("/", 1)[0]
            try:
                ip_obj = ipaddress.ip_address(candidate)
            except ValueError:
                continue
            if ip_obj.is_loopback:
                continue
            text = str(ip_obj)
            if text not in seen:
                seen.add(text)
                normalized.append(text)
        return normalized
|