inventory/database.py
import json
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


class InventoryStore:
    """SQLite-backed store for discovered assets, source health, and collection runs."""

    def __init__(self, database_path: str):
        self.database_path = database_path

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.database_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        return conn

    def init(self) -> None:
        """Create the schema (sources, collection_runs, assets) if it does not exist."""
        with closing(self._connect()) as conn:
            conn.executescript(
                """
                PRAGMA journal_mode=WAL;

                CREATE TABLE IF NOT EXISTS sources (
                    name TEXT PRIMARY KEY,
                    enabled INTEGER NOT NULL,
                    last_status TEXT,
                    last_error TEXT,
                    last_success TEXT,
                    updated_at TEXT NOT NULL
                );

                CREATE TABLE IF NOT EXISTS collection_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    started_at TEXT NOT NULL,
                    finished_at TEXT,
                    status TEXT NOT NULL,
                    error_summary TEXT
                );

                CREATE TABLE IF NOT EXISTS assets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT NOT NULL,
                    asset_type TEXT NOT NULL,
                    external_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    hostname TEXT,
                    status TEXT,
                    ip_addresses TEXT,
                    subnet TEXT,
                    public_ip TEXT,
                    node TEXT,
                    parent_id TEXT,
                    cpu REAL,
                    memory_mb REAL,
                    disk_gb REAL,
                    metadata_json TEXT,
                    last_seen TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    UNIQUE(source, asset_type, external_id)
                );

                CREATE INDEX IF NOT EXISTS idx_assets_source ON assets(source);
                CREATE INDEX IF NOT EXISTS idx_assets_status ON assets(status);
                CREATE INDEX IF NOT EXISTS idx_assets_subnet ON assets(subnet);
                """
            )
            conn.commit()

    def seed_sources(self, source_states: Dict[str, bool]) -> None:
        """Upsert the configured sources and remove any that are no longer configured."""
        now = utc_now()
        with closing(self._connect()) as conn:
            valid_sources = set(source_states.keys())
            for source, enabled in source_states.items():
                conn.execute(
                    """
                    INSERT INTO sources(name, enabled, updated_at)
                    VALUES(?, ?, ?)
                    ON CONFLICT(name) DO UPDATE SET
                        enabled=excluded.enabled,
                        updated_at=excluded.updated_at
                    """,
                    (source, int(enabled), now),
                )
            if valid_sources:
                placeholders = ",".join(["?"] * len(valid_sources))
                conn.execute(
                    f"DELETE FROM sources WHERE name NOT IN ({placeholders})",
                    tuple(valid_sources),
                )
            conn.commit()

    def run_start(self) -> int:
        """Insert a new collection run in the 'running' state and return its id."""
        with closing(self._connect()) as conn:
            cursor = conn.execute(
                "INSERT INTO collection_runs(started_at, status) VALUES(?, ?)",
                (utc_now(), "running"),
            )
            conn.commit()
            row_id = cursor.lastrowid
            if row_id is None:
                raise RuntimeError("Failed to create collection run")
            return int(row_id)

    def run_finish(self, run_id: int, status: str, error_summary: str = "") -> None:
        """Mark a collection run as finished with the given status and error summary."""
        with closing(self._connect()) as conn:
            conn.execute(
                """
                UPDATE collection_runs
                SET finished_at=?, status=?, error_summary=?
                WHERE id=?
                """,
                (utc_now(), status, error_summary.strip(), run_id),
            )
            conn.commit()

    def set_source_status(self, source: str, status: str, error: str = "") -> None:
        """Record the latest status for a source; last_success is only refreshed on 'ok'."""
        now = utc_now()
        success_ts = now if status == "ok" else None
        with closing(self._connect()) as conn:
            conn.execute(
                """
                UPDATE sources
                SET last_status=?,
                    last_error=?,
                    last_success=COALESCE(?, last_success),
                    updated_at=?
                WHERE name=?
                """,
                (status, error.strip(), success_ts, now, source),
            )
            conn.commit()

    def upsert_assets(self, source: str, assets: Iterable[Dict]) -> None:
        """Insert or update assets for a source, keyed on (source, asset_type, external_id)."""
        now = utc_now()
        with closing(self._connect()) as conn:
            for asset in assets:
                conn.execute(
                    """
                    INSERT INTO assets(
                        source, asset_type, external_id, name, hostname, status,
                        ip_addresses, subnet, public_ip, node, parent_id,
                        cpu, memory_mb, disk_gb, metadata_json, last_seen, updated_at
                    ) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(source, asset_type, external_id)
                    DO UPDATE SET
                        name=excluded.name,
                        hostname=excluded.hostname,
                        status=excluded.status,
                        ip_addresses=excluded.ip_addresses,
                        subnet=excluded.subnet,
                        public_ip=excluded.public_ip,
                        node=excluded.node,
                        parent_id=excluded.parent_id,
                        cpu=excluded.cpu,
                        memory_mb=excluded.memory_mb,
                        disk_gb=excluded.disk_gb,
                        metadata_json=excluded.metadata_json,
                        last_seen=excluded.last_seen,
                        updated_at=excluded.updated_at
                    """,
                    (
                        source,
                        asset.get("asset_type", "unknown"),
                        str(asset.get("external_id", asset.get("name", "unknown"))),
                        asset.get("name", "unknown"),
                        asset.get("hostname"),
                        asset.get("status"),
                        json.dumps(asset.get("ip_addresses", [])),
                        asset.get("subnet"),
                        asset.get("public_ip"),
                        asset.get("node"),
                        asset.get("parent_id"),
                        asset.get("cpu"),
                        asset.get("memory_mb"),
                        asset.get("disk_gb"),
                        json.dumps(asset.get("metadata", {})),
                        now,
                        now,
                    ),
                )
            conn.commit()

    def list_assets(self) -> List[Dict]:
        """Return every stored asset as a dict, ordered by source, type, and name."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT source, asset_type, external_id, name, hostname, status,
                       ip_addresses, subnet, public_ip, node, parent_id,
                       cpu, memory_mb, disk_gb, metadata_json, last_seen, updated_at
                FROM assets
                ORDER BY source, asset_type, name
                """
            ).fetchall()
            return [self._row_to_asset(row) for row in rows]

    def source_health(self) -> List[Dict]:
        """Return per-source health records (status, last error, last success)."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT name, enabled, last_status, last_error, last_success, updated_at
                FROM sources
                ORDER BY name
                """
            ).fetchall()
            return [dict(row) for row in rows]

    def last_run(self) -> Optional[Dict]:
        """Return the most recent collection run, or None if no runs exist."""
        with closing(self._connect()) as conn:
            row = conn.execute(
                """
                SELECT id, started_at, finished_at, status, error_summary
                FROM collection_runs
                ORDER BY id DESC
                LIMIT 1
                """
            ).fetchone()
            return dict(row) if row else None

    def summary(self) -> Dict:
        """Return aggregate counts: totals, online/offline assets, and a per-type breakdown."""
        with closing(self._connect()) as conn:
            totals = conn.execute(
                """
                SELECT
                    COUNT(*) AS total_assets,
                    SUM(CASE WHEN status IN ('running', 'online', 'healthy', 'up') THEN 1 ELSE 0 END) AS online_assets,
                    SUM(CASE WHEN status IN ('stopped', 'offline', 'down', 'error', 'unhealthy') THEN 1 ELSE 0 END) AS offline_assets,
                    COUNT(DISTINCT source) AS source_count,
                    COUNT(DISTINCT subnet) AS subnet_count
                FROM assets
                """
            ).fetchone()
            by_type = conn.execute(
                """
                SELECT asset_type, COUNT(*) AS count
                FROM assets
                GROUP BY asset_type
                ORDER BY count DESC
                """
            ).fetchall()
            return {
                "total_assets": int(totals["total_assets"] or 0),
                "online_assets": int(totals["online_assets"] or 0),
                "offline_assets": int(totals["offline_assets"] or 0),
                "source_count": int(totals["source_count"] or 0),
                "subnet_count": int(totals["subnet_count"] or 0),
                "asset_breakdown": [
                    {"asset_type": row["asset_type"], "count": row["count"]} for row in by_type
                ],
            }

    def topology(self) -> Dict:
        """Group assets by subnet and collect the distinct public IPs seen in each."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT
                    COALESCE(subnet, 'unassigned') AS subnet,
                    COUNT(*) AS asset_count,
                    COUNT(DISTINCT source) AS source_count,
                    GROUP_CONCAT(DISTINCT public_ip) AS public_ips
                FROM assets
                GROUP BY COALESCE(subnet, 'unassigned')
                ORDER BY subnet
                """
            ).fetchall()
            networks = []
            for row in rows:
                ips = [value for value in (row["public_ips"] or "").split(",") if value]
                networks.append(
                    {
                        "subnet": row["subnet"],
                        "asset_count": row["asset_count"],
                        "source_count": row["source_count"],
                        "public_ips": ips,
                    }
                )
            return {"networks": networks}

    @staticmethod
    def _row_to_asset(row: sqlite3.Row) -> Dict:
        """Convert an assets row to a plain dict, decoding the JSON-encoded columns."""
        return {
            "source": row["source"],
            "asset_type": row["asset_type"],
            "external_id": row["external_id"],
            "name": row["name"],
            "hostname": row["hostname"],
            "status": row["status"],
            "ip_addresses": json.loads(row["ip_addresses"] or "[]"),
            "subnet": row["subnet"],
            "public_ip": row["public_ip"],
            "node": row["node"],
            "parent_id": row["parent_id"],
            "cpu": row["cpu"],
            "memory_mb": row["memory_mb"],
            "disk_gb": row["disk_gb"],
            "metadata": json.loads(row["metadata_json"] or "{}"),
            "last_seen": row["last_seen"],
            "updated_at": row["updated_at"],
        }
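

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it only shows the
    # intended call sequence of the store. The database path, source names, and
    # asset fields below are illustrative assumptions, not values taken from the
    # real collectors that feed this store.
    store = InventoryStore("inventory-demo.db")
    store.init()
    store.seed_sources({"proxmox": True, "docker": False})

    run_id = store.run_start()
    store.upsert_assets(
        "proxmox",
        [
            {
                "asset_type": "vm",
                "external_id": "101",
                "name": "web-01",
                "status": "running",
                "ip_addresses": ["10.0.0.5"],
                "subnet": "10.0.0.0/24",
            }
        ],
    )
    store.set_source_status("proxmox", "ok")
    store.run_finish(run_id, "ok")

    print(store.summary())
    print(store.topology())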