mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
Revert ISMS Listening Station implementation
Remove all ISMS (Intelligent Spectrum Monitoring Station) code including:
- GSM cell scanning with gr-gsm
- Spectrum monitoring via rtl_power
- OpenCelliD tower integration
- Baseline recording and comparison
- Setup script changes for gr-gsm/libosmocore
Reverts to pre-ISMS state (commit 4c1690d).
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -194,76 +194,6 @@ def init_db() -> None:
|
||||
ON tscm_sweeps(baseline_id)
|
||||
''')
|
||||
|
||||
# =====================================================================
|
||||
# ISMS (Intelligent Spectrum Monitoring Station) Tables
|
||||
# =====================================================================
|
||||
|
||||
# ISMS Baselines - Location-based spectrum profiles
|
||||
conn.execute('''
|
||||
CREATE TABLE IF NOT EXISTS isms_baselines (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
location_name TEXT,
|
||||
latitude REAL,
|
||||
longitude REAL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
spectrum_profile TEXT,
|
||||
cellular_environment TEXT,
|
||||
known_towers TEXT,
|
||||
is_active BOOLEAN DEFAULT 0
|
||||
)
|
||||
''')
|
||||
|
||||
# ISMS Scans - Individual scan sessions
|
||||
conn.execute('''
|
||||
CREATE TABLE IF NOT EXISTS isms_scans (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
baseline_id INTEGER,
|
||||
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
completed_at TIMESTAMP,
|
||||
status TEXT DEFAULT 'running',
|
||||
scan_preset TEXT,
|
||||
gps_coords TEXT,
|
||||
results TEXT,
|
||||
findings_count INTEGER DEFAULT 0,
|
||||
FOREIGN KEY (baseline_id) REFERENCES isms_baselines(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# ISMS Findings - Detected anomalies and observations
|
||||
conn.execute('''
|
||||
CREATE TABLE IF NOT EXISTS isms_findings (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
scan_id INTEGER,
|
||||
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
finding_type TEXT NOT NULL,
|
||||
severity TEXT DEFAULT 'info',
|
||||
band TEXT,
|
||||
frequency REAL,
|
||||
description TEXT,
|
||||
details TEXT,
|
||||
acknowledged BOOLEAN DEFAULT 0,
|
||||
FOREIGN KEY (scan_id) REFERENCES isms_scans(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# ISMS indexes for performance
|
||||
conn.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_isms_baselines_location
|
||||
ON isms_baselines(latitude, longitude)
|
||||
''')
|
||||
|
||||
conn.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_isms_findings_severity
|
||||
ON isms_findings(severity, detected_at)
|
||||
''')
|
||||
|
||||
conn.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_isms_scans_baseline
|
||||
ON isms_scans(baseline_id)
|
||||
''')
|
||||
|
||||
logger.info("Database initialized successfully")
|
||||
|
||||
|
||||
@@ -863,354 +793,3 @@ def get_tscm_threat_summary() -> dict:
|
||||
summary['total'] += row['count']
|
||||
|
||||
return summary
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# ISMS Functions
|
||||
# =============================================================================
|
||||
|
||||
def create_isms_baseline(
|
||||
name: str,
|
||||
location_name: str | None = None,
|
||||
latitude: float | None = None,
|
||||
longitude: float | None = None,
|
||||
spectrum_profile: dict | None = None,
|
||||
cellular_environment: list | None = None,
|
||||
known_towers: list | None = None
|
||||
) -> int:
|
||||
"""
|
||||
Create a new ISMS baseline.
|
||||
|
||||
Returns:
|
||||
The ID of the created baseline
|
||||
"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
INSERT INTO isms_baselines
|
||||
(name, location_name, latitude, longitude, spectrum_profile,
|
||||
cellular_environment, known_towers)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
''', (
|
||||
name,
|
||||
location_name,
|
||||
latitude,
|
||||
longitude,
|
||||
json.dumps(spectrum_profile) if spectrum_profile else None,
|
||||
json.dumps(cellular_environment) if cellular_environment else None,
|
||||
json.dumps(known_towers) if known_towers else None
|
||||
))
|
||||
return cursor.lastrowid
|
||||
|
||||
|
||||
def get_isms_baseline(baseline_id: int) -> dict | None:
|
||||
"""Get a specific ISMS baseline by ID."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
'SELECT * FROM isms_baselines WHERE id = ?',
|
||||
(baseline_id,)
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return {
|
||||
'id': row['id'],
|
||||
'name': row['name'],
|
||||
'location_name': row['location_name'],
|
||||
'latitude': row['latitude'],
|
||||
'longitude': row['longitude'],
|
||||
'created_at': row['created_at'],
|
||||
'updated_at': row['updated_at'],
|
||||
'spectrum_profile': json.loads(row['spectrum_profile']) if row['spectrum_profile'] else {},
|
||||
'cellular_environment': json.loads(row['cellular_environment']) if row['cellular_environment'] else [],
|
||||
'known_towers': json.loads(row['known_towers']) if row['known_towers'] else [],
|
||||
'is_active': bool(row['is_active'])
|
||||
}
|
||||
|
||||
|
||||
def get_all_isms_baselines() -> list[dict]:
|
||||
"""Get all ISMS baselines."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
SELECT id, name, location_name, latitude, longitude, created_at, is_active
|
||||
FROM isms_baselines
|
||||
ORDER BY created_at DESC
|
||||
''')
|
||||
return [dict(row) for row in cursor]
|
||||
|
||||
|
||||
def get_active_isms_baseline() -> dict | None:
|
||||
"""Get the currently active ISMS baseline."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
'SELECT * FROM isms_baselines WHERE is_active = 1 LIMIT 1'
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return get_isms_baseline(row['id'])
|
||||
|
||||
|
||||
def set_active_isms_baseline(baseline_id: int | None) -> bool:
|
||||
"""Set a baseline as active (deactivates others). Pass None to deactivate all."""
|
||||
with get_db() as conn:
|
||||
# Deactivate all
|
||||
conn.execute('UPDATE isms_baselines SET is_active = 0')
|
||||
|
||||
if baseline_id is not None:
|
||||
# Activate selected
|
||||
cursor = conn.execute(
|
||||
'UPDATE isms_baselines SET is_active = 1 WHERE id = ?',
|
||||
(baseline_id,)
|
||||
)
|
||||
return cursor.rowcount > 0
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def update_isms_baseline(
|
||||
baseline_id: int,
|
||||
spectrum_profile: dict | None = None,
|
||||
cellular_environment: list | None = None,
|
||||
known_towers: list | None = None
|
||||
) -> bool:
|
||||
"""Update baseline spectrum/cellular data."""
|
||||
updates = ['updated_at = CURRENT_TIMESTAMP']
|
||||
params = []
|
||||
|
||||
if spectrum_profile is not None:
|
||||
updates.append('spectrum_profile = ?')
|
||||
params.append(json.dumps(spectrum_profile))
|
||||
if cellular_environment is not None:
|
||||
updates.append('cellular_environment = ?')
|
||||
params.append(json.dumps(cellular_environment))
|
||||
if known_towers is not None:
|
||||
updates.append('known_towers = ?')
|
||||
params.append(json.dumps(known_towers))
|
||||
|
||||
params.append(baseline_id)
|
||||
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
f'UPDATE isms_baselines SET {", ".join(updates)} WHERE id = ?',
|
||||
params
|
||||
)
|
||||
return cursor.rowcount > 0
|
||||
|
||||
|
||||
def delete_isms_baseline(baseline_id: int) -> bool:
|
||||
"""Delete an ISMS baseline."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
'DELETE FROM isms_baselines WHERE id = ?',
|
||||
(baseline_id,)
|
||||
)
|
||||
return cursor.rowcount > 0
|
||||
|
||||
|
||||
def create_isms_scan(
|
||||
scan_preset: str,
|
||||
baseline_id: int | None = None,
|
||||
gps_coords: dict | None = None
|
||||
) -> int:
|
||||
"""
|
||||
Create a new ISMS scan session.
|
||||
|
||||
Returns:
|
||||
The ID of the created scan
|
||||
"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
INSERT INTO isms_scans (baseline_id, scan_preset, gps_coords)
|
||||
VALUES (?, ?, ?)
|
||||
''', (
|
||||
baseline_id,
|
||||
scan_preset,
|
||||
json.dumps(gps_coords) if gps_coords else None
|
||||
))
|
||||
return cursor.lastrowid
|
||||
|
||||
|
||||
def update_isms_scan(
|
||||
scan_id: int,
|
||||
status: str | None = None,
|
||||
results: dict | None = None,
|
||||
findings_count: int | None = None,
|
||||
completed: bool = False
|
||||
) -> bool:
|
||||
"""Update an ISMS scan."""
|
||||
updates = []
|
||||
params = []
|
||||
|
||||
if status is not None:
|
||||
updates.append('status = ?')
|
||||
params.append(status)
|
||||
if results is not None:
|
||||
updates.append('results = ?')
|
||||
params.append(json.dumps(results))
|
||||
if findings_count is not None:
|
||||
updates.append('findings_count = ?')
|
||||
params.append(findings_count)
|
||||
if completed:
|
||||
updates.append('completed_at = CURRENT_TIMESTAMP')
|
||||
|
||||
if not updates:
|
||||
return False
|
||||
|
||||
params.append(scan_id)
|
||||
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
f'UPDATE isms_scans SET {", ".join(updates)} WHERE id = ?',
|
||||
params
|
||||
)
|
||||
return cursor.rowcount > 0
|
||||
|
||||
|
||||
def get_isms_scan(scan_id: int) -> dict | None:
|
||||
"""Get a specific ISMS scan by ID."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('SELECT * FROM isms_scans WHERE id = ?', (scan_id,))
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return {
|
||||
'id': row['id'],
|
||||
'baseline_id': row['baseline_id'],
|
||||
'started_at': row['started_at'],
|
||||
'completed_at': row['completed_at'],
|
||||
'status': row['status'],
|
||||
'scan_preset': row['scan_preset'],
|
||||
'gps_coords': json.loads(row['gps_coords']) if row['gps_coords'] else None,
|
||||
'results': json.loads(row['results']) if row['results'] else None,
|
||||
'findings_count': row['findings_count']
|
||||
}
|
||||
|
||||
|
||||
def get_recent_isms_scans(limit: int = 20) -> list[dict]:
|
||||
"""Get recent ISMS scans."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
SELECT id, baseline_id, started_at, completed_at, status,
|
||||
scan_preset, findings_count
|
||||
FROM isms_scans
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
''', (limit,))
|
||||
return [dict(row) for row in cursor]
|
||||
|
||||
|
||||
def add_isms_finding(
|
||||
scan_id: int,
|
||||
finding_type: str,
|
||||
severity: str,
|
||||
description: str,
|
||||
band: str | None = None,
|
||||
frequency: float | None = None,
|
||||
details: dict | None = None
|
||||
) -> int:
|
||||
"""
|
||||
Add a finding to an ISMS scan.
|
||||
|
||||
Returns:
|
||||
The ID of the created finding
|
||||
"""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
INSERT INTO isms_findings
|
||||
(scan_id, finding_type, severity, band, frequency, description, details)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
''', (
|
||||
scan_id, finding_type, severity, band, frequency, description,
|
||||
json.dumps(details) if details else None
|
||||
))
|
||||
|
||||
# Increment findings count on scan
|
||||
conn.execute(
|
||||
'UPDATE isms_scans SET findings_count = findings_count + 1 WHERE id = ?',
|
||||
(scan_id,)
|
||||
)
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
|
||||
def get_isms_findings(
|
||||
scan_id: int | None = None,
|
||||
severity: str | None = None,
|
||||
acknowledged: bool | None = None,
|
||||
limit: int = 100
|
||||
) -> list[dict]:
|
||||
"""Get ISMS findings with optional filters."""
|
||||
conditions = []
|
||||
params = []
|
||||
|
||||
if scan_id is not None:
|
||||
conditions.append('scan_id = ?')
|
||||
params.append(scan_id)
|
||||
if severity is not None:
|
||||
conditions.append('severity = ?')
|
||||
params.append(severity)
|
||||
if acknowledged is not None:
|
||||
conditions.append('acknowledged = ?')
|
||||
params.append(1 if acknowledged else 0)
|
||||
|
||||
where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else ''
|
||||
params.append(limit)
|
||||
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(f'''
|
||||
SELECT * FROM isms_findings
|
||||
{where_clause}
|
||||
ORDER BY detected_at DESC
|
||||
LIMIT ?
|
||||
''', params)
|
||||
|
||||
results = []
|
||||
for row in cursor:
|
||||
results.append({
|
||||
'id': row['id'],
|
||||
'scan_id': row['scan_id'],
|
||||
'detected_at': row['detected_at'],
|
||||
'finding_type': row['finding_type'],
|
||||
'severity': row['severity'],
|
||||
'band': row['band'],
|
||||
'frequency': row['frequency'],
|
||||
'description': row['description'],
|
||||
'details': json.loads(row['details']) if row['details'] else None,
|
||||
'acknowledged': bool(row['acknowledged'])
|
||||
})
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def acknowledge_isms_finding(finding_id: int) -> bool:
|
||||
"""Acknowledge an ISMS finding."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute(
|
||||
'UPDATE isms_findings SET acknowledged = 1 WHERE id = ?',
|
||||
(finding_id,)
|
||||
)
|
||||
return cursor.rowcount > 0
|
||||
|
||||
|
||||
def get_isms_findings_summary() -> dict:
|
||||
"""Get summary counts of findings by severity."""
|
||||
with get_db() as conn:
|
||||
cursor = conn.execute('''
|
||||
SELECT severity, COUNT(*) as count
|
||||
FROM isms_findings
|
||||
WHERE acknowledged = 0
|
||||
GROUP BY severity
|
||||
''')
|
||||
|
||||
summary = {'high': 0, 'warn': 0, 'info': 0, 'total': 0}
|
||||
for row in cursor:
|
||||
summary[row['severity']] = row['count']
|
||||
summary['total'] += row['count']
|
||||
|
||||
return summary
|
||||
|
||||
Reference in New Issue
Block a user