diff --git a/Dockerfile b/Dockerfile
index a435d81..0b45b00 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -48,39 +48,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
airspy \
limesuite \
hackrf \
- # GSM Intelligence (tshark for packet parsing)
- tshark \
# Utilities
curl \
procps \
&& rm -rf /var/lib/apt/lists/*
-# GSM Intelligence: gr-gsm (grgsm_scanner, grgsm_livemon)
-# Install from apt if available, otherwise build from source
-RUN apt-get update \
- && apt-get install -y --no-install-recommends \
- gnuradio gr-osmosdr gr-gsm 2>/dev/null \
- || ( \
- apt-get install -y --no-install-recommends \
- gnuradio gnuradio-dev gr-osmosdr \
- git cmake libboost-all-dev libcppunit-dev swig \
- doxygen liblog4cpp5-dev python3-scipy python3-numpy \
- libvolk-dev libfftw3-dev build-essential \
- && cd /tmp \
- && git clone --depth 1 https://github.com/bkerler/gr-gsm.git \
- && cd gr-gsm \
- && mkdir build && cd build \
- && cmake .. \
- && make -j$(nproc) \
- && make install \
- && ldconfig \
- && rm -rf /tmp/gr-gsm \
- && apt-get remove -y gnuradio-dev libcppunit-dev swig doxygen \
- liblog4cpp5-dev libvolk-dev build-essential git cmake \
- && apt-get autoremove -y \
- ) \
- && rm -rf /var/lib/apt/lists/*
-
# Build dump1090-fa and acarsdec from source (packages not available in slim repos)
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
diff --git a/app.py b/app.py
index 76ee903..bb91402 100644
--- a/app.py
+++ b/app.py
@@ -39,7 +39,6 @@ from utils.constants import (
MAX_VESSEL_AGE_SECONDS,
MAX_DSC_MESSAGE_AGE_SECONDS,
MAX_DEAUTH_ALERTS_AGE_SECONDS,
- MAX_GSM_AGE_SECONDS,
QUEUE_MAX_SIZE,
)
import logging
@@ -188,16 +187,6 @@ deauth_detector = None
deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
deauth_detector_lock = threading.Lock()
-# GSM Spy
-gsm_spy_scanner_running = False # Flag: scanner thread active
-gsm_spy_livemon_process = None # For grgsm_livemon process
-gsm_spy_monitor_process = None # For tshark monitoring process
-gsm_spy_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
-gsm_spy_lock = threading.Lock()
-gsm_spy_active_device = None
-gsm_spy_selected_arfcn = None
-gsm_spy_region = 'Americas' # Default band
-
# ============================================
# GLOBAL STATE DICTIONARIES
# ============================================
@@ -230,16 +219,6 @@ dsc_messages = DataStore(max_age_seconds=MAX_DSC_MESSAGE_AGE_SECONDS, name='dsc_
# Deauth alerts - using DataStore for automatic cleanup
deauth_alerts = DataStore(max_age_seconds=MAX_DEAUTH_ALERTS_AGE_SECONDS, name='deauth_alerts')
-# GSM Spy data stores
-gsm_spy_towers = DataStore(
- max_age_seconds=MAX_GSM_AGE_SECONDS,
- name='gsm_spy_towers'
-)
-gsm_spy_devices = DataStore(
- max_age_seconds=MAX_GSM_AGE_SECONDS,
- name='gsm_spy_devices'
-)
-
# Satellite state
satellite_passes = [] # Predicted satellite passes (not auto-cleaned, calculated)
@@ -252,8 +231,6 @@ cleanup_manager.register(adsb_aircraft)
cleanup_manager.register(ais_vessels)
cleanup_manager.register(dsc_messages)
cleanup_manager.register(deauth_alerts)
-cleanup_manager.register(gsm_spy_towers)
-cleanup_manager.register(gsm_spy_devices)
# ============================================
# SDR DEVICE REGISTRY
@@ -687,8 +664,6 @@ def kill_all() -> Response:
global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process
global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process
global dmr_process, dmr_rtl_process
- global gsm_spy_livemon_process, gsm_spy_monitor_process
- global gsm_spy_scanner_running, gsm_spy_active_device, gsm_spy_selected_arfcn, gsm_spy_region
# Import adsb and ais modules to reset their state
from routes import adsb as adsb_module
@@ -701,8 +676,7 @@ def kill_all() -> Response:
'airodump-ng', 'aireplay-ng', 'airmon-ng',
'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher',
'hcitool', 'bluetoothctl', 'dsd',
- 'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg',
- 'grgsm_scanner', 'grgsm_livemon', 'tshark'
+ 'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg'
]
for proc in processes_to_kill:
@@ -771,29 +745,6 @@ def kill_all() -> Response:
except Exception:
pass
- # Reset GSM Spy state
- with gsm_spy_lock:
- gsm_spy_scanner_running = False
- gsm_spy_active_device = None
- gsm_spy_selected_arfcn = None
- gsm_spy_region = 'Americas'
-
- if gsm_spy_livemon_process:
- try:
- if safe_terminate(gsm_spy_livemon_process):
- killed.append('grgsm_livemon')
- except Exception:
- pass
- gsm_spy_livemon_process = None
-
- if gsm_spy_monitor_process:
- try:
- if safe_terminate(gsm_spy_monitor_process):
- killed.append('tshark')
- except Exception:
- pass
- gsm_spy_monitor_process = None
-
# Clear SDR device registry
with sdr_device_registry_lock:
sdr_device_registry.clear()
@@ -885,19 +836,11 @@ def main() -> None:
# Register database cleanup functions
from utils.database import (
- cleanup_old_gsm_signals,
- cleanup_old_gsm_tmsi_log,
- cleanup_old_gsm_velocity_log,
cleanup_old_signal_history,
cleanup_old_timeline_entries,
cleanup_old_dsc_alerts,
cleanup_old_payloads
)
- # GSM cleanups: signals (60 days), TMSI log (24 hours), velocity (1 hour)
- # Interval multiplier: cleanup every N cycles (60s interval = 1 cleanup per hour at multiplier 60)
- cleanup_manager.register_db_cleanup(cleanup_old_gsm_tmsi_log, interval_multiplier=60) # Every hour
- cleanup_manager.register_db_cleanup(cleanup_old_gsm_velocity_log, interval_multiplier=60) # Every hour
- cleanup_manager.register_db_cleanup(cleanup_old_gsm_signals, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_signal_history, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_timeline_entries, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_dsc_alerts, interval_multiplier=1440) # Every 24 hours
diff --git a/config.py b/config.py
index 98f2d1d..2e01d48 100644
--- a/config.py
+++ b/config.py
@@ -204,25 +204,20 @@ SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
-# Update checking
-GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
-UPDATE_CHECK_ENABLED = _get_env_bool('UPDATE_CHECK_ENABLED', True)
-UPDATE_CHECK_INTERVAL_HOURS = _get_env_int('UPDATE_CHECK_INTERVAL_HOURS', 6)
-
-# Alerting
-ALERT_WEBHOOK_URL = _get_env('ALERT_WEBHOOK_URL', '')
-ALERT_WEBHOOK_SECRET = _get_env('ALERT_WEBHOOK_SECRET', '')
-ALERT_WEBHOOK_TIMEOUT = _get_env_int('ALERT_WEBHOOK_TIMEOUT', 5)
-
-# Admin credentials
-ADMIN_USERNAME = _get_env('ADMIN_USERNAME', 'admin')
-ADMIN_PASSWORD = _get_env('ADMIN_PASSWORD', 'admin')
+# Update checking
+GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
+UPDATE_CHECK_ENABLED = _get_env_bool('UPDATE_CHECK_ENABLED', True)
+UPDATE_CHECK_INTERVAL_HOURS = _get_env_int('UPDATE_CHECK_INTERVAL_HOURS', 6)
+
+# Alerting
+ALERT_WEBHOOK_URL = _get_env('ALERT_WEBHOOK_URL', '')
+ALERT_WEBHOOK_SECRET = _get_env('ALERT_WEBHOOK_SECRET', '')
+ALERT_WEBHOOK_TIMEOUT = _get_env_int('ALERT_WEBHOOK_TIMEOUT', 5)
+
+# Admin credentials
+ADMIN_USERNAME = _get_env('ADMIN_USERNAME', 'admin')
+ADMIN_PASSWORD = _get_env('ADMIN_PASSWORD', 'admin')
-# GSM Spy settings
-GSM_OPENCELLID_API_KEY = _get_env('GSM_OPENCELLID_API_KEY', '')
-GSM_OPENCELLID_API_URL = _get_env('GSM_OPENCELLID_API_URL', 'https://opencellid.org/cell/get')
-GSM_API_DAILY_LIMIT = _get_env_int('GSM_API_DAILY_LIMIT', 1000)
-GSM_TA_METERS_PER_UNIT = _get_env_int('GSM_TA_METERS_PER_UNIT', 554)
def configure_logging() -> None:
"""Configure application logging."""
diff --git a/routes/__init__.py b/routes/__init__.py
index af211cc..0ac1fc2 100644
--- a/routes/__init__.py
+++ b/routes/__init__.py
@@ -31,7 +31,6 @@ def register_blueprints(app):
from .websdr import websdr_bp
from .alerts import alerts_bp
from .recordings import recordings_bp
- from .gsm_spy import gsm_spy_bp
app.register_blueprint(pager_bp)
app.register_blueprint(sensor_bp)
@@ -62,7 +61,6 @@ def register_blueprints(app):
app.register_blueprint(websdr_bp) # HF/Shortwave WebSDR
app.register_blueprint(alerts_bp) # Cross-mode alerts
app.register_blueprint(recordings_bp) # Session recordings
- app.register_blueprint(gsm_spy_bp) # GSM cellular intelligence
# Initialize TSCM state with queue and lock from app
import app as app_module
diff --git a/routes/gsm_spy.py b/routes/gsm_spy.py
deleted file mode 100644
index 49aca53..0000000
--- a/routes/gsm_spy.py
+++ /dev/null
@@ -1,2051 +0,0 @@
-"""GSM Spy route handlers for cellular tower and device tracking."""
-
-from __future__ import annotations
-
-import json
-import logging
-import os
-import queue
-import re
-import shutil
-import subprocess
-import threading
-import time
-from datetime import datetime, timedelta
-from typing import Any
-
-import requests
-from flask import Blueprint, Response, jsonify, render_template, request
-
-import app as app_module
-import config
-from config import SHARED_OBSERVER_LOCATION_ENABLED
-from utils.database import get_db
-from utils.process import register_process, safe_terminate, unregister_process
-from utils.sse import format_sse
-from utils.validation import validate_device_index
-
-from utils.logging import get_logger
-logger = get_logger('intercept.gsm_spy')
-logger.setLevel(logging.DEBUG) # GSM Spy needs verbose logging for diagnostics
-
-gsm_spy_bp = Blueprint('gsm_spy', __name__, url_prefix='/gsm_spy')
-
-# Regional band configurations (G-01)
-REGIONAL_BANDS = {
- 'Americas': {
- 'GSM850': {'start': 869e6, 'end': 894e6, 'arfcn_start': 128, 'arfcn_end': 251},
- 'PCS1900': {'start': 1930e6, 'end': 1990e6, 'arfcn_start': 512, 'arfcn_end': 810}
- },
- 'Europe': {
- 'GSM800': {'start': 832e6, 'end': 862e6, 'arfcn_start': 438, 'arfcn_end': 511}, # E-GSM800 downlink
- 'GSM850': {'start': 869e6, 'end': 894e6, 'arfcn_start': 128, 'arfcn_end': 251}, # Also used in some EU countries
- 'EGSM900': {'start': 935e6, 'end': 960e6, 'arfcn_start': 0, 'arfcn_end': 124}, # DL = 935 + 0.2*ARFCN
- 'EGSM900_EXT': {'start': 925.2e6, 'end': 935e6, 'arfcn_start': 975, 'arfcn_end': 1023}, # E-GSM extension
- 'DCS1800': {'start': 1805e6, 'end': 1880e6, 'arfcn_start': 512, 'arfcn_end': 885}
- },
- 'Asia': {
- 'EGSM900': {'start': 935e6, 'end': 960e6, 'arfcn_start': 0, 'arfcn_end': 124}, # DL = 935 + 0.2*ARFCN
- 'EGSM900_EXT': {'start': 925.2e6, 'end': 935e6, 'arfcn_start': 975, 'arfcn_end': 1023}, # E-GSM extension
- 'DCS1800': {'start': 1805e6, 'end': 1880e6, 'arfcn_start': 512, 'arfcn_end': 885}
- }
-}
-
-# Module state tracking
-gsm_using_service = False
-gsm_connected = False
-gsm_towers_found = 0
-gsm_devices_tracked = 0
-
-# Geocoding worker state
-_geocoding_worker_thread = None
-
-
-# ============================================
-# API Usage Tracking Helper Functions
-# ============================================
-
-def get_api_usage_today():
- """Get OpenCellID API usage count for today."""
- from utils.database import get_setting
- today = datetime.now().date().isoformat()
- usage_date = get_setting('gsm.opencellid.usage_date', '')
-
- # Reset counter if new day
- if usage_date != today:
- from utils.database import set_setting
- set_setting('gsm.opencellid.usage_date', today)
- set_setting('gsm.opencellid.usage_count', 0)
- return 0
-
- return get_setting('gsm.opencellid.usage_count', 0)
-
-
-def increment_api_usage():
- """Increment OpenCellID API usage counter."""
- from utils.database import set_setting
- current = get_api_usage_today()
- set_setting('gsm.opencellid.usage_count', current + 1)
- return current + 1
-
-
-def get_opencellid_api_key():
- """Get OpenCellID API key, checking env var first, then database setting."""
- env_key = config.GSM_OPENCELLID_API_KEY
- if env_key:
- return env_key
- from utils.database import get_setting
- return get_setting('gsm.opencellid.api_key', '')
-
-
-def can_use_api():
- """Check if we can make an API call within daily limit."""
- if not get_opencellid_api_key():
- return False
- current_usage = get_api_usage_today()
- return current_usage < config.GSM_API_DAILY_LIMIT
-
-
-# ============================================
-# Background Geocoding Worker
-# ============================================
-
-def start_geocoding_worker():
- """Start background thread for async geocoding."""
- global _geocoding_worker_thread
-
- # Clean poisoned cache entries (rows with NULL lat/lon from failed API responses)
- try:
- with get_db() as conn:
- deleted = conn.execute(
- 'DELETE FROM gsm_cells WHERE lat IS NULL OR lon IS NULL'
- ).rowcount
- conn.commit()
- if deleted:
- logger.info(f"Cleaned {deleted} poisoned cache entries (NULL coordinates)")
- except Exception as e:
- logger.warning(f"Could not clean cache: {e}")
-
- if _geocoding_worker_thread is None or not _geocoding_worker_thread.is_alive():
- _geocoding_worker_thread = threading.Thread(
- target=geocoding_worker,
- daemon=True,
- name='gsm-geocoding-worker'
- )
- _geocoding_worker_thread.start()
- logger.info("Started geocoding worker thread")
-
-
-def geocoding_worker():
- """Worker thread processes pending geocoding requests."""
- from utils.gsm_geocoding import lookup_cell_from_api, get_geocoding_queue
-
- geocoding_queue = get_geocoding_queue()
-
- while True:
- try:
- # Wait for pending tower with timeout
- tower_data = geocoding_queue.get(timeout=5)
-
- # Check API key and rate limit
- if not can_use_api():
- if not get_opencellid_api_key():
- logger.warning("OpenCellID API key not configured (set INTERCEPT_GSM_OPENCELLID_API_KEY or configure in Settings > API Keys)")
- else:
- current_usage = get_api_usage_today()
- logger.warning(f"OpenCellID API daily limit reached ({current_usage}/{config.GSM_API_DAILY_LIMIT})")
- geocoding_queue.task_done()
- continue
-
- # Call API
- mcc = tower_data.get('mcc')
- mnc = tower_data.get('mnc')
- lac = tower_data.get('lac')
- cid = tower_data.get('cid')
-
- logger.debug(f"Geocoding tower via API: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
-
- coords = lookup_cell_from_api(mcc, mnc, lac, cid)
-
- if coords:
- # Update tower data with coordinates
- tower_data['lat'] = coords['lat']
- tower_data['lon'] = coords['lon']
- tower_data['source'] = 'api'
- tower_data['status'] = 'resolved'
- tower_data['type'] = 'tower_update'
-
- # Add optional fields if available
- if coords.get('azimuth') is not None:
- tower_data['azimuth'] = coords['azimuth']
- if coords.get('range_meters') is not None:
- tower_data['range_meters'] = coords['range_meters']
- if coords.get('operator'):
- tower_data['operator'] = coords['operator']
- if coords.get('radio'):
- tower_data['radio'] = coords['radio']
-
- # Update DataStore
- key = f"{mcc}_{mnc}_{lac}_{cid}"
- app_module.gsm_spy_towers[key] = tower_data
-
- # Send update to SSE stream
- try:
- app_module.gsm_spy_queue.put_nowait(tower_data)
- logger.info(f"Resolved coordinates for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
- except queue.Full:
- logger.warning("SSE queue full, dropping tower update")
-
- # Increment API usage counter
- usage_count = increment_api_usage()
- logger.info(f"OpenCellID API call #{usage_count} today")
-
- else:
- logger.warning(f"Could not resolve coordinates for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
-
- geocoding_queue.task_done()
-
- # Rate limiting between API calls (be nice to OpenCellID)
- time.sleep(1)
-
- except queue.Empty:
- # No pending towers, continue waiting
- continue
- except Exception as e:
- logger.error(f"Geocoding worker error: {e}", exc_info=True)
- time.sleep(1)
-
-
-def arfcn_to_frequency(arfcn):
- """Convert ARFCN to downlink frequency in Hz.
-
- Uses REGIONAL_BANDS to determine the correct band and conversion formula.
- Returns frequency in Hz (e.g., 925800000 for 925.8 MHz).
- """
- arfcn = int(arfcn)
-
- # Search all bands to find which one this ARFCN belongs to
- for region_bands in REGIONAL_BANDS.values():
- for band_name, band_info in region_bands.items():
- arfcn_start = band_info['arfcn_start']
- arfcn_end = band_info['arfcn_end']
-
- if arfcn_start <= arfcn <= arfcn_end:
- # Found the right band, calculate frequency
- # Downlink frequency = band_start + (arfcn - arfcn_start) * 200kHz
- freq_hz = band_info['start'] + (arfcn - arfcn_start) * 200000
- return int(freq_hz)
-
- # If ARFCN not found in any band, raise error
- raise ValueError(f"ARFCN {arfcn} not found in any known GSM band")
-
-
-def validate_band_names(bands: list[str], region: str) -> tuple[list[str], str | None]:
- """Validate band names against REGIONAL_BANDS whitelist.
-
- Args:
- bands: List of band names from user input
- region: Region name (Americas, Europe, Asia)
-
- Returns:
- Tuple of (validated_bands, error_message)
- """
- if not bands:
- return [], None
-
- region_bands = REGIONAL_BANDS.get(region)
- if not region_bands:
- return [], f"Invalid region: {region}"
-
- valid_band_names = set(region_bands.keys())
- invalid_bands = [b for b in bands if b not in valid_band_names]
-
- if invalid_bands:
- return [], (f"Invalid bands for {region}: {', '.join(invalid_bands)}. "
- f"Valid bands: {', '.join(sorted(valid_band_names))}")
-
- return bands, None
-
-
-# tshark field name discovery - field names vary between Wireshark versions
-_tshark_fields_cache: dict[str, str] | None = None
-
-
-def _discover_tshark_fields() -> dict[str, str | None]:
- """Discover correct tshark field names for GSM A protocol.
-
- Searches tshark's registered fields for keywords to find the actual
- names used by the installed Wireshark version, then validates them.
-
- Returns:
- Dict mapping logical names to actual tshark field names (or None):
- {'ta': ..., 'tmsi': ..., 'imsi': ..., 'lac': ..., 'cid': ...}
- """
- global _tshark_fields_cache
- if _tshark_fields_cache is not None:
- return _tshark_fields_cache
-
- # Search patterns for each logical field (applied to tshark -G fields output)
- # Each entry: (keyword_patterns, exclusion_patterns)
- # We search the field filter name column for matches
- search_config = {
- 'ta': {
- 'keywords': ['timing_adv', 'timing_advance'],
- 'prefer': ['gsm_a'], # prefer GSM A protocol fields
- },
- 'tmsi': {
- 'keywords': ['.tmsi'],
- 'prefer': ['gsm_a'],
- },
- 'imsi': {
- 'keywords': ['.imsi'],
- 'prefer': ['e212', 'gsm_a'],
- },
- 'lac': {
- 'keywords': ['.lac'],
- 'prefer': ['gsm_a', 'e212'],
- },
- 'cid': {
- 'keywords': ['cellid', 'cell_ci', '.cell_id', 'e212.ci'],
- 'prefer': ['gsm_a', 'e212'],
- },
- }
-
- # Step 1: Get all field names from tshark (F lines only, not P protocol lines)
- all_field_names = []
- try:
- result = subprocess.run(
- ['tshark', '-G', 'fields'],
- capture_output=True, text=True, timeout=15
- )
- for line in result.stdout.splitlines():
- if not line.startswith('F\t'):
- continue # Only actual fields, not protocols
- parts = line.split('\t')
- if len(parts) >= 3:
- all_field_names.append(parts[2])
- except Exception as e:
- logger.warning(f"Could not query tshark fields: {e}")
-
- # Step 2: Search for candidate fields matching each logical name
- candidates: dict[str, list[str]] = {}
- for logical_name, config in search_config.items():
- matches = []
- for field_name in all_field_names:
- for keyword in config['keywords']:
- if keyword in field_name:
- matches.append(field_name)
- break
- # Sort: preferred protocol prefixes first
- def sort_key(name):
- for i, pref in enumerate(config['prefer']):
- if name.startswith(pref):
- return i
- return 100
- matches.sort(key=sort_key)
- candidates[logical_name] = matches
-
- logger.info(f"tshark field candidates from -G fields: {candidates}")
-
- # Step 3: Validate candidates by testing with tshark -r /dev/null
- # Collect all unique candidate names
- all_candidate_names = set()
- for field_list in candidates.values():
- all_candidate_names.update(field_list)
-
- valid_fields = set()
- if all_candidate_names:
- # Test in batches to identify which are valid
- # Test each individually since batch testing makes it hard to identify valid ones
- for field_name in all_candidate_names:
- try:
- result = subprocess.run(
- ['tshark', '-T', 'fields', '-e', field_name, '-r', '/dev/null'],
- capture_output=True, text=True, timeout=5
- )
- if result.returncode == 0 or 'aren\'t valid' not in result.stderr:
- valid_fields.add(field_name)
- except Exception:
- pass
-
- logger.info(f"Validated tshark -e fields: {sorted(valid_fields)}")
-
- # Step 4: Resolve each logical field to the first valid candidate
- resolved: dict[str, str | None] = {}
- for logical_name, field_candidates in candidates.items():
- resolved[logical_name] = None
- for candidate in field_candidates:
- if candidate in valid_fields:
- resolved[logical_name] = candidate
- break
- if resolved[logical_name] is None and field_candidates:
- logger.warning(
- f"No valid tshark field for '{logical_name}'. "
- f"Candidates were: {field_candidates}"
- )
-
- logger.info(f"Resolved tshark fields: {resolved}")
- _tshark_fields_cache = resolved
- return resolved
-
-
-def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subprocess.Popen, subprocess.Popen, list[str]]:
- """Start grgsm_livemon and tshark processes for monitoring an ARFCN.
-
- Returns:
- Tuple of (grgsm_process, tshark_process, field_order)
- field_order is the list of logical field names in tshark column order.
-
- Raises:
- FileNotFoundError: If grgsm_livemon or tshark not found
- RuntimeError: If grgsm_livemon or tshark exits immediately
- """
- frequency_hz = arfcn_to_frequency(arfcn)
- frequency_mhz = frequency_hz / 1e6
-
- # Check prerequisites
- if not shutil.which('grgsm_livemon'):
- raise FileNotFoundError('grgsm_livemon not found. Please install gr-gsm.')
-
- # Start grgsm_livemon
- grgsm_cmd = [
- 'grgsm_livemon',
- '--args', f'rtl={device_index}',
- '-f', f'{frequency_mhz}M'
- ]
- env = dict(os.environ,
- OSMO_FSM_DUP_CHECK_DISABLED='1',
- PYTHONUNBUFFERED='1',
- QT_QPA_PLATFORM='offscreen')
- logger.info(f"Starting grgsm_livemon: {' '.join(grgsm_cmd)}")
- grgsm_proc = subprocess.Popen(
- grgsm_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True,
- env=env
- )
- register_process(grgsm_proc)
- logger.info(f"Started grgsm_livemon (PID: {grgsm_proc.pid})")
-
- # Wait and check it didn't die immediately
- time.sleep(2)
-
- if grgsm_proc.poll() is not None:
- # Process already exited - capture stderr for diagnostics
- stderr_output = ''
- try:
- stderr_output = grgsm_proc.stderr.read()
- except Exception:
- pass
- exit_code = grgsm_proc.returncode
- logger.error(
- f"grgsm_livemon exited immediately (code: {exit_code}). "
- f"stderr: {stderr_output[:500]}"
- )
- unregister_process(grgsm_proc)
- raise RuntimeError(
- f'grgsm_livemon failed (exit code {exit_code}): {stderr_output[:200]}'
- )
-
- # Start stderr reader thread for grgsm_livemon diagnostics
- def read_livemon_stderr():
- try:
- for line in iter(grgsm_proc.stderr.readline, ''):
- if line:
- logger.debug(f"grgsm_livemon stderr: {line.strip()}")
- except Exception:
- pass
- threading.Thread(target=read_livemon_stderr, daemon=True).start()
-
- # Start tshark
- if not shutil.which('tshark'):
- safe_terminate(grgsm_proc)
- unregister_process(grgsm_proc)
- raise FileNotFoundError('tshark not found. Please install wireshark/tshark.')
-
- fields = _discover_tshark_fields()
-
- # Build field list from only valid (non-None) fields
- # Track order so parser knows which column is which
- field_order = [] # list of logical names in column order
- tshark_cmd = [
- 'tshark',
- '-i', 'lo',
- '-l', # Line-buffered output for live capture
- '-f', 'udp port 4729', # Capture filter: only GSMTAP packets
- ]
-
- # No display filter (-Y) — the capture filter (-f 'udp port 4729')
- # already limits to GSMTAP packets, and the parser discards rows
- # without TMSI/IMSI. A -Y filter on gsm_a.tmsi misses paging
- # requests where the TMSI lives under a different field path.
- tshark_cmd.extend(['-T', 'fields'])
-
- # Add -e for each available field in known order
- for logical_name in ['ta', 'tmsi', 'imsi', 'lac', 'cid']:
- if fields.get(logical_name):
- tshark_cmd.extend(['-e', fields[logical_name]])
- field_order.append(logical_name)
-
- if not field_order:
- safe_terminate(grgsm_proc)
- unregister_process(grgsm_proc)
- raise RuntimeError('No valid tshark fields found for GSM capture')
-
- logger.info(f"tshark field order: {field_order}")
- logger.info(f"Starting tshark: {' '.join(tshark_cmd)}")
- tshark_proc = subprocess.Popen(
- tshark_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True,
- bufsize=1
- )
- register_process(tshark_proc)
- logger.info(f"Started tshark (PID: {tshark_proc.pid})")
-
- # Check tshark didn't exit immediately
- time.sleep(1)
- if tshark_proc.poll() is not None:
- stderr_output = ''
- try:
- stderr_output = tshark_proc.stderr.read()
- except Exception:
- pass
- exit_code = tshark_proc.returncode
- logger.error(f"tshark exited immediately (code: {exit_code}). stderr: {stderr_output[:500]}")
- # Clean up grgsm_livemon since monitoring can't work without tshark
- safe_terminate(grgsm_proc)
- unregister_process(grgsm_proc)
- unregister_process(tshark_proc)
- raise RuntimeError(f'tshark failed (exit code {exit_code}): {stderr_output[:200]}')
-
- return grgsm_proc, tshark_proc, field_order
-
-
-def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
- """Start monitoring processes and register them in global state.
-
- This is shared logic between start_monitor() and auto_start_monitor().
- Must be called within gsm_spy_lock context.
-
- Args:
- arfcn: ARFCN to monitor
- device_index: SDR device index
- """
- # Start monitoring processes
- grgsm_proc, tshark_proc, field_order = _start_monitoring_processes(arfcn, device_index)
- app_module.gsm_spy_livemon_process = grgsm_proc
- app_module.gsm_spy_monitor_process = tshark_proc
- app_module.gsm_spy_selected_arfcn = arfcn
-
- # Start monitoring thread
- monitor_thread_obj = threading.Thread(
- target=monitor_thread,
- args=(tshark_proc, field_order),
- daemon=True
- )
- monitor_thread_obj.start()
-
-
-@gsm_spy_bp.route('/dashboard')
-def dashboard():
- """Render GSM Spy dashboard."""
- return render_template(
- 'gsm_spy_dashboard.html',
- shared_observer_location=SHARED_OBSERVER_LOCATION_ENABLED
- )
-
-
-@gsm_spy_bp.route('/start', methods=['POST'])
-def start_scanner():
- """Start GSM scanner (G-01 BTS Scanner)."""
- global gsm_towers_found, gsm_connected
-
- with app_module.gsm_spy_lock:
- if app_module.gsm_spy_scanner_running:
- return jsonify({'error': 'Scanner already running'}), 400
-
- data = request.get_json() or {}
- device_index = data.get('device', 0)
- region = data.get('region', 'Americas')
- selected_bands = data.get('bands', []) # Get user-selected bands
-
- # Validate device index
- try:
- device_index = validate_device_index(device_index)
- except ValueError as e:
- return jsonify({'error': str(e)}), 400
-
- # Claim SDR device to prevent conflicts
- from app import claim_sdr_device
- claim_error = claim_sdr_device(device_index, 'GSM Spy')
- if claim_error:
- return jsonify({
- 'error': claim_error,
- 'error_type': 'DEVICE_BUSY'
- }), 409
-
- # If no bands selected, use all bands for the region (backwards compatibility)
- if selected_bands:
- validated_bands, error = validate_band_names(selected_bands, region)
- if error:
- from app import release_sdr_device
- release_sdr_device(device_index)
- return jsonify({'error': error}), 400
- selected_bands = validated_bands
- else:
- region_bands = REGIONAL_BANDS.get(region, REGIONAL_BANDS['Americas'])
- selected_bands = list(region_bands.keys())
- logger.warning(f"No bands specified, using all bands for {region}: {selected_bands}")
-
- # Build grgsm_scanner command
- # Example: grgsm_scanner --args="rtl=0" -b GSM900
- if not shutil.which('grgsm_scanner'):
- from app import release_sdr_device
- release_sdr_device(device_index)
- return jsonify({'error': 'grgsm_scanner not found. Please install gr-gsm.'}), 500
-
- try:
- cmd = ['grgsm_scanner']
-
- # Add device argument (--args for RTL-SDR device selection)
- cmd.extend(['--args', f'rtl={device_index}'])
-
- # Add selected band arguments
- # Map internal band names to grgsm_scanner -b values
- # grgsm_scanner accepts: GSM900, GSM850, DCS1800, PCS1900, GSM450, GSM480, GSM-R
- GRGSM_BAND_MAP = {
- 'EGSM900': 'GSM900',
- 'EGSM900_EXT': None, # Covered by GSM900 scan
- 'GSM850': 'GSM850',
- 'GSM800': None, # Not a standard GSM band for grgsm_scanner
- 'DCS1800': 'DCS1800',
- 'PCS1900': 'PCS1900',
- }
- bands_added = set()
- for band_name in selected_bands:
- grgsm_band = GRGSM_BAND_MAP.get(band_name, band_name)
- if grgsm_band is None:
- logger.info(f"Skipping band {band_name} (not supported by grgsm_scanner)")
- continue
- if grgsm_band not in bands_added:
- cmd.extend(['-b', grgsm_band])
- bands_added.add(grgsm_band)
-
- if not bands_added:
- from app import release_sdr_device
- release_sdr_device(device_index)
- return jsonify({'error': f'No scannable bands selected. '
- f'GSM800 and EGSM900_EXT are not supported by grgsm_scanner.'}), 400
-
- logger.info(f"Starting GSM scanner: {' '.join(cmd)}")
-
- # Set a flag to indicate scanner should run
- app_module.gsm_spy_active_device = device_index
- app_module.gsm_spy_region = region
- app_module.gsm_spy_scanner_running = True # Use as flag initially
-
- # Reset counters for new session
- gsm_towers_found = 0
- gsm_devices_tracked = 0
-
- # Start geocoding worker (if not already running)
- start_geocoding_worker()
-
- # Start scanning thread (will run grgsm_scanner in a loop)
- scanner_thread_obj = threading.Thread(
- target=scanner_thread,
- args=(cmd, device_index),
- daemon=True
- )
- scanner_thread_obj.start()
-
- gsm_connected = True
-
- return jsonify({
- 'status': 'started',
- 'device': device_index,
- 'region': region
- })
-
- except FileNotFoundError:
- from app import release_sdr_device
- release_sdr_device(device_index)
- return jsonify({'error': 'grgsm_scanner not found. Please install gr-gsm.'}), 500
- except Exception as e:
- from app import release_sdr_device
- release_sdr_device(device_index)
- logger.error(f"Error starting GSM scanner: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/monitor', methods=['POST'])
-def start_monitor():
- """Start monitoring specific tower (G-02 Decoding)."""
- with app_module.gsm_spy_lock:
- if app_module.gsm_spy_monitor_process:
- return jsonify({'error': 'Monitor already running'}), 400
-
- data = request.get_json() or {}
- arfcn = data.get('arfcn')
- device_index = data.get('device', app_module.gsm_spy_active_device or 0)
-
- if not arfcn:
- return jsonify({'error': 'ARFCN required'}), 400
-
- # Validate ARFCN is valid integer and in known GSM band ranges
- try:
- arfcn = int(arfcn)
- # This will raise ValueError if ARFCN is not in any known band
- arfcn_to_frequency(arfcn)
- except (ValueError, TypeError) as e:
- return jsonify({'error': f'Invalid ARFCN: {e}'}), 400
-
- # Validate device index
- try:
- device_index = validate_device_index(device_index)
- except ValueError as e:
- return jsonify({'error': str(e)}), 400
-
- try:
- # Start and register monitoring (shared logic)
- _start_and_register_monitor(arfcn, device_index)
-
- return jsonify({
- 'status': 'monitoring',
- 'arfcn': arfcn,
- 'device': device_index
- })
-
- except FileNotFoundError as e:
- return jsonify({'error': f'Tool not found: {e}'}), 500
- except Exception as e:
- logger.error(f"Error starting monitor: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/stop', methods=['POST'])
-def stop_scanner():
- """Stop GSM scanner and monitor."""
- global gsm_connected, gsm_towers_found, gsm_devices_tracked
-
- with app_module.gsm_spy_lock:
- killed = []
-
- # Stop scanner (now just a flag, thread will see it and exit)
- if app_module.gsm_spy_scanner_running:
- app_module.gsm_spy_scanner_running = False
- killed.append('scanner')
-
- # Terminate livemon process
- if app_module.gsm_spy_livemon_process:
- unregister_process(app_module.gsm_spy_livemon_process)
- if safe_terminate(app_module.gsm_spy_livemon_process, timeout=5):
- killed.append('livemon')
- app_module.gsm_spy_livemon_process = None
-
- # Terminate monitor process
- if app_module.gsm_spy_monitor_process:
- unregister_process(app_module.gsm_spy_monitor_process)
- if safe_terminate(app_module.gsm_spy_monitor_process, timeout=5):
- killed.append('monitor')
- app_module.gsm_spy_monitor_process = None
-
- # Release SDR device from registry
- if app_module.gsm_spy_active_device is not None:
- from app import release_sdr_device
- release_sdr_device(app_module.gsm_spy_active_device)
- app_module.gsm_spy_active_device = None
- app_module.gsm_spy_selected_arfcn = None
- gsm_connected = False
- gsm_towers_found = 0
- gsm_devices_tracked = 0
-
- return jsonify({'status': 'stopped', 'killed': killed})
-
-
-@gsm_spy_bp.route('/stream')
-def stream():
- """SSE stream for real-time GSM updates."""
- def generate():
- """Generate SSE events."""
- logger.info("SSE stream connected - client subscribed")
-
- # Send current state on connect (handles reconnects and late-joining clients)
- existing_towers = dict(app_module.gsm_spy_towers.items())
- logger.info(f"SSE sending {len(existing_towers)} existing towers on connect")
- for key, tower_data in existing_towers.items():
- yield format_sse(tower_data)
-
- last_keepalive = time.time()
- idle_count = 0 # Track consecutive idle checks to handle transitions
-
- while True:
- try:
- # Check if scanner/monitor are still running
- # Use idle counter to avoid disconnecting during scanner→monitor transition
- if not app_module.gsm_spy_scanner_running and not app_module.gsm_spy_monitor_process:
- idle_count += 1
- if idle_count >= 5: # 5 seconds grace period for mode transitions
- logger.info("SSE stream: no active scanner or monitor, disconnecting")
- yield format_sse({'type': 'disconnected'})
- break
- else:
- idle_count = 0
-
- # Try to get data from queue
- try:
- data = app_module.gsm_spy_queue.get(timeout=1)
- logger.info(f"SSE sending: type={data.get('type', '?')} keys={list(data.keys())}")
- yield format_sse(data)
- last_keepalive = time.time()
- except queue.Empty:
- # Send keepalive if needed
- if time.time() - last_keepalive > 30:
- yield format_sse({'type': 'keepalive'})
- last_keepalive = time.time()
-
- except GeneratorExit:
- logger.info("SSE stream: client disconnected (GeneratorExit)")
- break
- except Exception as e:
- logger.error(f"Error in GSM stream: {e}")
- yield format_sse({'type': 'error', 'message': str(e)})
- break
-
- response = Response(generate(), mimetype='text/event-stream')
- response.headers['Cache-Control'] = 'no-cache'
- response.headers['X-Accel-Buffering'] = 'no'
- response.headers['Connection'] = 'keep-alive'
- return response
-
-
-@gsm_spy_bp.route('/status')
-def status():
- """Get current GSM Spy status."""
- api_usage = get_api_usage_today()
- return jsonify({
- 'running': bool(app_module.gsm_spy_scanner_running),
- 'monitoring': app_module.gsm_spy_monitor_process is not None,
- 'towers_found': gsm_towers_found,
- 'devices_tracked': gsm_devices_tracked,
- 'device': app_module.gsm_spy_active_device,
- 'region': app_module.gsm_spy_region,
- 'selected_arfcn': app_module.gsm_spy_selected_arfcn,
- 'api_usage_today': api_usage,
- 'api_limit': config.GSM_API_DAILY_LIMIT,
- 'api_remaining': config.GSM_API_DAILY_LIMIT - api_usage,
- 'api_key_configured': bool(get_opencellid_api_key())
- })
-
-
-@gsm_spy_bp.route('/settings/api_key', methods=['GET', 'POST'])
-def settings_api_key():
- """Get or set OpenCellID API key configuration."""
- from utils.database import get_setting, set_setting
-
- if request.method == 'GET':
- env_key = config.GSM_OPENCELLID_API_KEY
- db_key = get_setting('gsm.opencellid.api_key', '')
-
- if env_key:
- source = 'env'
- configured = True
- elif db_key:
- source = 'database'
- configured = True
- else:
- source = 'none'
- configured = False
-
- usage_today = get_api_usage_today()
-
- return jsonify({
- 'configured': configured,
- 'source': source,
- 'usage_today': usage_today,
- 'api_limit': config.GSM_API_DAILY_LIMIT
- })
-
- # POST: save key to database
- data = request.get_json() or {}
- key = data.get('key', '').strip()
-
- if not key:
- return jsonify({'error': 'API key cannot be empty'}), 400
-
- set_setting('gsm.opencellid.api_key', key)
- logger.info("OpenCellID API key saved to database")
-
- return jsonify({
- 'status': 'saved',
- 'configured': True,
- 'source': 'database'
- })
-
-
-@gsm_spy_bp.route('/lookup_cell', methods=['POST'])
-def lookup_cell():
- """Lookup cell tower via OpenCellID (G-05)."""
- data = request.get_json() or {}
- mcc = data.get('mcc')
- mnc = data.get('mnc')
- lac = data.get('lac')
- cid = data.get('cid')
-
- if any(v is None for v in [mcc, mnc, lac, cid]):
- return jsonify({'error': 'MCC, MNC, LAC, and CID required'}), 400
-
- try:
- # Check local cache first
- with get_db() as conn:
- result = conn.execute('''
- SELECT lat, lon, azimuth, range_meters, operator, radio
- FROM gsm_cells
- WHERE mcc = ? AND mnc = ? AND lac = ? AND cid = ?
- ''', (mcc, mnc, lac, cid)).fetchone()
-
- if result:
- return jsonify({
- 'source': 'cache',
- 'lat': result['lat'],
- 'lon': result['lon'],
- 'azimuth': result['azimuth'],
- 'range': result['range_meters'],
- 'operator': result['operator'],
- 'radio': result['radio']
- })
-
- # Check API key and usage limit
- api_key = get_opencellid_api_key()
- if not api_key:
- return jsonify({'error': 'OpenCellID API key not configured'}), 503
- if not can_use_api():
- current_usage = get_api_usage_today()
- return jsonify({
- 'error': 'OpenCellID API daily limit reached',
- 'usage_today': current_usage,
- 'limit': config.GSM_API_DAILY_LIMIT
- }), 429
-
- # Call OpenCellID API
- api_url = config.GSM_OPENCELLID_API_URL
- params = {
- 'key': api_key,
- 'mcc': mcc,
- 'mnc': mnc,
- 'lac': lac,
- 'cellid': cid,
- 'format': 'json'
- }
-
- response = requests.get(api_url, params=params, timeout=10)
-
- if response.status_code == 200:
- cell_data = response.json()
-
- # Increment API usage counter
- usage_count = increment_api_usage()
- logger.info(f"OpenCellID API call #{usage_count} today")
-
- # Cache the result
- conn.execute('''
- INSERT OR REPLACE INTO gsm_cells
- (mcc, mnc, lac, cid, lat, lon, azimuth, range_meters, samples, radio, operator, last_verified)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
- ''', (
- mcc, mnc, lac, cid,
- cell_data.get('lat'),
- cell_data.get('lon'),
- cell_data.get('azimuth'),
- cell_data.get('range'),
- cell_data.get('samples'),
- cell_data.get('radio'),
- cell_data.get('operator')
- ))
- conn.commit()
-
- return jsonify({
- 'source': 'api',
- 'lat': cell_data.get('lat'),
- 'lon': cell_data.get('lon'),
- 'azimuth': cell_data.get('azimuth'),
- 'range': cell_data.get('range'),
- 'operator': cell_data.get('operator'),
- 'radio': cell_data.get('radio')
- })
- else:
- return jsonify({'error': 'Cell not found in OpenCellID'}), 404
-
- except Exception as e:
- logger.error(f"Error looking up cell: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/detect_rogue', methods=['POST'])
-def detect_rogue():
- """Analyze and flag rogue towers (G-07)."""
- data = request.get_json() or {}
- tower_info = data.get('tower')
-
- if not tower_info:
- return jsonify({'error': 'Tower info required'}), 400
-
- try:
- is_rogue = False
- reasons = []
-
- # Check if tower exists in OpenCellID
- mcc = tower_info.get('mcc')
- mnc = tower_info.get('mnc')
- lac = tower_info.get('lac')
- cid = tower_info.get('cid')
-
- if all(v is not None for v in [mcc, mnc, lac, cid]):
- with get_db() as conn:
- result = conn.execute('''
- SELECT id FROM gsm_cells
- WHERE mcc = ? AND mnc = ? AND lac = ? AND cid = ?
- ''', (mcc, mnc, lac, cid)).fetchone()
-
- if not result:
- is_rogue = True
- reasons.append('Tower not found in OpenCellID database')
-
- # Check signal strength anomalies
- signal = tower_info.get('signal_strength', 0)
- if signal > -50: # Suspiciously strong signal
- is_rogue = True
- reasons.append(f'Unusually strong signal: {signal} dBm')
-
- # If rogue, insert into database
- if is_rogue:
- with get_db() as conn:
- conn.execute('''
- INSERT INTO gsm_rogues
- (arfcn, mcc, mnc, lac, cid, signal_strength, reason, threat_level)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?)
- ''', (
- tower_info.get('arfcn'),
- mcc, mnc, lac, cid,
- signal,
- '; '.join(reasons),
- 'high' if len(reasons) > 1 else 'medium'
- ))
- conn.commit()
-
- return jsonify({
- 'is_rogue': is_rogue,
- 'reasons': reasons
- })
-
- except Exception as e:
- logger.error(f"Error detecting rogue: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/towers')
-def get_towers():
- """Get all detected towers."""
- towers = []
- for key, tower_data in app_module.gsm_spy_towers.items():
- towers.append(tower_data)
- return jsonify(towers)
-
-
-@gsm_spy_bp.route('/devices')
-def get_devices():
- """Get all tracked devices (IMSI/TMSI)."""
- devices = []
- for key, device_data in app_module.gsm_spy_devices.items():
- devices.append(device_data)
- return jsonify(devices)
-
-
-@gsm_spy_bp.route('/rogues')
-def get_rogues():
- """Get all detected rogue towers."""
- try:
- with get_db() as conn:
- results = conn.execute('''
- SELECT * FROM gsm_rogues
- WHERE acknowledged = 0
- ORDER BY detected_at DESC
- LIMIT 50
- ''').fetchall()
-
- rogues = [dict(row) for row in results]
- return jsonify(rogues)
- except Exception as e:
- logger.error(f"Error fetching rogues: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-# ============================================
-# Advanced Features (G-08 through G-12)
-# ============================================
-
-@gsm_spy_bp.route('/velocity', methods=['GET'])
-def get_velocity_data():
- """Get velocity vectoring data for tracked devices (G-08)."""
- try:
- device_id = request.args.get('device_id')
- minutes = int(request.args.get('minutes', 60)) # Last 60 minutes by default
-
- with get_db() as conn:
- # Get velocity log entries
- query = '''
- SELECT * FROM gsm_velocity_log
- WHERE timestamp >= datetime('now', '-' || ? || ' minutes')
- '''
- params = [minutes]
-
- if device_id:
- query += ' AND device_id = ?'
- params.append(device_id)
-
- query += ' ORDER BY timestamp DESC LIMIT 100'
-
- results = conn.execute(query, params).fetchall()
- velocity_data = [dict(row) for row in results]
-
- return jsonify(velocity_data)
- except Exception as e:
- logger.error(f"Error fetching velocity data: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/velocity/calculate', methods=['POST'])
-def calculate_velocity():
- """Calculate velocity for a device based on TA transitions (G-08)."""
- data = request.get_json() or {}
- device_id = data.get('device_id')
-
- if not device_id:
- return jsonify({'error': 'device_id required'}), 400
-
- try:
- with get_db() as conn:
- # Get last two TA readings for this device
- results = conn.execute('''
- SELECT ta_value, cid, timestamp
- FROM gsm_signals
- WHERE (imsi = ? OR tmsi = ?)
- ORDER BY timestamp DESC
- LIMIT 2
- ''', (device_id, device_id)).fetchall()
-
- if len(results) < 2:
- return jsonify({'velocity': 0, 'message': 'Insufficient data'})
-
- curr = dict(results[0])
- prev = dict(results[1])
-
- # Calculate distance change (TA * 554 meters)
- curr_distance = curr['ta_value'] * config.GSM_TA_METERS_PER_UNIT
- prev_distance = prev['ta_value'] * config.GSM_TA_METERS_PER_UNIT
- distance_change = abs(curr_distance - prev_distance)
-
- # Calculate time difference
- curr_time = datetime.fromisoformat(curr['timestamp'])
- prev_time = datetime.fromisoformat(prev['timestamp'])
- time_diff_seconds = (curr_time - prev_time).total_seconds()
-
- # Calculate velocity (m/s)
- if time_diff_seconds > 0:
- velocity = distance_change / time_diff_seconds
- else:
- velocity = 0
-
- # Store in velocity log
- conn.execute('''
- INSERT INTO gsm_velocity_log
- (device_id, prev_ta, curr_ta, prev_cid, curr_cid, estimated_velocity)
- VALUES (?, ?, ?, ?, ?, ?)
- ''', (device_id, prev['ta_value'], curr['ta_value'],
- prev['cid'], curr['cid'], velocity))
- conn.commit()
-
- return jsonify({
- 'device_id': device_id,
- 'velocity_mps': round(velocity, 2),
- 'velocity_kmh': round(velocity * 3.6, 2),
- 'distance_change_m': round(distance_change, 2),
- 'time_diff_s': round(time_diff_seconds, 2)
- })
-
- except Exception as e:
- logger.error(f"Error calculating velocity: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/crowd_density', methods=['GET'])
-def get_crowd_density():
- """Get crowd density data by sector (G-09)."""
- try:
- hours = int(request.args.get('hours', 1)) # Last 1 hour by default
- cid = request.args.get('cid') # Optional: specific cell
-
- with get_db() as conn:
- # Count unique TMSI per cell in time window
- query = '''
- SELECT
- cid,
- lac,
- COUNT(DISTINCT tmsi) as unique_devices,
- COUNT(*) as total_pings,
- MIN(timestamp) as first_seen,
- MAX(timestamp) as last_seen
- FROM gsm_tmsi_log
- WHERE timestamp >= datetime('now', '-' || ? || ' hours')
- '''
- params = [hours]
-
- if cid:
- query += ' AND cid = ?'
- params.append(cid)
-
- query += ' GROUP BY cid, lac ORDER BY unique_devices DESC'
-
- results = conn.execute(query, params).fetchall()
- density_data = []
-
- for row in results:
- density_data.append({
- 'cid': row['cid'],
- 'lac': row['lac'],
- 'unique_devices': row['unique_devices'],
- 'total_pings': row['total_pings'],
- 'first_seen': row['first_seen'],
- 'last_seen': row['last_seen'],
- 'density_level': 'high' if row['unique_devices'] > 20 else
- 'medium' if row['unique_devices'] > 10 else 'low'
- })
-
- return jsonify(density_data)
-
- except Exception as e:
- logger.error(f"Error fetching crowd density: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/life_patterns', methods=['GET'])
-def get_life_patterns():
- """Get life pattern analysis for a device (G-10)."""
- try:
- device_id = request.args.get('device_id')
- if not device_id:
- # Return empty results gracefully when no device selected
- return jsonify({
- 'device_id': None,
- 'patterns': [],
- 'message': 'No device selected'
- }), 200
-
- with get_db() as conn:
- # Get historical signal data
- results = conn.execute('''
- SELECT
- strftime('%H', timestamp) as hour,
- strftime('%w', timestamp) as day_of_week,
- cid,
- lac,
- COUNT(*) as occurrences
- FROM gsm_signals
- WHERE (imsi = ? OR tmsi = ?)
- AND timestamp >= datetime('now', '-60 days')
- GROUP BY hour, day_of_week, cid, lac
- ORDER BY occurrences DESC
- ''', (device_id, device_id)).fetchall()
-
- patterns = []
- for row in results:
- patterns.append({
- 'hour': int(row['hour']),
- 'day_of_week': int(row['day_of_week']),
- 'cid': row['cid'],
- 'lac': row['lac'],
- 'occurrences': row['occurrences'],
- 'day_name': ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'][int(row['day_of_week'])]
- })
-
- # Identify regular patterns
- regular_locations = []
- for pattern in patterns[:5]: # Top 5 most frequent
- if pattern['occurrences'] >= 3: # Seen at least 3 times
- regular_locations.append({
- 'cid': pattern['cid'],
- 'typical_time': f"{pattern['day_name']} {pattern['hour']:02d}:00",
- 'frequency': pattern['occurrences']
- })
-
- return jsonify({
- 'device_id': device_id,
- 'patterns': patterns,
- 'regular_locations': regular_locations,
- 'total_observations': sum(p['occurrences'] for p in patterns)
- })
-
- except Exception as e:
- logger.error(f"Error analyzing life patterns: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/neighbor_audit', methods=['GET'])
-def neighbor_audit():
- """Audit neighbor cell lists for consistency (G-11)."""
- try:
- cid = request.args.get('cid')
- if not cid:
- # Return empty results gracefully when no tower selected
- return jsonify({
- 'cid': None,
- 'neighbors': [],
- 'inconsistencies': [],
- 'message': 'No tower selected'
- }), 200
-
- with get_db() as conn:
- # Get tower info with metadata (neighbor list stored in metadata JSON)
- result = conn.execute('''
- SELECT metadata FROM gsm_cells WHERE cid = ?
- ''', (cid,)).fetchone()
-
- if not result or not result['metadata']:
- return jsonify({
- 'cid': cid,
- 'status': 'no_data',
- 'message': 'No neighbor list data available'
- })
-
- # Parse metadata JSON
- metadata = json.loads(result['metadata'])
- neighbor_list = metadata.get('neighbors', [])
-
- # Audit consistency
- issues = []
- for neighbor_cid in neighbor_list:
- # Check if neighbor exists in database
- neighbor_exists = conn.execute('''
- SELECT id FROM gsm_cells WHERE cid = ?
- ''', (neighbor_cid,)).fetchone()
-
- if not neighbor_exists:
- issues.append({
- 'type': 'missing_neighbor',
- 'cid': neighbor_cid,
- 'message': f'Neighbor CID {neighbor_cid} not found in database'
- })
-
- return jsonify({
- 'cid': cid,
- 'neighbor_count': len(neighbor_list),
- 'neighbors': neighbor_list,
- 'issues': issues,
- 'status': 'suspicious' if issues else 'normal'
- })
-
- except Exception as e:
- logger.error(f"Error auditing neighbors: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-@gsm_spy_bp.route('/traffic_correlation', methods=['GET'])
-def traffic_correlation():
- """Correlate uplink/downlink traffic for pairing analysis (G-12)."""
- try:
- cid = request.args.get('cid')
- minutes = int(request.args.get('minutes', 5))
-
- with get_db() as conn:
- # Get recent signal activity for this cell
- results = conn.execute('''
- SELECT
- imsi,
- tmsi,
- ta_value,
- timestamp,
- metadata
- FROM gsm_signals
- WHERE cid = ?
- AND timestamp >= datetime('now', '-' || ? || ' minutes')
- ORDER BY timestamp DESC
- ''', (cid, minutes)).fetchall()
-
- correlations = []
- seen_devices = set()
-
- for row in results:
- device_id = row['imsi'] or row['tmsi']
- if device_id and device_id not in seen_devices:
- seen_devices.add(device_id)
-
- # Simple correlation: count bursts
- burst_count = conn.execute('''
- SELECT COUNT(*) as bursts
- FROM gsm_signals
- WHERE (imsi = ? OR tmsi = ?)
- AND cid = ?
- AND timestamp >= datetime('now', '-' || ? || ' minutes')
- ''', (device_id, device_id, cid, minutes)).fetchone()
-
- correlations.append({
- 'device_id': device_id,
- 'burst_count': burst_count['bursts'],
- 'last_seen': row['timestamp'],
- 'ta_value': row['ta_value'],
- 'activity_level': 'high' if burst_count['bursts'] > 10 else
- 'medium' if burst_count['bursts'] > 5 else 'low'
- })
-
- return jsonify({
- 'cid': cid,
- 'time_window_minutes': minutes,
- 'active_devices': len(correlations),
- 'correlations': correlations
- })
-
- except Exception as e:
- logger.error(f"Error correlating traffic: {e}")
- return jsonify({'error': str(e)}), 500
-
-
-# ============================================
-# Helper Functions
-# ============================================
-
-def parse_grgsm_scanner_output(line: str) -> dict[str, Any] | None:
- """Parse grgsm_scanner output line.
-
- Actual output format (comma-separated key-value pairs):
- ARFCN: 975, Freq: 925.2M, CID: 13522, LAC: 38722, MCC: 262, MNC: 1, Pwr: -58
- """
- try:
- line = line.strip()
-
- # Skip non-data lines (progress, config, neighbour info, blank)
- if not line or 'ARFCN:' not in line:
- return None
-
- # Parse "ARFCN: 975, Freq: 925.2M, CID: 13522, LAC: 38722, MCC: 262, MNC: 1, Pwr: -58"
- fields = {}
- for part in line.split(','):
- part = part.strip()
- if ':' in part:
- key, _, value = part.partition(':')
- fields[key.strip()] = value.strip()
-
- if 'ARFCN' in fields and 'CID' in fields:
- cid = int(fields.get('CID', 0))
- mcc = int(fields.get('MCC', 0))
- mnc = int(fields.get('MNC', 0))
-
- # Only skip entries with no network identity at all (MCC=0 AND MNC=0)
- # CID=0 with valid MCC/MNC is a partially decoded cell - still useful
- if mcc == 0 and mnc == 0:
- logger.debug(f"Skipping unidentified ARFCN (MCC=0, MNC=0): {line}")
- return None
-
- # Freq may have 'M' suffix (e.g. "925.2M")
- freq_str = fields.get('Freq', '0').rstrip('Mm')
-
- data = {
- 'type': 'tower',
- 'arfcn': int(fields['ARFCN']),
- 'frequency': float(freq_str),
- 'cid': cid,
- 'lac': int(fields.get('LAC', 0)),
- 'mcc': mcc,
- 'mnc': mnc,
- 'signal_strength': float(fields.get('Pwr', -999)),
- 'timestamp': datetime.now().isoformat()
- }
- return data
-
- except Exception as e:
- logger.debug(f"Failed to parse scanner line: {line} - {e}")
-
- return None
-
-
-def parse_tshark_output(line: str, field_order: list[str] | None = None) -> dict[str, Any] | None:
- """Parse tshark filtered GSM output.
-
- Args:
- line: Tab-separated tshark output line
- field_order: List of logical field names in column order.
- If None, assumes legacy order: ['ta', 'tmsi', 'imsi', 'lac', 'cid']
- """
- if field_order is None:
- field_order = ['ta', 'tmsi', 'imsi', 'lac', 'cid']
-
- try:
- parts = line.rstrip('\n\r').split('\t')
-
- if len(parts) < len(field_order):
- return None
-
- # Map logical names to column values
- field_map = {}
- for i, logical_name in enumerate(field_order):
- field_map[logical_name] = parts[i] if parts[i] else None
-
- # Convert types (use base 0 to auto-detect hex 0x prefix from tshark)
- ta_raw = field_map.get('ta')
- data = {
- 'type': 'device',
- 'ta_value': int(ta_raw, 0) if ta_raw else None,
- 'tmsi': field_map.get('tmsi'),
- 'imsi': field_map.get('imsi'),
- 'lac': int(field_map['lac'], 0) if field_map.get('lac') else None,
- 'cid': int(field_map['cid'], 0) if field_map.get('cid') else None,
- 'timestamp': datetime.now().isoformat()
- }
-
- # Need at least one identifier
- if not data['tmsi'] and not data['imsi']:
- return None
-
- # Calculate distance from TA
- if data['ta_value'] is not None:
- data['distance_meters'] = data['ta_value'] * config.GSM_TA_METERS_PER_UNIT
-
- return data
-
- except Exception as e:
- logger.debug(f"Failed to parse tshark line: {line} - {e}")
-
- return None
-
-
-def auto_start_monitor(tower_data):
- """Automatically start monitoring the strongest tower found."""
- try:
- arfcn = tower_data.get('arfcn')
- if not arfcn:
- logger.warning("Cannot auto-monitor: no ARFCN in tower data")
- return
-
- logger.info(f"Auto-monitoring strongest tower: ARFCN {arfcn}, Signal {tower_data.get('signal_strength')} dBm")
-
- # Brief delay to ensure scanner has stabilized
- time.sleep(2)
-
- with app_module.gsm_spy_lock:
- if app_module.gsm_spy_monitor_process:
- logger.info("Monitor already running, skipping auto-start")
- return
-
- device_index = app_module.gsm_spy_active_device or 0
-
- # Start and register monitoring (shared logic)
- _start_and_register_monitor(arfcn, device_index)
-
- # Send SSE notification
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'auto_monitor_started',
- 'arfcn': arfcn,
- 'tower': tower_data
- })
- except queue.Full:
- pass
-
- logger.info(f"Auto-monitoring started for ARFCN {arfcn}")
-
- except Exception as e:
- logger.error(f"Error in auto-monitoring: {e}")
-
-
-def scanner_thread(cmd, device_index):
- """Thread to continuously run grgsm_scanner in a loop with non-blocking I/O.
-
- grgsm_scanner scans once and exits, so we loop it to provide
- continuous updates to the dashboard.
- """
- global gsm_towers_found
-
- strongest_tower = None
- auto_monitor_triggered = False # Moved outside loop - persists across scans
- scan_count = 0
- crash_count = 0
- process = None
-
- try:
- while app_module.gsm_spy_scanner_running: # Flag check
- scan_count += 1
- logger.info(f"Starting GSM scan #{scan_count}")
-
- try:
- # Start scanner process
- # Set OSMO_FSM_DUP_CHECK_DISABLED to prevent libosmocore
- # abort on duplicate FSM registration (common with apt gr-gsm)
- env = dict(os.environ,
- OSMO_FSM_DUP_CHECK_DISABLED='1',
- PYTHONUNBUFFERED='1',
- QT_QPA_PLATFORM='offscreen')
- process = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True,
- bufsize=1,
- env=env
- )
- register_process(process)
- logger.info(f"Started grgsm_scanner (PID: {process.pid})")
-
- # Standard pattern: reader threads with queue
- output_queue_local = queue.Queue()
-
- def read_stdout():
- try:
- for line in iter(process.stdout.readline, ''):
- if line:
- output_queue_local.put(('stdout', line))
- except Exception as e:
- logger.error(f"stdout read error: {e}")
- finally:
- output_queue_local.put(('eof', None))
-
- def read_stderr():
- try:
- for line in iter(process.stderr.readline, ''):
- if line:
- logger.debug(f"grgsm_scanner stderr: {line.strip()}")
- # grgsm_scanner outputs scan results to stderr
- output_queue_local.put(('stderr', line))
- except Exception as e:
- logger.error(f"stderr read error: {e}")
-
- stdout_thread = threading.Thread(target=read_stdout, daemon=True)
- stderr_thread = threading.Thread(target=read_stderr, daemon=True)
- stdout_thread.start()
- stderr_thread.start()
-
- # Process output with timeout
- scan_start = time.time()
- last_output = scan_start
- scan_timeout = 300 # 5 minute maximum per scan (4 bands takes ~2-3 min)
-
- while app_module.gsm_spy_scanner_running:
- # Check if process died
- if process.poll() is not None:
- logger.info(f"Scanner exited (code: {process.returncode})")
- break
-
- # Get output from queue with timeout
- try:
- msg_type, line = output_queue_local.get(timeout=1.0)
-
- if msg_type == 'eof':
- break # EOF
-
- last_output = time.time()
- stripped = line.strip()
- logger.info(f"Scanner [{msg_type}]: {stripped}")
-
- # Forward progress and status info to frontend
- progress_match = re.match(r'Scanning:\s+([\d.]+)%\s+done', stripped)
- if progress_match:
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'progress',
- 'percent': float(progress_match.group(1)),
- 'scan': scan_count
- })
- except queue.Full:
- pass
- continue
- if stripped.startswith('Try scan CCCH'):
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'status',
- 'message': stripped,
- 'scan': scan_count
- })
- except queue.Full:
- pass
-
- parsed = parse_grgsm_scanner_output(line)
- if parsed:
- # Enrich with coordinates
- from utils.gsm_geocoding import enrich_tower_data
- enriched = enrich_tower_data(parsed)
-
- # Store in DataStore
- key = f"{enriched['mcc']}_{enriched['mnc']}_{enriched['lac']}_{enriched['cid']}"
- app_module.gsm_spy_towers[key] = enriched
-
- # Track strongest tower
- signal = enriched.get('signal_strength', -999)
- if strongest_tower is None or signal > strongest_tower.get('signal_strength', -999):
- strongest_tower = enriched
-
- # Queue for SSE
- try:
- app_module.gsm_spy_queue.put_nowait(enriched)
- except queue.Full:
- logger.warning("Queue full, dropping tower update")
-
- # Thread-safe counter update
- with app_module.gsm_spy_lock:
- gsm_towers_found += 1
- except queue.Empty:
- # No output, check timeout
- if time.time() - last_output > scan_timeout:
- logger.warning(f"Scan timeout after {scan_timeout}s")
- break
-
- # Drain remaining queue items after process exits
- while not output_queue_local.empty():
- try:
- msg_type, line = output_queue_local.get_nowait()
- if line:
- logger.info(f"Scanner [{msg_type}] (drain): {line.strip()}")
- except queue.Empty:
- break
-
- # Clean up process with timeout
- if process.poll() is None:
- logger.info("Terminating scanner process")
- safe_terminate(process, timeout=5)
- else:
- process.wait() # Reap zombie
-
- exit_code = process.returncode
- scan_duration = time.time() - scan_start
- logger.info(f"Scan #{scan_count} complete (exit code: {exit_code}, duration: {scan_duration:.1f}s)")
-
- # Notify frontend scan completed
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'scan_complete',
- 'scan': scan_count,
- 'duration': round(scan_duration, 1),
- 'towers_found': gsm_towers_found
- })
- except queue.Full:
- pass
-
- # Detect crash pattern: process exits too quickly with no data
- if scan_duration < 5 and exit_code != 0:
- crash_count += 1
- logger.error(
- f"grgsm_scanner crashed on startup (exit code: {exit_code}). "
- f"Crash count: {crash_count}. Check gr-gsm/libosmocore compatibility."
- )
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'error',
- 'message': f'grgsm_scanner crashed (exit code: {exit_code}). '
- 'This may be a gr-gsm/libosmocore compatibility issue. '
- 'Try rebuilding gr-gsm from source.',
- 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%S')
- })
- except queue.Full:
- pass
- if crash_count >= 3:
- logger.error("grgsm_scanner crashed 3 times, stopping scanner")
- break
-
- except FileNotFoundError:
- logger.error(
- "grgsm_scanner not found. Please install gr-gsm: "
- "https://github.com/bkerler/gr-gsm"
- )
- # Send error to SSE stream so the UI knows
- try:
- app_module.gsm_spy_queue.put({
- 'type': 'error',
- 'message': 'grgsm_scanner not found. Please install gr-gsm.',
- 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%S')
- })
- except Exception:
- pass
- break # Don't retry - binary won't appear
-
- except Exception as e:
- logger.error(f"Scanner scan error: {e}", exc_info=True)
- if process and process.poll() is None:
- safe_terminate(process)
-
- # Check if should continue
- if not app_module.gsm_spy_scanner_running:
- break
-
- # After first scan completes: auto-switch to monitoring if towers found
- # Scanner process has exited so SDR is free for grgsm_livemon
- if not auto_monitor_triggered and strongest_tower and scan_count >= 1:
- auto_monitor_triggered = True
- arfcn = strongest_tower.get('arfcn')
- signal = strongest_tower.get('signal_strength', -999)
- logger.info(
- f"Scan complete with towers found. Auto-switching to monitor mode "
- f"on ARFCN {arfcn} (signal: {signal} dBm)"
- )
-
- # Stop scanner loop - SDR needed for monitoring
- app_module.gsm_spy_scanner_running = False
-
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'status',
- 'message': f'Switching to monitor mode on ARFCN {arfcn}...'
- })
- except queue.Full:
- pass
-
- # Start monitoring (SDR is free since scanner process exited)
- try:
- with app_module.gsm_spy_lock:
- if not app_module.gsm_spy_monitor_process:
- _start_and_register_monitor(arfcn, device_index)
- logger.info(f"Auto-monitoring started for ARFCN {arfcn}")
-
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'auto_monitor_started',
- 'arfcn': arfcn,
- 'tower': strongest_tower
- })
- except queue.Full:
- pass
- except Exception as e:
- logger.error(f"Error starting auto-monitor: {e}", exc_info=True)
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'error',
- 'message': f'Monitor failed: {e}'
- })
- except queue.Full:
- pass
- # Resume scanning if monitor failed
- app_module.gsm_spy_scanner_running = True
-
- break # Exit scanner loop (monitoring takes over)
-
- # Wait between scans with responsive flag checking
- logger.info("Waiting 5 seconds before next scan")
- for i in range(5):
- if not app_module.gsm_spy_scanner_running:
- break
- time.sleep(1)
-
- except Exception as e:
- logger.error(f"Scanner thread fatal error: {e}", exc_info=True)
-
- finally:
- # Always cleanup
- if process and process.poll() is None:
- safe_terminate(process, timeout=5)
-
- logger.info("Scanner thread terminated")
-
- # Reset global state - but don't release SDR if monitoring took over
- with app_module.gsm_spy_lock:
- app_module.gsm_spy_scanner_running = False
- if app_module.gsm_spy_monitor_process is None:
- # No monitor running - release SDR device
- if app_module.gsm_spy_active_device is not None:
- from app import release_sdr_device
- release_sdr_device(app_module.gsm_spy_active_device)
- app_module.gsm_spy_active_device = None
- else:
- logger.info("Monitor is running, keeping SDR device allocated")
-
-
-def monitor_thread(process, field_order=None):
- """Thread to read tshark output using standard iter pattern."""
- global gsm_devices_tracked
-
- # Standard pattern: reader thread with queue
- output_queue_local = queue.Queue()
-
- def read_stdout():
- try:
- for line in iter(process.stdout.readline, ''):
- if line:
- output_queue_local.put(('stdout', line))
- except Exception as e:
- logger.error(f"tshark read error: {e}")
- finally:
- output_queue_local.put(('eof', None))
-
- def read_stderr():
- try:
- for line in iter(process.stderr.readline, ''):
- if line:
- logger.debug(f"tshark stderr: {line.strip()}")
- except Exception:
- pass
-
- stdout_thread = threading.Thread(target=read_stdout, daemon=True)
- stdout_thread.start()
- stderr_thread = threading.Thread(target=read_stderr, daemon=True)
- stderr_thread.start()
-
- monitor_start_time = time.time()
- packets_captured = 0
- lines_received = 0
- last_heartbeat = time.time()
-
- try:
- while app_module.gsm_spy_monitor_process:
- # Check if process died
- if process.poll() is not None:
- logger.info(f"Monitor process exited (code: {process.returncode})")
- break
-
- # Send periodic heartbeat so frontend knows monitor is alive
- now = time.time()
- if now - last_heartbeat >= 5:
- last_heartbeat = now
- elapsed = int(now - monitor_start_time)
- try:
- app_module.gsm_spy_queue.put_nowait({
- 'type': 'monitor_heartbeat',
- 'elapsed': elapsed,
- 'packets': packets_captured,
- 'devices': len(app_module.gsm_spy_devices)
- })
- except queue.Full:
- pass
- # Periodic diagnostic: how many raw lines vs parsed
- if lines_received > 0 or elapsed % 30 == 0:
- logger.info(
- f"Monitor stats: {lines_received} tshark lines received, "
- f"{packets_captured} parsed, fields={field_order}"
- )
-
- # Get output from queue with timeout
- try:
- msg_type, line = output_queue_local.get(timeout=1.0)
- except queue.Empty:
- continue # Timeout, check flag again
-
- if msg_type == 'eof':
- break # EOF
-
- lines_received += 1
- # Log first 5 raw lines and then every 100th for diagnostics
- if lines_received <= 5 or lines_received % 100 == 0:
- logger.debug(f"tshark raw line #{lines_received}: {line.rstrip()!r}")
-
- parsed = parse_tshark_output(line, field_order)
- if parsed:
- packets_captured += 1
-
- # Store in DataStore
- key = parsed.get('tmsi') or parsed.get('imsi') or str(time.time())
- app_module.gsm_spy_devices[key] = parsed
-
- # Queue for SSE stream
- try:
- app_module.gsm_spy_queue.put_nowait(parsed)
- except queue.Full:
- pass
-
- # Store in database for historical analysis
- try:
- with get_db() as conn:
- # gsm_signals table
- conn.execute('''
- INSERT INTO gsm_signals
- (imsi, tmsi, lac, cid, ta_value, arfcn)
- VALUES (?, ?, ?, ?, ?, ?)
- ''', (
- parsed.get('imsi'),
- parsed.get('tmsi'),
- parsed.get('lac'),
- parsed.get('cid'),
- parsed.get('ta_value'),
- app_module.gsm_spy_selected_arfcn
- ))
-
- # gsm_tmsi_log table for crowd density
- if parsed.get('tmsi'):
- conn.execute('''
- INSERT INTO gsm_tmsi_log
- (tmsi, lac, cid, ta_value)
- VALUES (?, ?, ?, ?)
- ''', (
- parsed.get('tmsi'),
- parsed.get('lac'),
- parsed.get('cid'),
- parsed.get('ta_value')
- ))
-
- # Velocity calculation (G-08)
- device_id = parsed.get('imsi') or parsed.get('tmsi')
- if device_id and parsed.get('ta_value') is not None:
- # Get previous TA reading
- prev_reading = conn.execute('''
- SELECT ta_value, cid, timestamp
- FROM gsm_signals
- WHERE (imsi = ? OR tmsi = ?)
- ORDER BY timestamp DESC
- LIMIT 1 OFFSET 1
- ''', (device_id, device_id)).fetchone()
-
- if prev_reading:
- # Calculate velocity
- curr_ta = parsed.get('ta_value')
- prev_ta = prev_reading['ta_value']
- curr_distance = curr_ta * config.GSM_TA_METERS_PER_UNIT
- prev_distance = prev_ta * config.GSM_TA_METERS_PER_UNIT
- distance_change = abs(curr_distance - prev_distance)
-
- # Time difference
- prev_time = datetime.fromisoformat(prev_reading['timestamp'])
- curr_time = datetime.now()
- time_diff_seconds = (curr_time - prev_time).total_seconds()
-
- if time_diff_seconds > 0:
- velocity = distance_change / time_diff_seconds
-
- # Store velocity
- conn.execute('''
- INSERT INTO gsm_velocity_log
- (device_id, prev_ta, curr_ta, prev_cid, curr_cid, estimated_velocity)
- VALUES (?, ?, ?, ?, ?, ?)
- ''', (
- device_id,
- prev_ta,
- curr_ta,
- prev_reading['cid'],
- parsed.get('cid'),
- velocity
- ))
-
- conn.commit()
- except Exception as e:
- logger.error(f"Error storing device data: {e}")
-
- # Thread-safe counter
- with app_module.gsm_spy_lock:
- gsm_devices_tracked += 1
-
- except Exception as e:
- logger.error(f"Monitor thread error: {e}", exc_info=True)
-
- finally:
- # Reap process with timeout
- try:
- if process.poll() is None:
- process.terminate()
- try:
- process.wait(timeout=5)
- except subprocess.TimeoutExpired:
- logger.warning("Monitor process didn't terminate, killing")
- process.kill()
- process.wait()
- else:
- process.wait()
- logger.info(f"Monitor process exited with code {process.returncode}")
- except Exception as e:
- logger.error(f"Error reaping monitor process: {e}")
-
- logger.info("Monitor thread terminated")
diff --git a/setup.sh b/setup.sh
index 3e0f210..f91e2b0 100755
--- a/setup.sh
+++ b/setup.sh
@@ -243,12 +243,6 @@ check_tools() {
check_required "hcitool" "Bluetooth scan utility" hcitool
check_required "hciconfig" "Bluetooth adapter config" hciconfig
- echo
- info "GSM Intelligence:"
- check_recommended "grgsm_scanner" "GSM tower scanner (gr-gsm)" grgsm_scanner
- check_recommended "grgsm_livemon" "GSM live monitor (gr-gsm)" grgsm_livemon
- check_recommended "tshark" "Packet analysis (Wireshark)" tshark
-
echo
info "SoapySDR:"
check_required "SoapySDRUtil" "SoapySDR CLI utility" SoapySDRUtil
@@ -713,47 +707,6 @@ install_macos_packages() {
progress "Installing gpsd"
brew_install gpsd
- # gr-gsm for GSM Intelligence
- progress "Installing gr-gsm"
- if ! cmd_exists grgsm_scanner; then
- brew_install gnuradio
- (brew_install gr-gsm) || {
- warn "gr-gsm not available in Homebrew, building from source..."
- (
- tmp_dir="$(mktemp -d)"
- trap 'rm -rf "$tmp_dir"' EXIT
-
- info "Cloning gr-gsm repository..."
- git clone --depth 1 https://github.com/bkerler/gr-gsm.git "$tmp_dir/gr-gsm" >/dev/null 2>&1 \
- || { warn "Failed to clone gr-gsm. GSM Spy feature will not work."; exit 1; }
-
- cd "$tmp_dir/gr-gsm"
- mkdir -p build && cd build
- info "Compiling gr-gsm (this may take several minutes)..."
- if cmake .. >/dev/null 2>&1 && make -j$(sysctl -n hw.ncpu) >/dev/null 2>&1; then
- if [[ -w /usr/local/lib ]]; then
- make install >/dev/null 2>&1
- else
- sudo make install >/dev/null 2>&1
- fi
- ok "gr-gsm installed successfully from source"
- else
- warn "Failed to build gr-gsm. GSM Spy feature will not work."
- fi
- )
- }
- else
- ok "gr-gsm already installed"
- fi
-
- # Wireshark (tshark) for GSM packet analysis
- progress "Installing tshark"
- if ! cmd_exists tshark; then
- brew_install wireshark
- else
- ok "tshark already installed"
- fi
-
progress "Installing Ubertooth tools (optional)"
if ! cmd_exists ubertooth-btle; then
echo
@@ -1164,82 +1117,6 @@ install_debian_packages() {
progress "Installing gpsd"
apt_install gpsd gpsd-clients || true
- # gr-gsm for GSM Intelligence
- progress "Installing GNU Radio and gr-gsm"
- if ! cmd_exists grgsm_scanner; then
- # Try to install gr-gsm directly from package repositories
- apt_install gnuradio gnuradio-dev gr-osmosdr gr-gsm || {
- warn "gr-gsm package not available in repositories. Attempting source build..."
-
- # Fallback: Build from source
- progress "Building gr-gsm from source"
- apt_install git cmake libboost-all-dev libcppunit-dev swig \
- doxygen liblog4cpp5-dev python3-scipy python3-numpy \
- libvolk-dev libuhd-dev libfftw3-dev || true
-
- info "Cloning gr-gsm repository..."
- if [ -d /tmp/gr-gsm ]; then
- rm -rf /tmp/gr-gsm
- fi
-
- git clone https://github.com/bkerler/gr-gsm.git /tmp/gr-gsm || {
- warn "Failed to clone gr-gsm repository. GSM Spy will not be available."
- return 0
- }
-
- cd /tmp/gr-gsm
- mkdir -p build && cd build
-
- # Try to find GNU Radio cmake files
- if [ -d /usr/lib/x86_64-linux-gnu/cmake/gnuradio ]; then
- export CMAKE_PREFIX_PATH="/usr/lib/x86_64-linux-gnu/cmake/gnuradio:$CMAKE_PREFIX_PATH"
- fi
-
- info "Running CMake configuration..."
- if cmake .. 2>/dev/null; then
- info "Compiling gr-gsm (this may take several minutes)..."
- if make -j$(nproc) 2>/dev/null; then
- $SUDO make install
- $SUDO ldconfig
- cd ~
- rm -rf /tmp/gr-gsm
- ok "gr-gsm built and installed successfully"
- else
- warn "gr-gsm compilation failed. GSM Spy feature will not work."
- cd ~
- rm -rf /tmp/gr-gsm
- fi
- else
- warn "gr-gsm CMake configuration failed. GNU Radio 3.8+ may not be available."
- cd ~
- rm -rf /tmp/gr-gsm
- fi
- }
-
- # Verify installation
- if cmd_exists grgsm_scanner; then
- ok "gr-gsm installed successfully"
- else
- warn "gr-gsm installation incomplete. GSM Spy feature will not work."
- fi
- else
- ok "gr-gsm already installed"
- fi
-
- # Wireshark (tshark) for GSM packet analysis
- progress "Installing tshark"
- if ! cmd_exists tshark; then
- # Pre-accept non-root capture prompt for non-interactive install
- echo 'wireshark-common wireshark-common/install-setuid boolean true' | $SUDO debconf-set-selections
- apt_install tshark || true
- # Allow non-root capture
- $SUDO dpkg-reconfigure wireshark-common 2>/dev/null || true
- $SUDO usermod -a -G wireshark $USER 2>/dev/null || true
- ok "tshark installed. You may need to re-login for wireshark group permissions."
- else
- ok "tshark already installed"
- fi
-
progress "Installing Python packages"
apt_install python3-venv python3-pip || true
# Install Python packages via apt (more reliable than pip on modern Debian/Ubuntu)
@@ -1327,7 +1204,7 @@ final_summary_and_hard_fail() {
warn "Missing RECOMMENDED tools (some features will not work):"
for t in "${missing_recommended[@]}"; do echo " - $t"; done
echo
- warn "Install these for full functionality (GSM Intelligence, etc.)"
+ warn "Install these for full functionality"
fi
}
@@ -1376,7 +1253,7 @@ main() {
install_python_deps
- # Download leaflet-heat plugin for GSM heatmap (offline mode)
+ # Download leaflet-heat plugin (offline mode)
if [ ! -f "static/vendor/leaflet-heat/leaflet-heat.js" ]; then
info "Downloading leaflet-heat plugin..."
mkdir -p static/vendor/leaflet-heat
diff --git a/static/js/core/settings-manager.js b/static/js/core/settings-manager.js
index 69ed89b..35649c2 100644
--- a/static/js/core/settings-manager.js
+++ b/static/js/core/settings-manager.js
@@ -946,32 +946,9 @@ function loadApiKeyStatus() {
if (!badge) return;
- fetch('/gsm_spy/settings/api_key')
- .then(r => r.json())
- .then(data => {
- if (data.configured) {
- badge.textContent = 'Configured';
- badge.className = 'asset-badge available';
- desc.textContent = 'Source: ' + (data.source === 'env' ? 'Environment variable' : 'Database');
- } else {
- badge.textContent = 'Not configured';
- badge.className = 'asset-badge missing';
- desc.textContent = 'No API key set';
- }
- if (usage) {
- usage.textContent = (data.usage_today || 0) + ' / ' + (data.api_limit || 1000);
- }
- if (bar) {
- const pct = Math.min(100, ((data.usage_today || 0) / (data.api_limit || 1000)) * 100);
- bar.style.width = pct + '%';
- bar.style.background = pct > 90 ? 'var(--accent-red)' : pct > 70 ? 'var(--accent-yellow)' : 'var(--accent-cyan)';
- }
- })
- .catch(() => {
- badge.textContent = 'Error';
- badge.className = 'asset-badge missing';
- desc.textContent = 'Could not load status';
- });
+ badge.textContent = 'Not available';
+ badge.className = 'asset-badge missing';
+ desc.textContent = 'GSM feature removed';
}
/**
@@ -994,30 +971,8 @@ function saveApiKey() {
result.style.color = 'var(--text-dim)';
result.textContent = 'Saving...';
- fetch('/gsm_spy/settings/api_key', {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ key: key })
- })
- .then(r => r.json())
- .then(data => {
- if (data.error) {
- result.style.color = 'var(--accent-red)';
- result.textContent = data.error;
- } else {
- result.style.color = 'var(--accent-green)';
- result.textContent = 'API key saved successfully.';
- input.value = '';
- loadApiKeyStatus();
- // Hide the banner if visible
- const banner = document.getElementById('apiKeyBanner');
- if (banner) banner.style.display = 'none';
- }
- })
- .catch(() => {
- result.style.color = 'var(--accent-red)';
- result.textContent = 'Error saving API key.';
- });
+ result.style.color = 'var(--accent-red)';
+ result.textContent = 'GSM feature has been removed.';
}
/**
diff --git a/templates/gsm_spy_dashboard.html b/templates/gsm_spy_dashboard.html
deleted file mode 100644
index 85b12a2..0000000
--- a/templates/gsm_spy_dashboard.html
+++ /dev/null
@@ -1,3300 +0,0 @@
-
-
-
-
-
- GSM SPY // INTERCEPT - See the Invisible
-
- {% if offline_settings.fonts_source == 'local' %}
-
- {% else %}
-
- {% endif %}
-
- {% if offline_settings.assets_source == 'local' %}
-
-
- {% else %}
-
-
- {% endif %}
-
- {% if offline_settings.assets_source == 'local' %}
-
- {% else %}
-
- {% endif %}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- {% set active_mode = 'gsm' %}
- {% include 'partials/nav.html' with context %}
-
-
-
-
- {% include 'partials/settings-modal.html' %}
-
-
-
-
-
- 0
- TOWERS
-
-
- 0
- DEVICES
-
-
- 0
- ROGUES
-
-
- 0
- SIGNALS
-
-
- -
- CROWD
-
-
-
-
--:--:-- UTC
-
- Heatmap: OFF
-
-
- Analytics Overview
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Track device movement by analyzing Timing Advance transitions and cell handovers.
- Estimates velocity and direction based on TA delta and cell sector patterns.
-
-
-
-
-
- km/h
-
Avg Velocity
-
-
-
-
-
-
-
-
- Aggregate TMSI pings per cell sector to estimate crowd density.
- Visualizes hotspots and congestion patterns across towers.
-
-
-
-
-
-
-
-
- Analyze 60-day historical data to identify recurring patterns in device behavior.
- Detects work locations, commute routes, and daily routines.
-
-
-
-
-
-
-
-
- Validate neighbor cell lists against expected network topology.
- Detects inconsistencies that may indicate rogue towers.
-
-
-
-
-
-
-
-
- Correlate uplink and downlink timing to identify communication patterns.
- Maps device-to-device interactions and network flows.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- MONITORING
- ARFCN ---
- 00:00
-
-
- 0 packets
-
- 0 devices
-
- Listening...
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
GSM SCANNER
-
-
- Americas
- Europe
- Asia
-
-
-
-
- START
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/templates/index.html b/templates/index.html
index 373005e..7d635ef 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -171,10 +171,6 @@
Vessels
-
-
- GSM SPY
-
APRS
diff --git a/templates/partials/nav.html b/templates/partials/nav.html
index cd8a915..42c942d 100644
--- a/templates/partials/nav.html
+++ b/templates/partials/nav.html
@@ -67,7 +67,6 @@
{{ mode_item('rtlamr', 'Meters', ' ') }}
{{ mode_item('adsb', 'Aircraft', ' ', '/adsb/dashboard') }}
{{ mode_item('ais', 'Vessels', ' ', '/ais/dashboard') }}
- {{ mode_item('gsm', 'GSM SPY', ' ', '/gsm_spy/dashboard') }}
{{ mode_item('aprs', 'APRS', ' ') }}
{{ mode_item('listening', 'Listening Post', ' ') }}
{{ mode_item('spystations', 'Spy Stations', ' ') }}
diff --git a/templates/partials/settings-modal.html b/templates/partials/settings-modal.html
index ca890ab..b9c951b 100644
--- a/templates/partials/settings-modal.html
+++ b/templates/partials/settings-modal.html
@@ -17,7 +17,6 @@
Tools
Alerts
Recording
- API Keys
About
@@ -360,70 +359,6 @@
-
-
-
-
OpenCellID API Key
-
- Required for GSM cell tower geolocation. Get a free key at
- opencellid.org/register
- (1,000 lookups/day).
-
-
-
-
- Status
- Checking...
-
-
Checking...
-
-
-
-
- API Key
- Paste your OpenCellID API token
-
-
-
-
-
-
- Save Key
-
-
-
-
-
-
-
-
Usage Today
-
-
- API Calls
- -- / --
-
-
-
-
-
-
- Note: The environment variable INTERCEPT_GSM_OPENCELLID_API_KEY takes priority over the saved key.
- Keys saved here persist across restarts.
-
-
-
diff --git a/tests/test_gsm_spy.py b/tests/test_gsm_spy.py
deleted file mode 100644
index 797f794..0000000
--- a/tests/test_gsm_spy.py
+++ /dev/null
@@ -1,360 +0,0 @@
-"""Unit tests for GSM Spy parsing and validation functions."""
-
-import pytest
-from routes.gsm_spy import (
- parse_grgsm_scanner_output,
- parse_tshark_output,
- arfcn_to_frequency,
- validate_band_names,
- REGIONAL_BANDS
-)
-
-
-class TestParseGrgsmScannerOutput:
- """Tests for parse_grgsm_scanner_output()."""
-
- def test_valid_output_line(self):
- """Test parsing a valid grgsm_scanner output line."""
- line = "ARFCN: 23, Freq: 940.6M, CID: 31245, LAC: 1234, MCC: 214, MNC: 01, Pwr: -48"
- result = parse_grgsm_scanner_output(line)
-
- assert result is not None
- assert result['type'] == 'tower'
- assert result['arfcn'] == 23
- assert result['frequency'] == 940.6
- assert result['cid'] == 31245
- assert result['lac'] == 1234
- assert result['mcc'] == 214
- assert result['mnc'] == 1
- assert result['signal_strength'] == -48.0
- assert 'timestamp' in result
-
- def test_freq_without_suffix(self):
- """Test parsing frequency without M suffix."""
- line = "ARFCN: 975, Freq: 925.2, CID: 13522, LAC: 38722, MCC: 262, MNC: 1, Pwr: -58"
- result = parse_grgsm_scanner_output(line)
- assert result is not None
- assert result['frequency'] == 925.2
-
- def test_config_line(self):
- """Test that configuration lines are skipped."""
- line = " Configuration: 1 CCCH, not combined"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_neighbour_line(self):
- """Test that neighbour cell lines are skipped."""
- line = " Neighbour Cells: 57, 61, 70, 71, 72, 86"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_cell_arfcn_line(self):
- """Test that cell ARFCN lines are skipped."""
- line = " Cell ARFCNs: 63, 76"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_progress_line(self):
- """Test that progress/status lines are skipped."""
- line = "Scanning GSM900 band..."
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_empty_line(self):
- """Test handling of empty lines."""
- result = parse_grgsm_scanner_output("")
- assert result is None
-
- def test_invalid_data(self):
- """Test handling of non-numeric values."""
- line = "ARFCN: abc, Freq: xyz, CID: bad, LAC: data, MCC: bad, MNC: bad, Pwr: bad"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_no_identity_filtered(self):
- """Test that MCC=0/MNC=0 entries (no network identity) are filtered out."""
- line = "ARFCN: 115, Freq: 925.0M, CID: 0, LAC: 0, MCC: 0, MNC: 0, Pwr: -100"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_mcc_zero_mnc_zero_filtered(self):
- """Test that MCC=0/MNC=0 even with valid CID is filtered out."""
- line = "ARFCN: 113, Freq: 924.6M, CID: 1234, LAC: 5678, MCC: 0, MNC: 0, Pwr: -90"
- result = parse_grgsm_scanner_output(line)
- assert result is None
-
- def test_cid_zero_valid_mcc_passes(self):
- """Test that CID=0 with valid MCC/MNC passes (partially decoded cell)."""
- line = "ARFCN: 115, Freq: 958.0M, CID: 0, LAC: 21864, MCC: 234, MNC: 10, Pwr: -51"
- result = parse_grgsm_scanner_output(line)
- assert result is not None
- assert result['cid'] == 0
- assert result['mcc'] == 234
- assert result['signal_strength'] == -51.0
-
- def test_valid_cid_nonzero(self):
- """Test that valid non-zero CID/MCC entries pass through."""
- line = "ARFCN: 115, Freq: 925.0M, CID: 19088, LAC: 21864, MCC: 234, MNC: 10, Pwr: -58"
- result = parse_grgsm_scanner_output(line)
- assert result is not None
- assert result['cid'] == 19088
- assert result['signal_strength'] == -58.0
-
-
-class TestParseTsharkOutput:
- """Tests for parse_tshark_output()."""
-
- def test_valid_full_output(self):
- """Test parsing tshark output with all fields."""
- line = "5\t0xABCD1234\t123456789012345\t1234\t31245"
- result = parse_tshark_output(line)
-
- assert result is not None
- assert result['type'] == 'device'
- assert result['ta_value'] == 5
- assert result['tmsi'] == '0xABCD1234'
- assert result['imsi'] == '123456789012345'
- assert result['lac'] == 1234
- assert result['cid'] == 31245
- assert result['distance_meters'] == 5 * 554 # TA * 554 meters
- assert 'timestamp' in result
-
- def test_missing_optional_fields(self):
- """Test parsing with missing optional fields (empty tabs).
-
- A packet with TA but no TMSI/IMSI is discarded since there's
- no device identifier to track.
- """
- line = "3\t\t\t1234\t31245"
- result = parse_tshark_output(line)
- assert result is None
-
- def test_missing_optional_fields_with_tmsi(self):
- """Test parsing with TMSI but missing TA, IMSI, CID."""
- line = "\t0xABCD\t\t1234\t"
- result = parse_tshark_output(line)
-
- assert result is not None
- assert result['ta_value'] is None
- assert result['tmsi'] == '0xABCD'
- assert result['imsi'] is None
- assert result['lac'] == 1234
- assert result['cid'] is None
-
- def test_no_ta_value(self):
- """Test parsing without TA value (empty first field)."""
- line = "\t0xABCD1234\t123456789012345\t1234\t31245"
- result = parse_tshark_output(line)
-
- assert result is not None
- assert result['ta_value'] is None
- assert result['tmsi'] == '0xABCD1234'
- assert result['imsi'] == '123456789012345'
- assert result['lac'] == 1234
- assert result['cid'] == 31245
-
- def test_invalid_line(self):
- """Test handling of invalid tshark output."""
- line = "invalid data"
- result = parse_tshark_output(line)
- assert result is None
-
- def test_empty_line(self):
- """Test handling of empty lines."""
- result = parse_tshark_output("")
- assert result is None
-
- def test_partial_fields(self):
- """Test with fewer than 5 fields."""
- line = "5\t0xABCD1234" # Only 2 fields
- result = parse_tshark_output(line)
- assert result is None
-
-
-class TestArfcnToFrequency:
- """Tests for arfcn_to_frequency()."""
-
- def test_gsm850_arfcn(self):
- """Test ARFCN in GSM850 band."""
- # GSM850: ARFCN 128-251, 869-894 MHz
- arfcn = 128
- freq = arfcn_to_frequency(arfcn)
- assert freq == 869000000 # 869 MHz
-
- arfcn = 251
- freq = arfcn_to_frequency(arfcn)
- assert freq == 893600000 # 893.6 MHz
-
- def test_egsm900_arfcn(self):
- """Test ARFCN in EGSM900 band."""
- # EGSM900: ARFCN 0-124, DL = 935 + 0.2*ARFCN MHz
- arfcn = 0
- freq = arfcn_to_frequency(arfcn)
- assert freq == 935000000 # 935.0 MHz
-
- arfcn = 22
- freq = arfcn_to_frequency(arfcn)
- assert freq == 939400000 # 939.4 MHz
-
- arfcn = 124
- freq = arfcn_to_frequency(arfcn)
- assert freq == 959800000 # 959.8 MHz
-
- def test_egsm900_ext_arfcn(self):
- """Test ARFCN in EGSM900 extension band."""
- # EGSM900_EXT: ARFCN 975-1023, DL = 925.2 + 0.2*(ARFCN-975) MHz
- arfcn = 975
- freq = arfcn_to_frequency(arfcn)
- assert freq == 925200000 # 925.2 MHz
-
- arfcn = 1023
- freq = arfcn_to_frequency(arfcn)
- assert freq == 934800000 # 934.8 MHz
-
- def test_dcs1800_arfcn(self):
- """Test ARFCN in DCS1800 band."""
- # DCS1800: ARFCN 512-885, 1805-1880 MHz
- # Note: ARFCN 512 also exists in PCS1900 and will match that first
- # Use ARFCN 811+ which is only in DCS1800
- arfcn = 811 # Beyond PCS1900 range (512-810)
- freq = arfcn_to_frequency(arfcn)
- # 811 is ARFCN offset from 512, so freq = 1805MHz + (811-512)*200kHz
- expected = 1805000000 + (811 - 512) * 200000
- assert freq == expected
-
- arfcn = 885
- freq = arfcn_to_frequency(arfcn)
- assert freq == 1879600000 # 1879.6 MHz
-
- def test_pcs1900_arfcn(self):
- """Test ARFCN in PCS1900 band."""
- # PCS1900: ARFCN 512-810, 1930-1990 MHz
- # Note: overlaps with DCS1800 ARFCN range, but different frequencies
- arfcn = 512
- freq = arfcn_to_frequency(arfcn)
- # Will match first band (DCS1800 in Europe config)
- assert freq > 0
-
- def test_invalid_arfcn(self):
- """Test ARFCN outside known ranges."""
- with pytest.raises(ValueError, match="not found in any known GSM band"):
- arfcn_to_frequency(9999)
-
- with pytest.raises(ValueError):
- arfcn_to_frequency(-1)
-
- def test_arfcn_200khz_spacing(self):
- """Test that ARFCNs are 200kHz apart."""
- arfcn1 = 128
- arfcn2 = 129
- freq1 = arfcn_to_frequency(arfcn1)
- freq2 = arfcn_to_frequency(arfcn2)
- assert freq2 - freq1 == 200000 # 200 kHz
-
-
-class TestValidateBandNames:
- """Tests for validate_band_names()."""
-
- def test_valid_americas_bands(self):
- """Test valid band names for Americas region."""
- bands = ['GSM850', 'PCS1900']
- result, error = validate_band_names(bands, 'Americas')
- assert result == bands
- assert error is None
-
- def test_valid_europe_bands(self):
- """Test valid band names for Europe region."""
- # Note: Europe uses EGSM900, not GSM900
- bands = ['EGSM900', 'DCS1800', 'GSM850', 'GSM800']
- result, error = validate_band_names(bands, 'Europe')
- assert result == bands
- assert error is None
-
- def test_valid_asia_bands(self):
- """Test valid band names for Asia region."""
- # Note: Asia uses EGSM900, not GSM900
- bands = ['EGSM900', 'DCS1800']
- result, error = validate_band_names(bands, 'Asia')
- assert result == bands
- assert error is None
-
- def test_invalid_band_for_region(self):
- """Test invalid band name for a region."""
- bands = ['GSM900', 'INVALID_BAND']
- result, error = validate_band_names(bands, 'Americas')
- assert result == []
- assert error is not None
- assert 'Invalid bands' in error
- assert 'INVALID_BAND' in error
-
- def test_invalid_region(self):
- """Test invalid region name."""
- bands = ['GSM900']
- result, error = validate_band_names(bands, 'InvalidRegion')
- assert result == []
- assert error is not None
- assert 'Invalid region' in error
-
- def test_empty_bands_list(self):
- """Test with empty bands list."""
- result, error = validate_band_names([], 'Americas')
- assert result == []
- assert error is None
-
- def test_single_valid_band(self):
- """Test with single valid band."""
- bands = ['GSM850']
- result, error = validate_band_names(bands, 'Americas')
- assert result == ['GSM850']
- assert error is None
-
- def test_case_sensitive_band_names(self):
- """Test that band names are case-sensitive."""
- bands = ['gsm850'] # lowercase
- result, error = validate_band_names(bands, 'Americas')
- assert result == []
- assert error is not None
-
- def test_multiple_invalid_bands(self):
- """Test with multiple invalid bands."""
- bands = ['INVALID1', 'GSM850', 'INVALID2']
- result, error = validate_band_names(bands, 'Americas')
- assert result == []
- assert error is not None
- assert 'INVALID1' in error
- assert 'INVALID2' in error
-
-
-class TestRegionalBandsConfig:
- """Tests for REGIONAL_BANDS configuration."""
-
- def test_all_regions_defined(self):
- """Test that all expected regions are defined."""
- assert 'Americas' in REGIONAL_BANDS
- assert 'Europe' in REGIONAL_BANDS
- assert 'Asia' in REGIONAL_BANDS
-
- def test_all_bands_have_required_fields(self):
- """Test that all bands have required configuration fields."""
- for region, bands in REGIONAL_BANDS.items():
- for band_name, band_config in bands.items():
- assert 'start' in band_config
- assert 'end' in band_config
- assert 'arfcn_start' in band_config
- assert 'arfcn_end' in band_config
-
- def test_frequency_ranges_valid(self):
- """Test that frequency ranges are positive and start < end."""
- for region, bands in REGIONAL_BANDS.items():
- for band_name, band_config in bands.items():
- assert band_config['start'] > 0
- assert band_config['end'] > 0
- assert band_config['start'] < band_config['end']
-
- def test_arfcn_ranges_valid(self):
- """Test that ARFCN ranges are valid."""
- for region, bands in REGIONAL_BANDS.items():
- for band_name, band_config in bands.items():
- assert band_config['arfcn_start'] >= 0
- assert band_config['arfcn_end'] >= 0
- assert band_config['arfcn_start'] <= band_config['arfcn_end']
diff --git a/utils/constants.py b/utils/constants.py
index f51124e..2b5edff 100644
--- a/utils/constants.py
+++ b/utils/constants.py
@@ -275,13 +275,3 @@ MAX_DEAUTH_ALERTS_AGE_SECONDS = 300 # 5 minutes
# Deauth detector sniff timeout (seconds)
DEAUTH_SNIFF_TIMEOUT = 0.5
-
-# =============================================================================
-# GSM SPY (Cellular Intelligence)
-# =============================================================================
-
-# Maximum age for GSM tower/device data in DataStore (seconds)
-MAX_GSM_AGE_SECONDS = 300 # 5 minutes
-
-# Timing Advance conversion to meters
-GSM_TA_METERS_PER_UNIT = 554
diff --git a/utils/database.py b/utils/database.py
index 9e62d87..3211624 100644
--- a/utils/database.py
+++ b/utils/database.py
@@ -453,134 +453,6 @@ def init_db() -> None:
ON tscm_cases(status, created_at)
''')
- # =====================================================================
- # GSM (Global System for Mobile) Intelligence Tables
- # =====================================================================
-
- # gsm_cells - Known cell towers (OpenCellID cache)
- conn.execute('''
- CREATE TABLE IF NOT EXISTS gsm_cells (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- mcc INTEGER NOT NULL,
- mnc INTEGER NOT NULL,
- lac INTEGER NOT NULL,
- cid INTEGER NOT NULL,
- lat REAL,
- lon REAL,
- azimuth INTEGER,
- range_meters INTEGER,
- samples INTEGER,
- radio TEXT,
- operator TEXT,
- first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- last_verified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- metadata TEXT,
- UNIQUE(mcc, mnc, lac, cid)
- )
- ''')
-
- # gsm_rogues - Detected rogue towers / IMSI catchers
- conn.execute('''
- CREATE TABLE IF NOT EXISTS gsm_rogues (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- arfcn INTEGER NOT NULL,
- mcc INTEGER,
- mnc INTEGER,
- lac INTEGER,
- cid INTEGER,
- signal_strength REAL,
- reason TEXT NOT NULL,
- threat_level TEXT DEFAULT 'medium',
- detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- location_lat REAL,
- location_lon REAL,
- acknowledged BOOLEAN DEFAULT 0,
- notes TEXT,
- metadata TEXT
- )
- ''')
-
- # gsm_signals - 60-day archive of signal observations
- conn.execute('''
- CREATE TABLE IF NOT EXISTS gsm_signals (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- imsi TEXT,
- tmsi TEXT,
- mcc INTEGER,
- mnc INTEGER,
- lac INTEGER,
- cid INTEGER,
- ta_value INTEGER,
- signal_strength REAL,
- arfcn INTEGER,
- timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- metadata TEXT
- )
- ''')
-
- # gsm_tmsi_log - 24-hour raw pings for crowd density
- conn.execute('''
- CREATE TABLE IF NOT EXISTS gsm_tmsi_log (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- tmsi TEXT NOT NULL,
- lac INTEGER,
- cid INTEGER,
- ta_value INTEGER,
- timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- )
- ''')
-
- # gsm_velocity_log - 1-hour buffer for movement tracking
- conn.execute('''
- CREATE TABLE IF NOT EXISTS gsm_velocity_log (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- device_id TEXT NOT NULL,
- prev_ta INTEGER,
- curr_ta INTEGER,
- prev_cid INTEGER,
- curr_cid INTEGER,
- timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- estimated_velocity REAL,
- metadata TEXT
- )
- ''')
-
- # GSM indexes for performance
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_cells_location
- ON gsm_cells(lat, lon)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_cells_identity
- ON gsm_cells(mcc, mnc, lac, cid)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_rogues_severity
- ON gsm_rogues(threat_level, detected_at)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_signals_cell_time
- ON gsm_signals(cid, lac, timestamp)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_signals_device
- ON gsm_signals(imsi, tmsi, timestamp)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_tmsi_log_time
- ON gsm_tmsi_log(timestamp)
- ''')
-
- conn.execute('''
- CREATE INDEX IF NOT EXISTS idx_gsm_velocity_log_device
- ON gsm_velocity_log(device_id, timestamp)
- ''')
-
# =====================================================================
# DSC (Digital Selective Calling) Tables
# =====================================================================
@@ -2298,60 +2170,3 @@ def cleanup_old_payloads(max_age_hours: int = 24) -> int:
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
-
-# =============================================================================
-# GSM Cleanup Functions
-# =============================================================================
-
-def cleanup_old_gsm_signals(max_age_days: int = 60) -> int:
- """
- Remove old GSM signal observations (60-day archive).
-
- Args:
- max_age_days: Maximum age in days (default: 60)
-
- Returns:
- Number of deleted entries
- """
- with get_db() as conn:
- cursor = conn.execute('''
- DELETE FROM gsm_signals
- WHERE timestamp < datetime('now', ?)
- ''', (f'-{max_age_days} days',))
- return cursor.rowcount
-
-
-def cleanup_old_gsm_tmsi_log(max_age_hours: int = 24) -> int:
- """
- Remove old TMSI log entries (24-hour buffer for crowd density).
-
- Args:
- max_age_hours: Maximum age in hours (default: 24)
-
- Returns:
- Number of deleted entries
- """
- with get_db() as conn:
- cursor = conn.execute('''
- DELETE FROM gsm_tmsi_log
- WHERE timestamp < datetime('now', ?)
- ''', (f'-{max_age_hours} hours',))
- return cursor.rowcount
-
-
-def cleanup_old_gsm_velocity_log(max_age_hours: int = 1) -> int:
- """
- Remove old velocity log entries (1-hour buffer for movement tracking).
-
- Args:
- max_age_hours: Maximum age in hours (default: 1)
-
- Returns:
- Number of deleted entries
- """
- with get_db() as conn:
- cursor = conn.execute('''
- DELETE FROM gsm_velocity_log
- WHERE timestamp < datetime('now', ?)
- ''', (f'-{max_age_hours} hours',))
- return cursor.rowcount
diff --git a/utils/dependencies.py b/utils/dependencies.py
index e6a1bee..a12eca7 100644
--- a/utils/dependencies.py
+++ b/utils/dependencies.py
@@ -444,38 +444,6 @@ TOOL_DEPENDENCIES = {
}
}
},
- 'gsm': {
- 'name': 'GSM Intelligence',
- 'tools': {
- 'grgsm_scanner': {
- 'required': True,
- 'description': 'gr-gsm scanner for finding GSM towers',
- 'install': {
- 'apt': 'Build gr-gsm from source: https://github.com/bkerler/gr-gsm',
- 'brew': 'brew install gr-gsm (may require manual build)',
- 'manual': 'https://github.com/bkerler/gr-gsm'
- }
- },
- 'grgsm_livemon': {
- 'required': True,
- 'description': 'gr-gsm live monitor for decoding GSM signals',
- 'install': {
- 'apt': 'Included with gr-gsm package',
- 'brew': 'Included with gr-gsm',
- 'manual': 'Included with gr-gsm'
- }
- },
- 'tshark': {
- 'required': True,
- 'description': 'Wireshark CLI for parsing GSM packets',
- 'install': {
- 'apt': 'sudo apt-get install tshark',
- 'brew': 'brew install wireshark',
- 'manual': 'https://www.wireshark.org/download.html'
- }
- }
- }
- }
}
diff --git a/utils/gsm_geocoding.py b/utils/gsm_geocoding.py
deleted file mode 100644
index feaf164..0000000
--- a/utils/gsm_geocoding.py
+++ /dev/null
@@ -1,226 +0,0 @@
-"""GSM Cell Tower Geocoding Service.
-
-Provides hybrid cache-first geocoding with async API fallback for cell towers.
-"""
-
-from __future__ import annotations
-
-import logging
-import queue
-from typing import Any
-
-import requests
-
-import config
-from utils.database import get_db
-
-logger = logging.getLogger('intercept.gsm_geocoding')
-
-# Queue for pending geocoding requests
-_geocoding_queue = queue.Queue(maxsize=100)
-
-
-def lookup_cell_coordinates(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
- """
- Lookup cell tower coordinates with cache-first strategy.
-
- Strategy:
- 1. Check gsm_cells table (cache) - fast synchronous lookup
- 2. If not found, return None (caller decides whether to use API)
-
- Args:
- mcc: Mobile Country Code
- mnc: Mobile Network Code
- lac: Location Area Code
- cid: Cell ID
-
- Returns:
- dict with keys: lat, lon, source='cache', azimuth (optional),
- range_meters (optional), operator (optional), radio (optional)
- Returns None if not found in cache.
- """
- try:
- with get_db() as conn:
- result = conn.execute('''
- SELECT lat, lon, azimuth, range_meters, operator, radio
- FROM gsm_cells
- WHERE mcc = ? AND mnc = ? AND lac = ? AND cid = ?
- ''', (mcc, mnc, lac, cid)).fetchone()
-
- if result and result['lat'] is not None and result['lon'] is not None:
- return {
- 'lat': result['lat'],
- 'lon': result['lon'],
- 'source': 'cache',
- 'azimuth': result['azimuth'],
- 'range_meters': result['range_meters'],
- 'operator': result['operator'],
- 'radio': result['radio']
- }
-
- return None
-
- except Exception as e:
- logger.error(f"Error looking up coordinates from cache: {e}")
- return None
-
-
-def _get_api_key() -> str:
- """Get OpenCellID API key at runtime (env var first, then database)."""
- env_key = config.GSM_OPENCELLID_API_KEY
- if env_key:
- return env_key
- from utils.database import get_setting
- return get_setting('gsm.opencellid.api_key', '')
-
-
-def lookup_cell_from_api(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
- """
- Lookup cell tower from OpenCellID API and cache result.
-
- Args:
- mcc: Mobile Country Code
- mnc: Mobile Network Code
- lac: Location Area Code
- cid: Cell ID
-
- Returns:
- dict with keys: lat, lon, source='api', azimuth (optional),
- range_meters (optional), operator (optional), radio (optional)
- Returns None if API call fails or cell not found.
- """
- try:
- api_key = _get_api_key()
- if not api_key:
- logger.warning("OpenCellID API key not configured")
- return None
-
- api_url = config.GSM_OPENCELLID_API_URL
- params = {
- 'key': api_key,
- 'mcc': mcc,
- 'mnc': mnc,
- 'lac': lac,
- 'cellid': cid,
- 'format': 'json'
- }
-
- response = requests.get(api_url, params=params, timeout=10)
-
- if response.status_code == 200:
- cell_data = response.json()
- lat = cell_data.get('lat')
- lon = cell_data.get('lon')
-
- # Validate response has actual coordinates
- if lat is None or lon is None:
- logger.warning(
- f"OpenCellID API returned 200 but no coordinates for "
- f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {cell_data}"
- )
- return None
-
- # Cache the result
- with get_db() as conn:
- conn.execute('''
- INSERT OR REPLACE INTO gsm_cells
- (mcc, mnc, lac, cid, lat, lon, azimuth, range_meters, samples, radio, operator, last_verified)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
- ''', (
- mcc, mnc, lac, cid,
- lat, lon,
- cell_data.get('azimuth'),
- cell_data.get('range'),
- cell_data.get('samples'),
- cell_data.get('radio'),
- cell_data.get('operator')
- ))
- conn.commit()
-
- logger.info(f"Cached cell tower from API: MCC={mcc} MNC={mnc} LAC={lac} CID={cid} -> ({lat}, {lon})")
-
- return {
- 'lat': lat,
- 'lon': lon,
- 'source': 'api',
- 'azimuth': cell_data.get('azimuth'),
- 'range_meters': cell_data.get('range'),
- 'operator': cell_data.get('operator'),
- 'radio': cell_data.get('radio')
- }
- else:
- logger.warning(
- f"OpenCellID API returned {response.status_code} for "
- f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {response.text[:200]}"
- )
- return None
-
- except Exception as e:
- logger.error(f"Error calling OpenCellID API: {e}")
- return None
-
-
-def enrich_tower_data(tower_data: dict[str, Any]) -> dict[str, Any]:
- """
- Enrich tower data with coordinates using cache-first strategy.
-
- If coordinates found in cache, adds them immediately.
- If not found, marks as 'pending' and queues for background API lookup.
-
- Args:
- tower_data: Dictionary with keys mcc, mnc, lac, cid (and other tower data)
-
- Returns:
- Enriched tower_data dict with added fields:
- - lat, lon (if found in cache)
- - status='pending' (if needs API lookup)
- - source='cache' (if from cache)
- """
- mcc = tower_data.get('mcc')
- mnc = tower_data.get('mnc')
- lac = tower_data.get('lac')
- cid = tower_data.get('cid')
-
- # Validate required fields
- if not all([mcc is not None, mnc is not None, lac is not None, cid is not None]):
- logger.warning(f"Tower data missing required fields: {tower_data}")
- return tower_data
-
- # Try cache lookup
- coords = lookup_cell_coordinates(mcc, mnc, lac, cid)
-
- if coords:
- # Found in cache - add coordinates immediately
- tower_data['lat'] = coords['lat']
- tower_data['lon'] = coords['lon']
- tower_data['source'] = 'cache'
-
- # Add optional fields if available
- if coords.get('azimuth') is not None:
- tower_data['azimuth'] = coords['azimuth']
- if coords.get('range_meters') is not None:
- tower_data['range_meters'] = coords['range_meters']
- if coords.get('operator'):
- tower_data['operator'] = coords['operator']
- if coords.get('radio'):
- tower_data['radio'] = coords['radio']
-
- logger.debug(f"Cache hit for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
- else:
- # Not in cache - mark as pending and queue for API lookup
- tower_data['status'] = 'pending'
- tower_data['source'] = 'unknown'
-
- # Queue for background geocoding (non-blocking)
- try:
- _geocoding_queue.put_nowait(tower_data.copy())
- logger.debug(f"Queued tower for geocoding: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
- except queue.Full:
- logger.warning("Geocoding queue full, dropping tower")
-
- return tower_data
-
-
-def get_geocoding_queue() -> queue.Queue:
- """Get the geocoding queue for the background worker."""
- return _geocoding_queue
diff --git a/utils/logging.py b/utils/logging.py
index addbabe..3d2cc6a 100644
--- a/utils/logging.py
+++ b/utils/logging.py
@@ -28,4 +28,3 @@ wifi_logger = get_logger('intercept.wifi')
bluetooth_logger = get_logger('intercept.bluetooth')
adsb_logger = get_logger('intercept.adsb')
satellite_logger = get_logger('intercept.satellite')
-gsm_spy_logger = get_logger('intercept.gsm_spy')