mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
Replaces hardcoded admin credentials with a users table in the database, storing hashed passwords and user roles. Updates the login logic in app.py to authenticate against the database using Werkzeug's password hashing utilities. Adds admin credential configuration to config.py and ensures a default admin user is created during database initialization.
379 lines
12 KiB
Python
379 lines
12 KiB
Python
"""
|
|
SQLite database utilities for persistent settings storage.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
from contextlib import contextmanager
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from werkzeug.security import generate_password_hash
|
|
from config import ADMIN_USERNAME, ADMIN_PASSWORD
|
|
|
|
logger = logging.getLogger('intercept.database')
|
|
|
|
# Database file location
|
|
DB_DIR = Path(__file__).parent.parent / 'instance'
|
|
DB_PATH = DB_DIR / 'intercept.db'
|
|
|
|
# Thread-local storage for connections
|
|
_local = threading.local()
|
|
|
|
|
|
def get_db_path() -> Path:
|
|
"""Get the database file path, creating directory if needed."""
|
|
DB_DIR.mkdir(parents=True, exist_ok=True)
|
|
return DB_PATH
|
|
|
|
|
|
def get_connection() -> sqlite3.Connection:
|
|
"""Get a thread-local database connection."""
|
|
if not hasattr(_local, 'connection') or _local.connection is None:
|
|
db_path = get_db_path()
|
|
_local.connection = sqlite3.connect(str(db_path), check_same_thread=False)
|
|
_local.connection.row_factory = sqlite3.Row
|
|
# Enable foreign keys
|
|
_local.connection.execute('PRAGMA foreign_keys = ON')
|
|
return _local.connection
|
|
|
|
|
|
@contextmanager
|
|
def get_db():
|
|
"""Context manager for database operations."""
|
|
conn = get_connection()
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Initialize the database schema."""
|
|
db_path = get_db_path()
|
|
logger.info(f"Initializing database at {db_path}")
|
|
|
|
with get_db() as conn:
|
|
# Settings table for key-value storage
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL,
|
|
value_type TEXT DEFAULT 'string',
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
|
|
# Signal history table for graphs
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS signal_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
mode TEXT NOT NULL,
|
|
device_id TEXT NOT NULL,
|
|
signal_strength REAL,
|
|
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
metadata TEXT
|
|
)
|
|
''')
|
|
|
|
# Create index for faster queries
|
|
conn.execute('''
|
|
CREATE INDEX IF NOT EXISTS idx_signal_history_mode_device
|
|
ON signal_history(mode, device_id, timestamp)
|
|
''')
|
|
|
|
# Device correlation table
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS device_correlations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
wifi_mac TEXT,
|
|
bt_mac TEXT,
|
|
confidence REAL,
|
|
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
metadata TEXT,
|
|
UNIQUE(wifi_mac, bt_mac)
|
|
)
|
|
''')
|
|
|
|
# Users table for authentication
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password_hash TEXT NOT NULL,
|
|
role TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
|
|
cursor = conn.execute('SELECT COUNT(*) FROM users')
|
|
if cursor.fetchone()[0] == 0:
|
|
from config import ADMIN_USERNAME, ADMIN_PASSWORD
|
|
|
|
logger.info(f"Creating default admin user: {ADMIN_USERNAME}")
|
|
|
|
# Password hashing
|
|
hashed_pw = generate_password_hash(ADMIN_PASSWORD)
|
|
|
|
conn.execute('''
|
|
INSERT INTO users (username, password_hash, role)
|
|
VALUES (?, ?, ?)
|
|
''', (ADMIN_USERNAME, hashed_pw, 'admin'))
|
|
|
|
logger.info("Database initialized successfully")
|
|
|
|
|
|
def close_db() -> None:
|
|
"""Close the thread-local database connection."""
|
|
if hasattr(_local, 'connection') and _local.connection is not None:
|
|
_local.connection.close()
|
|
_local.connection = None
|
|
|
|
|
|
# =============================================================================
|
|
# Settings Functions
|
|
# =============================================================================
|
|
|
|
def get_setting(key: str, default: Any = None) -> Any:
|
|
"""
|
|
Get a setting value by key.
|
|
|
|
Args:
|
|
key: Setting key
|
|
default: Default value if not found
|
|
|
|
Returns:
|
|
Setting value (auto-converted from JSON for complex types)
|
|
"""
|
|
with get_db() as conn:
|
|
cursor = conn.execute(
|
|
'SELECT value, value_type FROM settings WHERE key = ?',
|
|
(key,)
|
|
)
|
|
row = cursor.fetchone()
|
|
|
|
if row is None:
|
|
return default
|
|
|
|
value, value_type = row['value'], row['value_type']
|
|
|
|
# Convert based on type
|
|
if value_type == 'json':
|
|
try:
|
|
return json.loads(value)
|
|
except json.JSONDecodeError:
|
|
return default
|
|
elif value_type == 'int':
|
|
return int(value)
|
|
elif value_type == 'float':
|
|
return float(value)
|
|
elif value_type == 'bool':
|
|
return value.lower() in ('true', '1', 'yes')
|
|
else:
|
|
return value
|
|
|
|
|
|
def set_setting(key: str, value: Any) -> None:
|
|
"""
|
|
Set a setting value.
|
|
|
|
Args:
|
|
key: Setting key
|
|
value: Setting value (will be JSON-encoded for complex types)
|
|
"""
|
|
# Determine value type and string representation
|
|
if isinstance(value, bool):
|
|
value_type = 'bool'
|
|
str_value = 'true' if value else 'false'
|
|
elif isinstance(value, int):
|
|
value_type = 'int'
|
|
str_value = str(value)
|
|
elif isinstance(value, float):
|
|
value_type = 'float'
|
|
str_value = str(value)
|
|
elif isinstance(value, (dict, list)):
|
|
value_type = 'json'
|
|
str_value = json.dumps(value)
|
|
else:
|
|
value_type = 'string'
|
|
str_value = str(value)
|
|
|
|
with get_db() as conn:
|
|
conn.execute('''
|
|
INSERT INTO settings (key, value, value_type, updated_at)
|
|
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
|
ON CONFLICT(key) DO UPDATE SET
|
|
value = excluded.value,
|
|
value_type = excluded.value_type,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
''', (key, str_value, value_type))
|
|
|
|
|
|
def delete_setting(key: str) -> bool:
|
|
"""
|
|
Delete a setting.
|
|
|
|
Args:
|
|
key: Setting key
|
|
|
|
Returns:
|
|
True if setting was deleted, False if not found
|
|
"""
|
|
with get_db() as conn:
|
|
cursor = conn.execute('DELETE FROM settings WHERE key = ?', (key,))
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
def get_all_settings() -> dict[str, Any]:
|
|
"""Get all settings as a dictionary."""
|
|
with get_db() as conn:
|
|
cursor = conn.execute('SELECT key, value, value_type FROM settings')
|
|
settings = {}
|
|
|
|
for row in cursor:
|
|
key, value, value_type = row['key'], row['value'], row['value_type']
|
|
|
|
if value_type == 'json':
|
|
try:
|
|
settings[key] = json.loads(value)
|
|
except json.JSONDecodeError:
|
|
settings[key] = value
|
|
elif value_type == 'int':
|
|
settings[key] = int(value)
|
|
elif value_type == 'float':
|
|
settings[key] = float(value)
|
|
elif value_type == 'bool':
|
|
settings[key] = value.lower() in ('true', '1', 'yes')
|
|
else:
|
|
settings[key] = value
|
|
|
|
return settings
|
|
|
|
|
|
# =============================================================================
|
|
# Signal History Functions
|
|
# =============================================================================
|
|
|
|
def add_signal_reading(
|
|
mode: str,
|
|
device_id: str,
|
|
signal_strength: float,
|
|
metadata: dict | None = None
|
|
) -> None:
|
|
"""Add a signal strength reading."""
|
|
with get_db() as conn:
|
|
conn.execute('''
|
|
INSERT INTO signal_history (mode, device_id, signal_strength, metadata)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (mode, device_id, signal_strength, json.dumps(metadata) if metadata else None))
|
|
|
|
|
|
def get_signal_history(
|
|
mode: str,
|
|
device_id: str,
|
|
limit: int = 100,
|
|
since_minutes: int = 60
|
|
) -> list[dict]:
|
|
"""
|
|
Get signal history for a device.
|
|
|
|
Args:
|
|
mode: Mode (wifi, bluetooth, adsb, etc.)
|
|
device_id: Device identifier (MAC, ICAO, etc.)
|
|
limit: Maximum number of readings
|
|
since_minutes: Only get readings from last N minutes
|
|
|
|
Returns:
|
|
List of signal readings with timestamp
|
|
"""
|
|
with get_db() as conn:
|
|
cursor = conn.execute('''
|
|
SELECT signal_strength, timestamp, metadata
|
|
FROM signal_history
|
|
WHERE mode = ? AND device_id = ?
|
|
AND timestamp > datetime('now', ?)
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
''', (mode, device_id, f'-{since_minutes} minutes', limit))
|
|
|
|
results = []
|
|
for row in cursor:
|
|
results.append({
|
|
'signal': row['signal_strength'],
|
|
'timestamp': row['timestamp'],
|
|
'metadata': json.loads(row['metadata']) if row['metadata'] else None
|
|
})
|
|
|
|
return list(reversed(results)) # Return in chronological order
|
|
|
|
|
|
def cleanup_old_signal_history(max_age_hours: int = 24) -> int:
|
|
"""
|
|
Remove old signal history entries.
|
|
|
|
Args:
|
|
max_age_hours: Maximum age in hours
|
|
|
|
Returns:
|
|
Number of deleted entries
|
|
"""
|
|
with get_db() as conn:
|
|
cursor = conn.execute('''
|
|
DELETE FROM signal_history
|
|
WHERE timestamp < datetime('now', ?)
|
|
''', (f'-{max_age_hours} hours',))
|
|
return cursor.rowcount
|
|
|
|
|
|
# =============================================================================
|
|
# Device Correlation Functions
|
|
# =============================================================================
|
|
|
|
def add_correlation(
|
|
wifi_mac: str,
|
|
bt_mac: str,
|
|
confidence: float,
|
|
metadata: dict | None = None
|
|
) -> None:
|
|
"""Add or update a device correlation."""
|
|
with get_db() as conn:
|
|
conn.execute('''
|
|
INSERT INTO device_correlations (wifi_mac, bt_mac, confidence, metadata, last_seen)
|
|
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
ON CONFLICT(wifi_mac, bt_mac) DO UPDATE SET
|
|
confidence = excluded.confidence,
|
|
last_seen = CURRENT_TIMESTAMP,
|
|
metadata = excluded.metadata
|
|
''', (wifi_mac, bt_mac, confidence, json.dumps(metadata) if metadata else None))
|
|
|
|
|
|
def get_correlations(min_confidence: float = 0.5) -> list[dict]:
|
|
"""Get all device correlations above minimum confidence."""
|
|
with get_db() as conn:
|
|
cursor = conn.execute('''
|
|
SELECT wifi_mac, bt_mac, confidence, first_seen, last_seen, metadata
|
|
FROM device_correlations
|
|
WHERE confidence >= ?
|
|
ORDER BY confidence DESC
|
|
''', (min_confidence,))
|
|
|
|
results = []
|
|
for row in cursor:
|
|
results.append({
|
|
'wifi_mac': row['wifi_mac'],
|
|
'bt_mac': row['bt_mac'],
|
|
'confidence': row['confidence'],
|
|
'first_seen': row['first_seen'],
|
|
'last_seen': row['last_seen'],
|
|
'metadata': json.loads(row['metadata']) if row['metadata'] else None
|
|
})
|
|
|
|
return results
|