Remove Drone Ops feature end-to-end

This commit is contained in:
Smittix
2026-02-20 17:09:17 +00:00
parent b628a5f751
commit af5b17e841
19 changed files with 2 additions and 5231 deletions

View File

@@ -555,172 +555,6 @@ def init_db() -> None:
VALUES ('40069', 'METEOR-M2', NULL, NULL, 1, 1)
''')
# =====================================================================
# Drone Ops / Professional Ops Tables
# =====================================================================
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mode TEXT NOT NULL DEFAULT 'passive',
label TEXT,
operator TEXT,
metadata TEXT,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
stopped_at TIMESTAMP,
summary TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_detections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER,
source TEXT NOT NULL,
identifier TEXT NOT NULL,
classification TEXT,
confidence REAL DEFAULT 0,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
payload_json TEXT,
remote_id_json TEXT,
FOREIGN KEY (session_id) REFERENCES drone_sessions(id) ON DELETE SET NULL,
UNIQUE(session_id, source, identifier)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_tracks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
detection_id INTEGER NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
lat REAL,
lon REAL,
altitude_m REAL,
speed_mps REAL,
heading_deg REAL,
quality REAL,
source TEXT,
FOREIGN KEY (detection_id) REFERENCES drone_detections(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_correlations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
drone_identifier TEXT,
operator_identifier TEXT,
method TEXT,
confidence REAL,
evidence_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_incidents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
status TEXT DEFAULT 'open',
severity TEXT DEFAULT 'medium',
opened_by TEXT,
opened_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
closed_at TIMESTAMP,
summary TEXT,
metadata TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS drone_incident_artifacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
incident_id INTEGER NOT NULL,
artifact_type TEXT NOT NULL,
artifact_ref TEXT NOT NULL,
added_by TEXT,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
FOREIGN KEY (incident_id) REFERENCES drone_incidents(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS action_requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
incident_id INTEGER NOT NULL,
action_type TEXT NOT NULL,
requested_by TEXT NOT NULL,
requested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'pending',
payload_json TEXT,
executed_at TIMESTAMP,
executed_by TEXT,
FOREIGN KEY (incident_id) REFERENCES drone_incidents(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS action_approvals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id INTEGER NOT NULL,
approved_by TEXT NOT NULL,
approved_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
decision TEXT NOT NULL,
notes TEXT,
FOREIGN KEY (request_id) REFERENCES action_requests(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS action_audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id INTEGER,
event_type TEXT NOT NULL,
actor TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
details_json TEXT,
FOREIGN KEY (request_id) REFERENCES action_requests(id) ON DELETE SET NULL
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS evidence_manifests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
incident_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hash_algo TEXT DEFAULT 'sha256',
manifest_json TEXT NOT NULL,
signature TEXT,
created_by TEXT,
FOREIGN KEY (incident_id) REFERENCES drone_incidents(id) ON DELETE CASCADE
)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_drone_detections_last_seen
ON drone_detections(last_seen, confidence)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_drone_tracks_detection_time
ON drone_tracks(detection_id, timestamp)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_drone_incidents_status
ON drone_incidents(status, opened_at)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_action_requests_status
ON action_requests(status, requested_at)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_action_approvals_request
ON action_approvals(request_id, approved_at)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_evidence_manifests_incident
ON evidence_manifests(incident_id, created_at)
''')
logger.info("Database initialized successfully")
@@ -2469,806 +2303,3 @@ def remove_tracked_satellite(norad_id: str) -> tuple[bool, str]:
return True, 'Removed'
# =============================================================================
# Drone Ops / Professional Ops Functions
# =============================================================================
def _decode_json(raw: str | None, default: Any = None) -> Any:
"""Decode JSON safely with fallback default."""
if raw is None or raw == '':
return default
try:
return json.loads(raw)
except (TypeError, json.JSONDecodeError):
return default
def create_drone_session(
mode: str = 'passive',
label: str | None = None,
operator: str | None = None,
metadata: dict | None = None,
) -> int:
"""Create a Drone Ops session and return its ID."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO drone_sessions (mode, label, operator, metadata)
VALUES (?, ?, ?, ?)
''', (mode, label, operator, json.dumps(metadata) if metadata else None))
return int(cursor.lastrowid)
def stop_drone_session(session_id: int, summary: dict | None = None) -> bool:
"""Stop an active Drone Ops session."""
with get_db() as conn:
cursor = conn.execute('''
UPDATE drone_sessions
SET stopped_at = CURRENT_TIMESTAMP, summary = ?
WHERE id = ? AND stopped_at IS NULL
''', (json.dumps(summary) if summary else None, session_id))
return cursor.rowcount > 0
def get_drone_session(session_id: int) -> dict | None:
"""Get Drone Ops session by ID."""
with get_db() as conn:
row = conn.execute(
'SELECT * FROM drone_sessions WHERE id = ?',
(session_id,),
).fetchone()
if not row:
return None
return {
'id': row['id'],
'mode': row['mode'],
'label': row['label'],
'operator': row['operator'],
'metadata': _decode_json(row['metadata'], {}),
'started_at': row['started_at'],
'stopped_at': row['stopped_at'],
'summary': _decode_json(row['summary'], {}),
'active': row['stopped_at'] is None,
}
def get_active_drone_session() -> dict | None:
"""Return currently active Drone Ops session, if any."""
with get_db() as conn:
row = conn.execute('''
SELECT * FROM drone_sessions
WHERE stopped_at IS NULL
ORDER BY started_at DESC
LIMIT 1
''').fetchone()
if not row:
return None
return {
'id': row['id'],
'mode': row['mode'],
'label': row['label'],
'operator': row['operator'],
'metadata': _decode_json(row['metadata'], {}),
'started_at': row['started_at'],
'stopped_at': row['stopped_at'],
'summary': _decode_json(row['summary'], {}),
'active': True,
}
def list_drone_sessions(limit: int = 50, active_only: bool = False) -> list[dict]:
"""List Drone Ops sessions."""
query = '''
SELECT * FROM drone_sessions
'''
params: list[Any] = []
if active_only:
query += ' WHERE stopped_at IS NULL'
query += ' ORDER BY started_at DESC LIMIT ?'
params.append(limit)
with get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [
{
'id': row['id'],
'mode': row['mode'],
'label': row['label'],
'operator': row['operator'],
'metadata': _decode_json(row['metadata'], {}),
'started_at': row['started_at'],
'stopped_at': row['stopped_at'],
'summary': _decode_json(row['summary'], {}),
'active': row['stopped_at'] is None,
}
for row in rows
]
def upsert_drone_detection(
session_id: int | None,
source: str,
identifier: str,
classification: str | None = None,
confidence: float = 0.0,
payload: dict | None = None,
remote_id: dict | None = None,
) -> int:
"""Insert or update a Drone Ops detection, returning detection ID."""
source = (source or '').strip().lower()
identifier = (identifier or '').strip()
if not source or not identifier:
raise ValueError('source and identifier are required')
with get_db() as conn:
if session_id is None:
row = conn.execute('''
SELECT id FROM drone_detections
WHERE session_id IS NULL AND source = ? AND identifier = ?
''', (source, identifier)).fetchone()
else:
row = conn.execute('''
SELECT id FROM drone_detections
WHERE session_id = ? AND source = ? AND identifier = ?
''', (session_id, source, identifier)).fetchone()
payload_json = json.dumps(payload) if payload is not None else None
remote_id_json = json.dumps(remote_id) if remote_id is not None else None
if row:
detection_id = int(row['id'])
conn.execute('''
UPDATE drone_detections
SET
classification = COALESCE(?, classification),
confidence = CASE WHEN ? > confidence THEN ? ELSE confidence END,
last_seen = CURRENT_TIMESTAMP,
payload_json = COALESCE(?, payload_json),
remote_id_json = COALESCE(?, remote_id_json)
WHERE id = ?
''', (
classification,
confidence,
confidence,
payload_json,
remote_id_json,
detection_id,
))
return detection_id
cursor = conn.execute('''
INSERT INTO drone_detections
(session_id, source, identifier, classification, confidence, payload_json, remote_id_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
session_id,
source,
identifier,
classification,
confidence,
payload_json,
remote_id_json,
))
return int(cursor.lastrowid)
def get_drone_detection(detection_id: int) -> dict | None:
"""Get a Drone Ops detection by ID."""
with get_db() as conn:
row = conn.execute(
'SELECT * FROM drone_detections WHERE id = ?',
(detection_id,),
).fetchone()
if not row:
return None
return {
'id': row['id'],
'session_id': row['session_id'],
'source': row['source'],
'identifier': row['identifier'],
'classification': row['classification'],
'confidence': float(row['confidence'] or 0.0),
'first_seen': row['first_seen'],
'last_seen': row['last_seen'],
'payload': _decode_json(row['payload_json'], {}),
'remote_id': _decode_json(row['remote_id_json'], {}),
}
def list_drone_detections(
session_id: int | None = None,
source: str | None = None,
min_confidence: float = 0.0,
limit: int = 200,
) -> list[dict]:
"""List Drone Ops detections with optional filters."""
conditions = ['confidence >= ?']
params: list[Any] = [min_confidence]
if session_id is not None:
conditions.append('session_id = ?')
params.append(session_id)
if source:
conditions.append('source = ?')
params.append(source.strip().lower())
where_clause = 'WHERE ' + ' AND '.join(conditions)
params.append(limit)
with get_db() as conn:
rows = conn.execute(f'''
SELECT * FROM drone_detections
{where_clause}
ORDER BY last_seen DESC
LIMIT ?
''', params).fetchall()
return [
{
'id': row['id'],
'session_id': row['session_id'],
'source': row['source'],
'identifier': row['identifier'],
'classification': row['classification'],
'confidence': float(row['confidence'] or 0.0),
'first_seen': row['first_seen'],
'last_seen': row['last_seen'],
'payload': _decode_json(row['payload_json'], {}),
'remote_id': _decode_json(row['remote_id_json'], {}),
}
for row in rows
]
def add_drone_track(
detection_id: int,
lat: float | None = None,
lon: float | None = None,
altitude_m: float | None = None,
speed_mps: float | None = None,
heading_deg: float | None = None,
quality: float | None = None,
source: str | None = None,
) -> int:
"""Add a track point for a detection."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO drone_tracks
(detection_id, lat, lon, altitude_m, speed_mps, heading_deg, quality, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (detection_id, lat, lon, altitude_m, speed_mps, heading_deg, quality, source))
return int(cursor.lastrowid)
def list_drone_tracks(
detection_id: int | None = None,
identifier: str | None = None,
limit: int = 1000,
) -> list[dict]:
"""List track points by detection ID or detection identifier."""
params: list[Any] = []
where_clause = ''
join_clause = ''
if detection_id is not None:
where_clause = 'WHERE t.detection_id = ?'
params.append(detection_id)
elif identifier:
join_clause = 'JOIN drone_detections d ON t.detection_id = d.id'
where_clause = 'WHERE d.identifier = ?'
params.append(identifier)
params.append(limit)
with get_db() as conn:
rows = conn.execute(f'''
SELECT t.*
FROM drone_tracks t
{join_clause}
{where_clause}
ORDER BY t.timestamp DESC
LIMIT ?
''', params).fetchall()
return [
{
'id': row['id'],
'detection_id': row['detection_id'],
'timestamp': row['timestamp'],
'lat': row['lat'],
'lon': row['lon'],
'altitude_m': row['altitude_m'],
'speed_mps': row['speed_mps'],
'heading_deg': row['heading_deg'],
'quality': row['quality'],
'source': row['source'],
}
for row in rows
]
def add_drone_correlation(
drone_identifier: str,
operator_identifier: str,
method: str,
confidence: float,
evidence: dict | None = None,
) -> int:
"""Store a drone/operator correlation result."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO drone_correlations
(drone_identifier, operator_identifier, method, confidence, evidence_json)
VALUES (?, ?, ?, ?, ?)
''', (
drone_identifier,
operator_identifier,
method,
confidence,
json.dumps(evidence) if evidence else None,
))
return int(cursor.lastrowid)
def list_drone_correlations(
drone_identifier: str | None = None,
min_confidence: float = 0.0,
limit: int = 200,
) -> list[dict]:
"""List drone correlation records."""
conditions = ['confidence >= ?']
params: list[Any] = [min_confidence]
if drone_identifier:
conditions.append('drone_identifier = ?')
params.append(drone_identifier)
params.append(limit)
with get_db() as conn:
rows = conn.execute(f'''
SELECT * FROM drone_correlations
WHERE {" AND ".join(conditions)}
ORDER BY created_at DESC
LIMIT ?
''', params).fetchall()
return [
{
'id': row['id'],
'drone_identifier': row['drone_identifier'],
'operator_identifier': row['operator_identifier'],
'method': row['method'],
'confidence': float(row['confidence'] or 0.0),
'evidence': _decode_json(row['evidence_json'], {}),
'created_at': row['created_at'],
}
for row in rows
]
def create_drone_incident(
title: str,
severity: str = 'medium',
opened_by: str | None = None,
summary: str | None = None,
metadata: dict | None = None,
) -> int:
"""Create a Drone Ops incident."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO drone_incidents
(title, severity, opened_by, summary, metadata)
VALUES (?, ?, ?, ?, ?)
''', (title, severity, opened_by, summary, json.dumps(metadata) if metadata else None))
return int(cursor.lastrowid)
def update_drone_incident(
incident_id: int,
status: str | None = None,
severity: str | None = None,
summary: str | None = None,
metadata: dict | None = None,
) -> bool:
"""Update incident status/metadata."""
updates = []
params: list[Any] = []
if status is not None:
updates.append('status = ?')
params.append(status)
if status.lower() == 'closed':
updates.append('closed_at = CURRENT_TIMESTAMP')
elif status.lower() in {'open', 'active'}:
updates.append('closed_at = NULL')
if severity is not None:
updates.append('severity = ?')
params.append(severity)
if summary is not None:
updates.append('summary = ?')
params.append(summary)
if metadata is not None:
updates.append('metadata = ?')
params.append(json.dumps(metadata))
if not updates:
return False
params.append(incident_id)
with get_db() as conn:
cursor = conn.execute(
f'UPDATE drone_incidents SET {", ".join(updates)} WHERE id = ?',
params,
)
return cursor.rowcount > 0
def add_drone_incident_artifact(
incident_id: int,
artifact_type: str,
artifact_ref: str,
added_by: str | None = None,
metadata: dict | None = None,
) -> int:
"""Add an artifact reference to an incident."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO drone_incident_artifacts
(incident_id, artifact_type, artifact_ref, added_by, metadata)
VALUES (?, ?, ?, ?, ?)
''', (
incident_id,
artifact_type,
artifact_ref,
added_by,
json.dumps(metadata) if metadata else None,
))
return int(cursor.lastrowid)
def get_drone_incident(incident_id: int) -> dict | None:
"""Get an incident with linked artifacts and manifests."""
with get_db() as conn:
row = conn.execute(
'SELECT * FROM drone_incidents WHERE id = ?',
(incident_id,),
).fetchone()
if not row:
return None
artifacts_rows = conn.execute('''
SELECT * FROM drone_incident_artifacts
WHERE incident_id = ?
ORDER BY added_at DESC
''', (incident_id,)).fetchall()
manifests_rows = conn.execute('''
SELECT id, incident_id, created_at, hash_algo, signature, created_by, manifest_json
FROM evidence_manifests
WHERE incident_id = ?
ORDER BY created_at DESC
''', (incident_id,)).fetchall()
return {
'id': row['id'],
'title': row['title'],
'status': row['status'],
'severity': row['severity'],
'opened_by': row['opened_by'],
'opened_at': row['opened_at'],
'closed_at': row['closed_at'],
'summary': row['summary'],
'metadata': _decode_json(row['metadata'], {}),
'artifacts': [
{
'id': a['id'],
'incident_id': a['incident_id'],
'artifact_type': a['artifact_type'],
'artifact_ref': a['artifact_ref'],
'added_by': a['added_by'],
'added_at': a['added_at'],
'metadata': _decode_json(a['metadata'], {}),
}
for a in artifacts_rows
],
'manifests': [
{
'id': m['id'],
'incident_id': m['incident_id'],
'created_at': m['created_at'],
'hash_algo': m['hash_algo'],
'signature': m['signature'],
'created_by': m['created_by'],
'manifest': _decode_json(m['manifest_json'], {}),
}
for m in manifests_rows
],
}
def list_drone_incidents(status: str | None = None, limit: int = 100) -> list[dict]:
"""List incidents."""
query = 'SELECT * FROM drone_incidents'
params: list[Any] = []
if status:
query += ' WHERE status = ?'
params.append(status)
query += ' ORDER BY opened_at DESC LIMIT ?'
params.append(limit)
with get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [
{
'id': row['id'],
'title': row['title'],
'status': row['status'],
'severity': row['severity'],
'opened_by': row['opened_by'],
'opened_at': row['opened_at'],
'closed_at': row['closed_at'],
'summary': row['summary'],
'metadata': _decode_json(row['metadata'], {}),
}
for row in rows
]
def create_action_request(
incident_id: int,
action_type: str,
requested_by: str,
payload: dict | None = None,
) -> int:
"""Create an action request record."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO action_requests
(incident_id, action_type, requested_by, payload_json)
VALUES (?, ?, ?, ?)
''', (
incident_id,
action_type,
requested_by,
json.dumps(payload) if payload else None,
))
return int(cursor.lastrowid)
def update_action_request_status(
request_id: int,
status: str,
executed_by: str | None = None,
) -> bool:
"""Update action request status."""
with get_db() as conn:
if status.lower() == 'executed':
cursor = conn.execute('''
UPDATE action_requests
SET status = ?, executed_at = CURRENT_TIMESTAMP, executed_by = ?
WHERE id = ?
''', (status, executed_by, request_id))
else:
cursor = conn.execute(
'UPDATE action_requests SET status = ? WHERE id = ?',
(status, request_id),
)
return cursor.rowcount > 0
def get_action_request(request_id: int) -> dict | None:
"""Get action request with approval summary."""
with get_db() as conn:
row = conn.execute(
'SELECT * FROM action_requests WHERE id = ?',
(request_id,),
).fetchone()
if not row:
return None
approvals = conn.execute('''
SELECT id, request_id, approved_by, approved_at, decision, notes
FROM action_approvals
WHERE request_id = ?
ORDER BY approved_at ASC
''', (request_id,)).fetchall()
return {
'id': row['id'],
'incident_id': row['incident_id'],
'action_type': row['action_type'],
'requested_by': row['requested_by'],
'requested_at': row['requested_at'],
'status': row['status'],
'payload': _decode_json(row['payload_json'], {}),
'executed_at': row['executed_at'],
'executed_by': row['executed_by'],
'approvals': [
{
'id': ap['id'],
'request_id': ap['request_id'],
'approved_by': ap['approved_by'],
'approved_at': ap['approved_at'],
'decision': ap['decision'],
'notes': ap['notes'],
}
for ap in approvals
],
}
def list_action_requests(
incident_id: int | None = None,
status: str | None = None,
limit: int = 100,
) -> list[dict]:
"""List action requests with optional filtering."""
conditions = []
params: list[Any] = []
if incident_id is not None:
conditions.append('incident_id = ?')
params.append(incident_id)
if status:
conditions.append('status = ?')
params.append(status)
where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else ''
params.append(limit)
with get_db() as conn:
rows = conn.execute(f'''
SELECT * FROM action_requests
{where_clause}
ORDER BY requested_at DESC
LIMIT ?
''', params).fetchall()
return [
{
'id': row['id'],
'incident_id': row['incident_id'],
'action_type': row['action_type'],
'requested_by': row['requested_by'],
'requested_at': row['requested_at'],
'status': row['status'],
'payload': _decode_json(row['payload_json'], {}),
'executed_at': row['executed_at'],
'executed_by': row['executed_by'],
}
for row in rows
]
def add_action_approval(
request_id: int,
approved_by: str,
decision: str = 'approved',
notes: str | None = None,
) -> int:
"""Add an approval decision to an action request."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO action_approvals
(request_id, approved_by, decision, notes)
VALUES (?, ?, ?, ?)
''', (request_id, approved_by, decision, notes))
return int(cursor.lastrowid)
def add_action_audit_log(
request_id: int | None,
event_type: str,
actor: str | None = None,
details: dict | None = None,
) -> int:
"""Append an action audit log event."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO action_audit_log
(request_id, event_type, actor, details_json)
VALUES (?, ?, ?, ?)
''', (
request_id,
event_type,
actor,
json.dumps(details) if details else None,
))
return int(cursor.lastrowid)
def list_action_audit_logs(
request_id: int | None = None,
limit: int = 200,
) -> list[dict]:
"""List action audit logs."""
params: list[Any] = []
where_clause = ''
if request_id is not None:
where_clause = 'WHERE request_id = ?'
params.append(request_id)
params.append(limit)
with get_db() as conn:
rows = conn.execute(f'''
SELECT * FROM action_audit_log
{where_clause}
ORDER BY timestamp DESC
LIMIT ?
''', params).fetchall()
return [
{
'id': row['id'],
'request_id': row['request_id'],
'event_type': row['event_type'],
'actor': row['actor'],
'timestamp': row['timestamp'],
'details': _decode_json(row['details_json'], {}),
}
for row in rows
]
def create_evidence_manifest(
incident_id: int,
manifest: dict,
hash_algo: str = 'sha256',
signature: str | None = None,
created_by: str | None = None,
) -> int:
"""Store evidence manifest metadata for an incident."""
with get_db() as conn:
cursor = conn.execute('''
INSERT INTO evidence_manifests
(incident_id, hash_algo, manifest_json, signature, created_by)
VALUES (?, ?, ?, ?, ?)
''', (
incident_id,
hash_algo,
json.dumps(manifest),
signature,
created_by,
))
return int(cursor.lastrowid)
def get_evidence_manifest(manifest_id: int) -> dict | None:
"""Get an evidence manifest by ID."""
with get_db() as conn:
row = conn.execute(
'SELECT * FROM evidence_manifests WHERE id = ?',
(manifest_id,),
).fetchone()
if not row:
return None
return {
'id': row['id'],
'incident_id': row['incident_id'],
'created_at': row['created_at'],
'hash_algo': row['hash_algo'],
'manifest': _decode_json(row['manifest_json'], {}),
'signature': row['signature'],
'created_by': row['created_by'],
}
def list_evidence_manifests(incident_id: int, limit: int = 50) -> list[dict]:
"""List manifests for an incident."""
with get_db() as conn:
rows = conn.execute('''
SELECT * FROM evidence_manifests
WHERE incident_id = ?
ORDER BY created_at DESC
LIMIT ?
''', (incident_id, limit)).fetchall()
return [
{
'id': row['id'],
'incident_id': row['incident_id'],
'created_at': row['created_at'],
'hash_algo': row['hash_algo'],
'manifest': _decode_json(row['manifest_json'], {}),
'signature': row['signature'],
'created_by': row['created_by'],
}
for row in rows
]