"""SQLite-backed inventory store for multi-source asset collection.

Persists three tables:

* ``sources``          -- per-collector enablement flag and last-known health.
* ``collection_runs``  -- one row per collection cycle (start/finish/status).
* ``assets``           -- de-duplicated assets keyed by
  ``(source, asset_type, external_id)``.

Timestamps are stored as timezone-aware ISO-8601 strings in UTC.
"""

import json
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional


def utc_now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


class InventoryStore:
    """Thin persistence layer over a single SQLite database file.

    Each public method opens its own short-lived connection, so instances
    are cheap and can be shared across worker threads.
    """

    def __init__(self, database_path: str):
        self.database_path = database_path

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with Row access by column name.

        ``check_same_thread=False`` lets callers invoke store methods from
        threads other than the one that created the connection; safety comes
        from each method using its own private connection.
        """
        conn = sqlite3.connect(self.database_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        return conn

    def init(self) -> None:
        """Create the schema (idempotent) and switch the database to WAL mode."""
        with closing(self._connect()) as conn:
            conn.executescript(
                """
                PRAGMA journal_mode=WAL;
                CREATE TABLE IF NOT EXISTS sources (
                    name TEXT PRIMARY KEY,
                    enabled INTEGER NOT NULL,
                    last_status TEXT,
                    last_error TEXT,
                    last_success TEXT,
                    updated_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS collection_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    started_at TEXT NOT NULL,
                    finished_at TEXT,
                    status TEXT NOT NULL,
                    error_summary TEXT
                );
                CREATE TABLE IF NOT EXISTS assets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT NOT NULL,
                    asset_type TEXT NOT NULL,
                    external_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    hostname TEXT,
                    status TEXT,
                    ip_addresses TEXT,
                    subnet TEXT,
                    public_ip TEXT,
                    node TEXT,
                    parent_id TEXT,
                    cpu REAL,
                    memory_mb REAL,
                    disk_gb REAL,
                    metadata_json TEXT,
                    last_seen TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    UNIQUE(source, asset_type, external_id)
                );
                CREATE INDEX IF NOT EXISTS idx_assets_source ON assets(source);
                CREATE INDEX IF NOT EXISTS idx_assets_status ON assets(status);
                CREATE INDEX IF NOT EXISTS idx_assets_subnet ON assets(subnet);
                """
            )
            conn.commit()

    def seed_sources(self, source_states: Dict[str, bool]) -> None:
        """Upsert the configured sources and prune rows for removed sources.

        NOTE: when *source_states* is empty nothing is deleted (preserves the
        original guard against wiping the table on an empty config).
        """
        now = utc_now()
        upsert_sql = """
            INSERT INTO sources(name, enabled, updated_at)
            VALUES(?, ?, ?)
            ON CONFLICT(name) DO UPDATE SET
                enabled=excluded.enabled,
                updated_at=excluded.updated_at
        """
        rows = [(name, int(enabled), now) for name, enabled in source_states.items()]
        with closing(self._connect()) as conn:
            # Single prepared statement for the whole batch.
            conn.executemany(upsert_sql, rows)
            valid_sources = set(source_states)
            if valid_sources:
                placeholders = ",".join(["?"] * len(valid_sources))
                conn.execute(
                    f"DELETE FROM sources WHERE name NOT IN ({placeholders})",
                    tuple(valid_sources),
                )
            conn.commit()

    def run_start(self) -> int:
        """Record the start of a collection run and return its row id.

        Raises:
            RuntimeError: if SQLite does not report a rowid for the insert.
        """
        with closing(self._connect()) as conn:
            cursor = conn.execute(
                "INSERT INTO collection_runs(started_at, status) VALUES(?, ?)",
                (utc_now(), "running"),
            )
            conn.commit()
            row_id = cursor.lastrowid
            if row_id is None:
                raise RuntimeError("Failed to create collection run")
            return int(row_id)

    def run_finish(self, run_id: int, status: str, error_summary: str = "") -> None:
        """Mark run *run_id* finished with *status* and an optional error summary."""
        with closing(self._connect()) as conn:
            conn.execute(
                """
                UPDATE collection_runs
                SET finished_at=?, status=?, error_summary=?
                WHERE id=?
                """,
                (utc_now(), status, error_summary.strip(), run_id),
            )
            conn.commit()

    def set_source_status(self, source: str, status: str, error: str = "") -> None:
        """Update a source's health; ``last_success`` only advances on "ok"."""
        now = utc_now()
        # COALESCE keeps the previous last_success when this update is not "ok".
        success_ts = now if status == "ok" else None
        with closing(self._connect()) as conn:
            conn.execute(
                """
                UPDATE sources
                SET last_status=?, last_error=?,
                    last_success=COALESCE(?, last_success), updated_at=?
                WHERE name=?
                """,
                (status, error.strip(), success_ts, now, source),
            )
            conn.commit()

    def upsert_assets(self, source: str, assets: Iterable[Dict]) -> None:
        """Insert or refresh assets from *source* in a single batch.

        Missing fields fall back to defaults ("unknown" for identity fields,
        NULL otherwise); ``ip_addresses`` and ``metadata`` are serialized to
        JSON text. The whole batch is one prepared statement + one commit.
        """
        now = utc_now()
        sql = """
            INSERT INTO assets(
                source, asset_type, external_id, name, hostname, status,
                ip_addresses, subnet, public_ip, node, parent_id,
                cpu, memory_mb, disk_gb, metadata_json, last_seen, updated_at
            ) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(source, asset_type, external_id) DO UPDATE SET
                name=excluded.name,
                hostname=excluded.hostname,
                status=excluded.status,
                ip_addresses=excluded.ip_addresses,
                subnet=excluded.subnet,
                public_ip=excluded.public_ip,
                node=excluded.node,
                parent_id=excluded.parent_id,
                cpu=excluded.cpu,
                memory_mb=excluded.memory_mb,
                disk_gb=excluded.disk_gb,
                metadata_json=excluded.metadata_json,
                last_seen=excluded.last_seen,
                updated_at=excluded.updated_at
        """
        rows = [
            (
                source,
                asset.get("asset_type", "unknown"),
                str(asset.get("external_id", asset.get("name", "unknown"))),
                asset.get("name", "unknown"),
                asset.get("hostname"),
                asset.get("status"),
                json.dumps(asset.get("ip_addresses", [])),
                asset.get("subnet"),
                asset.get("public_ip"),
                asset.get("node"),
                asset.get("parent_id"),
                asset.get("cpu"),
                asset.get("memory_mb"),
                asset.get("disk_gb"),
                json.dumps(asset.get("metadata", {})),
                now,
                now,
            )
            for asset in assets
        ]
        if not rows:
            return
        with closing(self._connect()) as conn:
            conn.executemany(sql, rows)
            conn.commit()

    def list_assets(self) -> List[Dict]:
        """Return every stored asset as a plain dict, ordered for stable output."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT source, asset_type, external_id, name, hostname, status,
                       ip_addresses, subnet, public_ip, node, parent_id,
                       cpu, memory_mb, disk_gb, metadata_json, last_seen, updated_at
                FROM assets
                ORDER BY source, asset_type, name
                """
            ).fetchall()
            return [self._row_to_asset(row) for row in rows]

    def source_health(self) -> List[Dict]:
        """Return the enablement and last-known health of every source."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT name, enabled, last_status, last_error, last_success, updated_at
                FROM sources
                ORDER BY name
                """
            ).fetchall()
            return [dict(row) for row in rows]

    def last_run(self) -> Optional[Dict]:
        """Return the most recent collection run, or None if none exist."""
        with closing(self._connect()) as conn:
            row = conn.execute(
                """
                SELECT id, started_at, finished_at, status, error_summary
                FROM collection_runs
                ORDER BY id DESC
                LIMIT 1
                """
            ).fetchone()
            return dict(row) if row else None

    def summary(self) -> Dict:
        """Return aggregate counts: totals, online/offline, and per-type breakdown.

        ``SUM`` over an empty table yields NULL, hence the ``or 0`` guards.
        """
        with closing(self._connect()) as conn:
            totals = conn.execute(
                """
                SELECT
                    COUNT(*) AS total_assets,
                    SUM(CASE WHEN status IN ('running', 'online', 'healthy', 'up')
                        THEN 1 ELSE 0 END) AS online_assets,
                    SUM(CASE WHEN status IN ('stopped', 'offline', 'down', 'error', 'unhealthy')
                        THEN 1 ELSE 0 END) AS offline_assets,
                    COUNT(DISTINCT source) AS source_count,
                    COUNT(DISTINCT subnet) AS subnet_count
                FROM assets
                """
            ).fetchone()
            by_type = conn.execute(
                """
                SELECT asset_type, COUNT(*) AS count
                FROM assets
                GROUP BY asset_type
                ORDER BY count DESC
                """
            ).fetchall()
            return {
                "total_assets": int(totals["total_assets"] or 0),
                "online_assets": int(totals["online_assets"] or 0),
                "offline_assets": int(totals["offline_assets"] or 0),
                "source_count": int(totals["source_count"] or 0),
                "subnet_count": int(totals["subnet_count"] or 0),
                "asset_breakdown": [
                    {"asset_type": row["asset_type"], "count": row["count"]}
                    for row in by_type
                ],
            }

    def topology(self) -> Dict:
        """Group assets by subnet (NULL -> 'unassigned') with distinct public IPs.

        GROUP_CONCAT skips NULL public_ip values and joins with ','; the split
        below filters out empty fragments so an all-NULL group yields [].
        """
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT
                    COALESCE(subnet, 'unassigned') AS subnet,
                    COUNT(*) AS asset_count,
                    COUNT(DISTINCT source) AS source_count,
                    GROUP_CONCAT(DISTINCT public_ip) AS public_ips
                FROM assets
                GROUP BY COALESCE(subnet, 'unassigned')
                ORDER BY subnet
                """
            ).fetchall()
            networks = []
            for row in rows:
                ips = [value for value in (row["public_ips"] or "").split(",") if value]
                networks.append(
                    {
                        "subnet": row["subnet"],
                        "asset_count": row["asset_count"],
                        "source_count": row["source_count"],
                        "public_ips": ips,
                    }
                )
            return {"networks": networks}

    @staticmethod
    def _row_to_asset(row: sqlite3.Row) -> Dict:
        """Convert an assets Row into a plain dict, decoding the JSON columns."""
        return {
            "source": row["source"],
            "asset_type": row["asset_type"],
            "external_id": row["external_id"],
            "name": row["name"],
            "hostname": row["hostname"],
            "status": row["status"],
            "ip_addresses": json.loads(row["ip_addresses"] or "[]"),
            "subnet": row["subnet"],
            "public_ip": row["public_ip"],
            "node": row["node"],
            "parent_id": row["parent_id"],
            "cpu": row["cpu"],
            "memory_mb": row["memory_mb"],
            "disk_gb": row["disk_gb"],
            "metadata": json.loads(row["metadata_json"] or "{}"),
            "last_seen": row["last_seen"],
            "updated_at": row["updated_at"],
        }