Files
intercept/utils/database.py
Smittix b5547d3fa9 Add Listening Post, improve setup and documentation
- Add Listening Post mode with frequency scanner and audio monitoring
- Add dependency warning for aircraft dashboard listen feature
- Auto-restart audio when switching frequencies
- Fix toolbar overflow on aircraft dashboard custom frequency
- Update setup script with full macOS/Debian support
- Clean up README and documentation for clarity
- Add sox and dump1090 to Dockerfile
- Add comprehensive tool reference to HARDWARE.md
- Add correlation, settings, and database utilities
- Add new test files for routes, validation, correlation, database

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 17:34:53 +00:00

352 lines
11 KiB
Python

"""
SQLite database utilities for persistent settings storage.
"""
from __future__ import annotations
import json
import logging
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger('intercept.database')
# Database file location
DB_DIR = Path(__file__).parent.parent / 'instance'
DB_PATH = DB_DIR / 'intercept.db'
# Thread-local storage for connections
_local = threading.local()
def get_db_path() -> Path:
"""Get the database file path, creating directory if needed."""
DB_DIR.mkdir(parents=True, exist_ok=True)
return DB_PATH
def get_connection() -> sqlite3.Connection:
"""Get a thread-local database connection."""
if not hasattr(_local, 'connection') or _local.connection is None:
db_path = get_db_path()
_local.connection = sqlite3.connect(str(db_path), check_same_thread=False)
_local.connection.row_factory = sqlite3.Row
# Enable foreign keys
_local.connection.execute('PRAGMA foreign_keys = ON')
return _local.connection
@contextmanager
def get_db():
"""Context manager for database operations."""
conn = get_connection()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
def init_db() -> None:
"""Initialize the database schema."""
db_path = get_db_path()
logger.info(f"Initializing database at {db_path}")
with get_db() as conn:
# Settings table for key-value storage
conn.execute('''
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
value_type TEXT DEFAULT 'string',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Signal history table for graphs
conn.execute('''
CREATE TABLE IF NOT EXISTS signal_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mode TEXT NOT NULL,
device_id TEXT NOT NULL,
signal_strength REAL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT
)
''')
# Create index for faster queries
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_signal_history_mode_device
ON signal_history(mode, device_id, timestamp)
''')
# Device correlation table
conn.execute('''
CREATE TABLE IF NOT EXISTS device_correlations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
wifi_mac TEXT,
bt_mac TEXT,
confidence REAL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
UNIQUE(wifi_mac, bt_mac)
)
''')
logger.info("Database initialized successfully")
def close_db() -> None:
"""Close the thread-local database connection."""
if hasattr(_local, 'connection') and _local.connection is not None:
_local.connection.close()
_local.connection = None
# =============================================================================
# Settings Functions
# =============================================================================
def get_setting(key: str, default: Any = None) -> Any:
"""
Get a setting value by key.
Args:
key: Setting key
default: Default value if not found
Returns:
Setting value (auto-converted from JSON for complex types)
"""
with get_db() as conn:
cursor = conn.execute(
'SELECT value, value_type FROM settings WHERE key = ?',
(key,)
)
row = cursor.fetchone()
if row is None:
return default
value, value_type = row['value'], row['value_type']
# Convert based on type
if value_type == 'json':
try:
return json.loads(value)
except json.JSONDecodeError:
return default
elif value_type == 'int':
return int(value)
elif value_type == 'float':
return float(value)
elif value_type == 'bool':
return value.lower() in ('true', '1', 'yes')
else:
return value
def set_setting(key: str, value: Any) -> None:
"""
Set a setting value.
Args:
key: Setting key
value: Setting value (will be JSON-encoded for complex types)
"""
# Determine value type and string representation
if isinstance(value, bool):
value_type = 'bool'
str_value = 'true' if value else 'false'
elif isinstance(value, int):
value_type = 'int'
str_value = str(value)
elif isinstance(value, float):
value_type = 'float'
str_value = str(value)
elif isinstance(value, (dict, list)):
value_type = 'json'
str_value = json.dumps(value)
else:
value_type = 'string'
str_value = str(value)
with get_db() as conn:
conn.execute('''
INSERT INTO settings (key, value, value_type, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
value_type = excluded.value_type,
updated_at = CURRENT_TIMESTAMP
''', (key, str_value, value_type))
def delete_setting(key: str) -> bool:
"""
Delete a setting.
Args:
key: Setting key
Returns:
True if setting was deleted, False if not found
"""
with get_db() as conn:
cursor = conn.execute('DELETE FROM settings WHERE key = ?', (key,))
return cursor.rowcount > 0
def get_all_settings() -> dict[str, Any]:
"""Get all settings as a dictionary."""
with get_db() as conn:
cursor = conn.execute('SELECT key, value, value_type FROM settings')
settings = {}
for row in cursor:
key, value, value_type = row['key'], row['value'], row['value_type']
if value_type == 'json':
try:
settings[key] = json.loads(value)
except json.JSONDecodeError:
settings[key] = value
elif value_type == 'int':
settings[key] = int(value)
elif value_type == 'float':
settings[key] = float(value)
elif value_type == 'bool':
settings[key] = value.lower() in ('true', '1', 'yes')
else:
settings[key] = value
return settings
# =============================================================================
# Signal History Functions
# =============================================================================
def add_signal_reading(
mode: str,
device_id: str,
signal_strength: float,
metadata: dict | None = None
) -> None:
"""Add a signal strength reading."""
with get_db() as conn:
conn.execute('''
INSERT INTO signal_history (mode, device_id, signal_strength, metadata)
VALUES (?, ?, ?, ?)
''', (mode, device_id, signal_strength, json.dumps(metadata) if metadata else None))
def get_signal_history(
mode: str,
device_id: str,
limit: int = 100,
since_minutes: int = 60
) -> list[dict]:
"""
Get signal history for a device.
Args:
mode: Mode (wifi, bluetooth, adsb, etc.)
device_id: Device identifier (MAC, ICAO, etc.)
limit: Maximum number of readings
since_minutes: Only get readings from last N minutes
Returns:
List of signal readings with timestamp
"""
with get_db() as conn:
cursor = conn.execute('''
SELECT signal_strength, timestamp, metadata
FROM signal_history
WHERE mode = ? AND device_id = ?
AND timestamp > datetime('now', ?)
ORDER BY timestamp DESC
LIMIT ?
''', (mode, device_id, f'-{since_minutes} minutes', limit))
results = []
for row in cursor:
results.append({
'signal': row['signal_strength'],
'timestamp': row['timestamp'],
'metadata': json.loads(row['metadata']) if row['metadata'] else None
})
return list(reversed(results)) # Return in chronological order
def cleanup_old_signal_history(max_age_hours: int = 24) -> int:
"""
Remove old signal history entries.
Args:
max_age_hours: Maximum age in hours
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM signal_history
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
# =============================================================================
# Device Correlation Functions
# =============================================================================
def add_correlation(
wifi_mac: str,
bt_mac: str,
confidence: float,
metadata: dict | None = None
) -> None:
"""Add or update a device correlation."""
with get_db() as conn:
conn.execute('''
INSERT INTO device_correlations (wifi_mac, bt_mac, confidence, metadata, last_seen)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(wifi_mac, bt_mac) DO UPDATE SET
confidence = excluded.confidence,
last_seen = CURRENT_TIMESTAMP,
metadata = excluded.metadata
''', (wifi_mac, bt_mac, confidence, json.dumps(metadata) if metadata else None))
def get_correlations(min_confidence: float = 0.5) -> list[dict]:
"""Get all device correlations above minimum confidence."""
with get_db() as conn:
cursor = conn.execute('''
SELECT wifi_mac, bt_mac, confidence, first_seen, last_seen, metadata
FROM device_correlations
WHERE confidence >= ?
ORDER BY confidence DESC
''', (min_confidence,))
results = []
for row in cursor:
results.append({
'wifi_mac': row['wifi_mac'],
'bt_mac': row['bt_mac'],
'confidence': row['confidence'],
'first_seen': row['first_seen'],
'last_seen': row['last_seen'],
'metadata': json.loads(row['metadata']) if row['metadata'] else None
})
return results