diff --git a/app.py b/app.py
index 37115d7..6bcb4e1 100644
--- a/app.py
+++ b/app.py
@@ -675,7 +675,7 @@ def kill_all() -> Response:
"""Kill all decoder, WiFi, and Bluetooth processes."""
global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process
global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process
- global gsm_spy_process, gsm_spy_livemon_process, gsm_spy_monitor_process
+ global gsm_spy_livemon_process, gsm_spy_monitor_process
# Import adsb and ais modules to reset their state
from routes import adsb as adsb_module
@@ -754,26 +754,18 @@ def kill_all() -> Response:
# Reset GSM Spy state
with gsm_spy_lock:
- if gsm_spy_process:
- try:
- safe_terminate(gsm_spy_process, 'grgsm_scanner')
- killed.append('grgsm_scanner')
- except Exception:
- pass
- gsm_spy_process = None
-
if gsm_spy_livemon_process:
try:
- safe_terminate(gsm_spy_livemon_process, 'grgsm_livemon')
- killed.append('grgsm_livemon')
+ if safe_terminate(gsm_spy_livemon_process):
+ killed.append('grgsm_livemon')
except Exception:
pass
gsm_spy_livemon_process = None
if gsm_spy_monitor_process:
try:
- safe_terminate(gsm_spy_monitor_process, 'tshark')
- killed.append('tshark')
+ if safe_terminate(gsm_spy_monitor_process):
+ killed.append('tshark')
except Exception:
pass
gsm_spy_monitor_process = None
@@ -867,6 +859,26 @@ def main() -> None:
from utils.database import init_db
init_db()
+ # Register database cleanup functions
+ from utils.database import (
+ cleanup_old_gsm_signals,
+ cleanup_old_gsm_tmsi_log,
+ cleanup_old_gsm_velocity_log,
+ cleanup_old_signal_history,
+ cleanup_old_timeline_entries,
+ cleanup_old_dsc_alerts,
+ cleanup_old_payloads
+ )
+ # GSM cleanups: signals (60 days), TMSI log (24 hours), velocity (1 hour)
+ # Interval multiplier: cleanup every N cycles (60s interval = 1 cleanup per hour at multiplier 60)
+ cleanup_manager.register_db_cleanup(cleanup_old_gsm_tmsi_log, interval_multiplier=60) # Every hour
+ cleanup_manager.register_db_cleanup(cleanup_old_gsm_velocity_log, interval_multiplier=60) # Every hour
+ cleanup_manager.register_db_cleanup(cleanup_old_gsm_signals, interval_multiplier=1440) # Every 24 hours
+ cleanup_manager.register_db_cleanup(cleanup_old_signal_history, interval_multiplier=1440) # Every 24 hours
+ cleanup_manager.register_db_cleanup(cleanup_old_timeline_entries, interval_multiplier=1440) # Every 24 hours
+ cleanup_manager.register_db_cleanup(cleanup_old_dsc_alerts, interval_multiplier=1440) # Every 24 hours
+ cleanup_manager.register_db_cleanup(cleanup_old_payloads, interval_multiplier=1440) # Every 24 hours
+
# Start automatic cleanup of stale data entries
cleanup_manager.start()
diff --git a/config.py b/config.py
index 4da4826..6f57bd4 100644
--- a/config.py
+++ b/config.py
@@ -205,8 +205,6 @@ GSM_OPENCELLID_API_KEY = _get_env('GSM_OPENCELLID_API_KEY', '')
GSM_OPENCELLID_API_URL = _get_env('GSM_OPENCELLID_API_URL', 'https://opencellid.org/cell/get')
GSM_API_DAILY_LIMIT = _get_env_int('GSM_API_DAILY_LIMIT', 1000)
GSM_TA_METERS_PER_UNIT = _get_env_int('GSM_TA_METERS_PER_UNIT', 554)
-GSM_UPDATE_INTERVAL = _get_env_float('GSM_UPDATE_INTERVAL', 2.0)
-GSM_MAX_AGE_SECONDS = _get_env_int('GSM_MAX_AGE_SECONDS', 300)
def configure_logging() -> None:
"""Configure application logging."""
diff --git a/routes/gsm_spy.py b/routes/gsm_spy.py
index 1091b87..57f581f 100644
--- a/routes/gsm_spy.py
+++ b/routes/gsm_spy.py
@@ -6,7 +6,6 @@ import json
import logging
import queue
import re
-import select
import subprocess
import threading
import time
@@ -284,6 +283,31 @@ def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subproce
return grgsm_proc, tshark_proc
+def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
+ """Start monitoring processes and register them in global state.
+
+ This is shared logic between start_monitor() and auto_start_monitor().
+ Must be called within gsm_spy_lock context.
+
+ Args:
+ arfcn: ARFCN to monitor
+ device_index: SDR device index
+ """
+ # Start monitoring processes
+ grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
+ app_module.gsm_spy_livemon_process = grgsm_proc
+ app_module.gsm_spy_monitor_process = tshark_proc
+ app_module.gsm_spy_selected_arfcn = arfcn
+
+ # Start monitoring thread
+ monitor_thread_obj = threading.Thread(
+ target=monitor_thread,
+ args=(tshark_proc,),
+ daemon=True
+ )
+ monitor_thread_obj.start()
+
+
@gsm_spy_bp.route('/dashboard')
def dashboard():
"""Render GSM Spy dashboard."""
@@ -405,6 +429,14 @@ def start_monitor():
if not arfcn:
return jsonify({'error': 'ARFCN required'}), 400
+ # Validate ARFCN is valid integer and in known GSM band ranges
+ try:
+ arfcn = int(arfcn)
+ # This will raise ValueError if ARFCN is not in any known band
+ arfcn_to_frequency(arfcn)
+ except (ValueError, TypeError) as e:
+ return jsonify({'error': f'Invalid ARFCN: {e}'}), 400
+
# Validate device index
try:
device_index = validate_device_index(device_index)
@@ -412,19 +444,8 @@ def start_monitor():
return jsonify({'error': str(e)}), 400
try:
- # Start monitoring processes
- grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
- app_module.gsm_spy_livemon_process = grgsm_proc
- app_module.gsm_spy_monitor_process = tshark_proc
- app_module.gsm_spy_selected_arfcn = arfcn
-
- # Start monitoring thread
- monitor_thread_obj = threading.Thread(
- target=monitor_thread,
- args=(tshark_proc,),
- daemon=True
- )
- monitor_thread_obj.start()
+ # Start and register monitoring (shared logic)
+ _start_and_register_monitor(arfcn, device_index)
return jsonify({
'status': 'monitoring',
@@ -466,12 +487,8 @@ def stop_scanner():
killed.append('monitor')
app_module.gsm_spy_monitor_process = None
- # Release SDR device
- if app_module.gsm_spy_active_device is not None:
- from app import release_sdr_device
- release_sdr_device(app_module.gsm_spy_active_device)
- logger.info(f"Released SDR device {app_module.gsm_spy_active_device}")
-
+ # Note: SDR device is released by scanner thread's finally block
+ # to avoid race condition. Just reset the state variables here.
app_module.gsm_spy_active_device = None
app_module.gsm_spy_selected_arfcn = None
gsm_connected = False
@@ -526,7 +543,7 @@ def status():
"""Get current GSM Spy status."""
api_usage = get_api_usage_today()
return jsonify({
- 'running': app_module.gsm_spy_scanner_running is not None,
+ 'running': bool(app_module.gsm_spy_scanner_running),
'monitoring': app_module.gsm_spy_monitor_process is not None,
'towers_found': gsm_towers_found,
'devices_tracked': gsm_devices_tracked,
@@ -1162,19 +1179,8 @@ def auto_start_monitor(tower_data):
device_index = app_module.gsm_spy_active_device or 0
- # Start monitoring processes
- grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
- app_module.gsm_spy_livemon_process = grgsm_proc
- app_module.gsm_spy_monitor_process = tshark_proc
- app_module.gsm_spy_selected_arfcn = arfcn
-
- # Start monitoring thread
- monitor_thread_obj = threading.Thread(
- target=monitor_thread,
- args=(tshark_proc,),
- daemon=True
- )
- monitor_thread_obj.start()
+ # Start and register monitoring (shared logic)
+ _start_and_register_monitor(arfcn, device_index)
# Send SSE notification
try:
@@ -1219,20 +1225,36 @@ def scanner_thread(cmd, device_index):
universal_newlines=True,
bufsize=1
)
+ register_process(process)
+ logger.info(f"Started grgsm_scanner (PID: {process.pid})")
+
+ # Standard pattern: reader threads with queue
+ output_queue_local = queue.Queue()
+
+ def read_stdout():
+ try:
+ for line in iter(process.stdout.readline, ''):
+ if line:
+ output_queue_local.put(('stdout', line))
+ except Exception as e:
+ logger.error(f"stdout read error: {e}")
+ finally:
+ output_queue_local.put(('eof', None))
- # Non-blocking stderr reader
def read_stderr():
try:
- for line in process.stderr:
+ for line in iter(process.stderr.readline, ''):
if line:
logger.debug(f"grgsm_scanner: {line.strip()}")
except Exception as e:
logger.error(f"stderr read error: {e}")
+ stdout_thread = threading.Thread(target=read_stdout, daemon=True)
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
+ stdout_thread.start()
stderr_thread.start()
- # Non-blocking stdout reader with timeout
+ # Process output with timeout
last_output = time.time()
scan_timeout = 120 # 2 minute maximum per scan
@@ -1242,12 +1264,11 @@ def scanner_thread(cmd, device_index):
logger.info(f"Scanner exited (code: {process.returncode})")
break
- # Check for output with 1-second timeout
- ready, _, _ = select.select([process.stdout], [], [], 1.0)
+ # Get output from queue with timeout
+ try:
+ msg_type, line = output_queue_local.get(timeout=1.0)
- if ready:
- line = process.stdout.readline()
- if not line:
+ if msg_type == 'eof':
break # EOF
last_output = time.time()
@@ -1287,7 +1308,7 @@ def scanner_thread(cmd, device_index):
args=(strongest_tower,),
daemon=True
).start()
- else:
+ except queue.Empty:
# No output, check timeout
if time.time() - last_output > scan_timeout:
logger.warning(f"Scan timeout after {scan_timeout}s")
@@ -1347,6 +1368,10 @@ def scanner_thread(cmd, device_index):
except Exception:
pass
+ # Unregister process from cleanup list
+ if process:
+ unregister_process(process)
+
logger.info("Scanner thread terminated")
# Reset global state
@@ -1359,9 +1384,25 @@ def scanner_thread(cmd, device_index):
def monitor_thread(process):
- """Thread to read tshark output with non-blocking I/O and timeouts."""
+ """Thread to read tshark output using standard iter pattern."""
global gsm_devices_tracked
+ # Standard pattern: reader thread with queue
+ output_queue_local = queue.Queue()
+
+ def read_stdout():
+ try:
+ for line in iter(process.stdout.readline, ''):
+ if line:
+ output_queue_local.put(('stdout', line))
+ except Exception as e:
+ logger.error(f"tshark read error: {e}")
+ finally:
+ output_queue_local.put(('eof', None))
+
+ stdout_thread = threading.Thread(target=read_stdout, daemon=True)
+ stdout_thread.start()
+
try:
while app_module.gsm_spy_monitor_process:
# Check if process died
@@ -1369,14 +1410,13 @@ def monitor_thread(process):
logger.info(f"Monitor process exited (code: {process.returncode})")
break
- # Non-blocking read with timeout
- ready, _, _ = select.select([process.stdout], [], [], 1.0)
-
- if not ready:
+ # Get output from queue with timeout
+ try:
+ msg_type, line = output_queue_local.get(timeout=1.0)
+ except queue.Empty:
continue # Timeout, check flag again
- line = process.stdout.readline()
- if not line:
+ if msg_type == 'eof':
break # EOF
parsed = parse_tshark_output(line)
diff --git a/templates/gsm_spy_dashboard.html b/templates/gsm_spy_dashboard.html
index dca7ae4..18dd0a0 100644
--- a/templates/gsm_spy_dashboard.html
+++ b/templates/gsm_spy_dashboard.html
@@ -1332,6 +1332,14 @@
totalSignals: 0
};
+ // XSS protection: Escape HTML special characters
+ function escapeHtml(text) {
+ if (text === null || text === undefined) return '';
+ const div = document.createElement('div');
+ div.textContent = text;
+ return div.innerHTML;
+ }
+
// Band configurations by region
const BAND_CONFIG = {
'Europe': [
@@ -1832,23 +1840,23 @@
Cell ID
- ${tower.cid} ${tower.rogue ? 'ROGUE' : ''}
+ ${escapeHtml(tower.cid)} ${tower.rogue ? 'ROGUE' : ''}
MCC / MNC
- ${tower.mcc} / ${tower.mnc}
+ ${escapeHtml(tower.mcc)} / ${escapeHtml(tower.mnc)}
LAC
- ${tower.lac}
+ ${escapeHtml(tower.lac)}
ARFCN
- ${tower.arfcn}
+ ${escapeHtml(tower.arfcn)}
Signal (dBm)
- ${tower.signal || 'N/A'}
+ ${escapeHtml(tower.signal || 'N/A')}
Location
@@ -1877,14 +1885,14 @@
for (const [key, tower] of Object.entries(towers)) {
const selected = key === selectedTowerKey ? 'selected' : '';
html += `
-
+
- LAC ${tower.lac} | ARFCN ${tower.arfcn} | ${tower.signal || 'N/A'} dBm
+ LAC ${escapeHtml(tower.lac)} | ARFCN ${escapeHtml(tower.arfcn)} | ${escapeHtml(tower.signal || 'N/A')} dBm
`;
@@ -1900,6 +1908,13 @@
const key = data.imsi || data.tmsi || `device_${Date.now()}`;
devices[key] = data;
+ // Check if device has valid coordinates before creating marker
+ if (!data.lat || !data.lon) {
+ console.warn('[GSM SPY] Device has no coordinates, skipping map marker:', key);
+ updateDevicesList();
+ return;
+ }
+
// Create device marker with vector icon
const marker = L.marker([data.lat, data.lon], {
icon: createGSMMarkerIcon('device', '#00d9ff', false, false)
@@ -1951,14 +1966,17 @@
let html = '';
for (const [key, device] of Object.entries(devices)) {
const identifier = device.imsi || device.tmsi || 'Unknown';
+ const location = (device.lat && device.lon)
+ ? `${device.lat.toFixed(6)}, ${device.lon.toFixed(6)}`
+ : 'Location unknown';
html += `
- Tower CID ${device.cid} | ${device.lat.toFixed(6)}, ${device.lon.toFixed(6)}
+ Tower CID ${escapeHtml(device.cid)} | ${escapeHtml(location)}
`;
@@ -1983,7 +2001,7 @@
${new Date(data.timestamp).toLocaleTimeString()}
⚠ ROGUE TOWER DETECTED
- CID ${data.cid} | MCC ${data.mcc} MNC ${data.mnc} | ${data.reason || 'Unknown threat'}
+ CID ${escapeHtml(data.cid)} | MCC ${escapeHtml(data.mcc)} MNC ${escapeHtml(data.mnc)} | ${escapeHtml(data.reason || 'Unknown threat')}
`;
diff --git a/tests/test_gsm_spy.py b/tests/test_gsm_spy.py
new file mode 100644
index 0000000..92deb40
--- /dev/null
+++ b/tests/test_gsm_spy.py
@@ -0,0 +1,302 @@
+"""Unit tests for GSM Spy parsing and validation functions."""
+
+import pytest
+from routes.gsm_spy import (
+ parse_grgsm_scanner_output,
+ parse_tshark_output,
+ arfcn_to_frequency,
+ validate_band_names,
+ REGIONAL_BANDS
+)
+
+
+class TestParseGrgsmScannerOutput:
+ """Tests for parse_grgsm_scanner_output()."""
+
+ def test_valid_table_row(self):
+ """Test parsing a valid scanner output table row."""
+ line = " 23 | 940.6 | 31245 | 1234 | 214 | 01 | -48"
+ result = parse_grgsm_scanner_output(line)
+
+ assert result is not None
+ assert result['type'] == 'tower'
+ assert result['arfcn'] == 23
+ assert result['frequency'] == 940.6
+ assert result['cid'] == 31245
+ assert result['lac'] == 1234
+ assert result['mcc'] == 214
+ assert result['mnc'] == 1
+ assert result['signal_strength'] == -48.0
+ assert 'timestamp' in result
+
+ def test_header_line(self):
+ """Test that header lines are skipped."""
+ line = "ARFCN | Freq (MHz) | CID | LAC | MCC | MNC | Power (dB)"
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+ def test_separator_line(self):
+ """Test that separator lines are skipped."""
+ line = "--------------------------------------------------------------------"
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+ def test_progress_line(self):
+ """Test that progress lines are skipped."""
+ line = "Scanning: 50% complete"
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+ def test_found_line(self):
+ """Test that 'Found X towers' lines are skipped."""
+ line = "Found 5 towers"
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+ def test_invalid_data(self):
+ """Test handling of invalid data."""
+ line = " abc | xyz | invalid | data | bad | bad | bad"
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+ def test_empty_line(self):
+ """Test handling of empty lines."""
+ result = parse_grgsm_scanner_output("")
+ assert result is None
+
+ def test_partial_data(self):
+ """Test handling of incomplete table rows."""
+ line = " 23 | 940.6 | 31245" # Missing fields
+ result = parse_grgsm_scanner_output(line)
+ assert result is None
+
+
+class TestParseTsharkOutput:
+ """Tests for parse_tshark_output()."""
+
+ def test_valid_full_output(self):
+ """Test parsing tshark output with all fields."""
+ line = "5\t0xABCD1234\t123456789012345\t1234\t31245"
+ result = parse_tshark_output(line)
+
+ assert result is not None
+ assert result['type'] == 'device'
+ assert result['ta_value'] == 5
+ assert result['tmsi'] == '0xABCD1234'
+ assert result['imsi'] == '123456789012345'
+ assert result['lac'] == 1234
+ assert result['cid'] == 31245
+ assert result['distance_meters'] == 5 * 554 # TA * 554 meters
+ assert 'timestamp' in result
+
+ def test_missing_optional_fields(self):
+ """Test parsing with missing optional fields (empty tabs)."""
+ line = "3\t\t\t1234\t31245"
+ result = parse_tshark_output(line)
+
+ assert result is not None
+ assert result['ta_value'] == 3
+ assert result['tmsi'] is None
+ assert result['imsi'] is None
+ assert result['lac'] == 1234
+ assert result['cid'] == 31245
+
+ def test_no_ta_value(self):
+ """Test parsing without TA value (empty field)."""
+ # When TA is empty, int('') will fail, so the parse returns None
+ # This is the current behavior - the function expects valid integers or valid empty handling
+ line = "\t0xABCD1234\t123456789012345\t1234\t31245"
+ result = parse_tshark_output(line)
+ # Current implementation will fail to parse this due to int('') failing
+ assert result is None
+
+ def test_invalid_line(self):
+ """Test handling of invalid tshark output."""
+ line = "invalid data"
+ result = parse_tshark_output(line)
+ assert result is None
+
+ def test_empty_line(self):
+ """Test handling of empty lines."""
+ result = parse_tshark_output("")
+ assert result is None
+
+ def test_partial_fields(self):
+ """Test with fewer than 5 fields."""
+ line = "5\t0xABCD1234" # Only 2 fields
+ result = parse_tshark_output(line)
+ assert result is None
+
+
+class TestArfcnToFrequency:
+ """Tests for arfcn_to_frequency()."""
+
+ def test_gsm850_arfcn(self):
+ """Test ARFCN in GSM850 band."""
+ # GSM850: ARFCN 128-251, 869-894 MHz
+ arfcn = 128
+ freq = arfcn_to_frequency(arfcn)
+ assert freq == 869000000 # 869 MHz
+
+ arfcn = 251
+ freq = arfcn_to_frequency(arfcn)
+ assert freq == 893600000 # 893.6 MHz
+
+ def test_egsm900_arfcn(self):
+ """Test ARFCN in EGSM900 band."""
+ # EGSM900: ARFCN 0-124, 925-960 MHz
+ arfcn = 0
+ freq = arfcn_to_frequency(arfcn)
+ assert freq == 925000000 # 925 MHz
+
+ arfcn = 124
+ freq = arfcn_to_frequency(arfcn)
+ assert freq == 949800000 # 949.8 MHz
+
+ def test_dcs1800_arfcn(self):
+ """Test ARFCN in DCS1800 band."""
+ # DCS1800: ARFCN 512-885, 1805-1880 MHz
+ # Note: ARFCN 512 also exists in PCS1900 and will match that first
+ # Use ARFCN 811+ which is only in DCS1800
+ arfcn = 811 # Beyond PCS1900 range (512-810)
+ freq = arfcn_to_frequency(arfcn)
+ # 811 is ARFCN offset from 512, so freq = 1805MHz + (811-512)*200kHz
+ expected = 1805000000 + (811 - 512) * 200000
+ assert freq == expected
+
+ arfcn = 885
+ freq = arfcn_to_frequency(arfcn)
+ assert freq == 1879600000 # 1879.6 MHz
+
+ def test_pcs1900_arfcn(self):
+ """Test ARFCN in PCS1900 band."""
+ # PCS1900: ARFCN 512-810, 1930-1990 MHz
+ # Note: overlaps with DCS1800 ARFCN range, but different frequencies
+ arfcn = 512
+ freq = arfcn_to_frequency(arfcn)
+ # Will match first band (DCS1800 in Europe config)
+ assert freq > 0
+
+ def test_invalid_arfcn(self):
+ """Test ARFCN outside known ranges."""
+ with pytest.raises(ValueError, match="not found in any known GSM band"):
+ arfcn_to_frequency(9999)
+
+ with pytest.raises(ValueError):
+ arfcn_to_frequency(-1)
+
+ def test_arfcn_200khz_spacing(self):
+ """Test that ARFCNs are 200kHz apart."""
+ arfcn1 = 128
+ arfcn2 = 129
+ freq1 = arfcn_to_frequency(arfcn1)
+ freq2 = arfcn_to_frequency(arfcn2)
+ assert freq2 - freq1 == 200000 # 200 kHz
+
+
+class TestValidateBandNames:
+ """Tests for validate_band_names()."""
+
+ def test_valid_americas_bands(self):
+ """Test valid band names for Americas region."""
+ bands = ['GSM850', 'PCS1900']
+ result, error = validate_band_names(bands, 'Americas')
+ assert result == bands
+ assert error is None
+
+ def test_valid_europe_bands(self):
+ """Test valid band names for Europe region."""
+ # Note: Europe uses EGSM900, not GSM900
+ bands = ['EGSM900', 'DCS1800', 'GSM850', 'GSM800']
+ result, error = validate_band_names(bands, 'Europe')
+ assert result == bands
+ assert error is None
+
+ def test_valid_asia_bands(self):
+ """Test valid band names for Asia region."""
+ # Note: Asia uses EGSM900, not GSM900
+ bands = ['EGSM900', 'DCS1800']
+ result, error = validate_band_names(bands, 'Asia')
+ assert result == bands
+ assert error is None
+
+ def test_invalid_band_for_region(self):
+ """Test invalid band name for a region."""
+ bands = ['GSM900', 'INVALID_BAND']
+ result, error = validate_band_names(bands, 'Americas')
+ assert result == []
+ assert error is not None
+ assert 'Invalid bands' in error
+ assert 'INVALID_BAND' in error
+
+ def test_invalid_region(self):
+ """Test invalid region name."""
+ bands = ['GSM900']
+ result, error = validate_band_names(bands, 'InvalidRegion')
+ assert result == []
+ assert error is not None
+ assert 'Invalid region' in error
+
+ def test_empty_bands_list(self):
+ """Test with empty bands list."""
+ result, error = validate_band_names([], 'Americas')
+ assert result == []
+ assert error is None
+
+ def test_single_valid_band(self):
+ """Test with single valid band."""
+ bands = ['GSM850']
+ result, error = validate_band_names(bands, 'Americas')
+ assert result == ['GSM850']
+ assert error is None
+
+ def test_case_sensitive_band_names(self):
+ """Test that band names are case-sensitive."""
+ bands = ['gsm850'] # lowercase
+ result, error = validate_band_names(bands, 'Americas')
+ assert result == []
+ assert error is not None
+
+ def test_multiple_invalid_bands(self):
+ """Test with multiple invalid bands."""
+ bands = ['INVALID1', 'GSM850', 'INVALID2']
+ result, error = validate_band_names(bands, 'Americas')
+ assert result == []
+ assert error is not None
+ assert 'INVALID1' in error
+ assert 'INVALID2' in error
+
+
+class TestRegionalBandsConfig:
+ """Tests for REGIONAL_BANDS configuration."""
+
+ def test_all_regions_defined(self):
+ """Test that all expected regions are defined."""
+ assert 'Americas' in REGIONAL_BANDS
+ assert 'Europe' in REGIONAL_BANDS
+ assert 'Asia' in REGIONAL_BANDS
+
+ def test_all_bands_have_required_fields(self):
+ """Test that all bands have required configuration fields."""
+ for region, bands in REGIONAL_BANDS.items():
+ for band_name, band_config in bands.items():
+ assert 'start' in band_config
+ assert 'end' in band_config
+ assert 'arfcn_start' in band_config
+ assert 'arfcn_end' in band_config
+
+ def test_frequency_ranges_valid(self):
+ """Test that frequency ranges are positive and start < end."""
+ for region, bands in REGIONAL_BANDS.items():
+ for band_name, band_config in bands.items():
+ assert band_config['start'] > 0
+ assert band_config['end'] > 0
+ assert band_config['start'] < band_config['end']
+
+ def test_arfcn_ranges_valid(self):
+ """Test that ARFCN ranges are valid."""
+ for region, bands in REGIONAL_BANDS.items():
+ for band_name, band_config in bands.items():
+ assert band_config['arfcn_start'] >= 0
+ assert band_config['arfcn_end'] >= 0
+ assert band_config['arfcn_start'] <= band_config['arfcn_end']
diff --git a/utils/cleanup.py b/utils/cleanup.py
index 1ea2cf8..1748159 100644
--- a/utils/cleanup.py
+++ b/utils/cleanup.py
@@ -142,7 +142,7 @@ class DataStore:
class CleanupManager:
- """Manages periodic cleanup of multiple data stores."""
+ """Manages periodic cleanup of multiple data stores and database tables."""
def __init__(self, interval: float = 60.0):
"""
@@ -152,9 +152,11 @@ class CleanupManager:
interval: Cleanup interval in seconds
"""
self.stores: list[DataStore] = []
+ self.db_cleanup_funcs: list[tuple[callable, int]] = [] # (func, interval_multiplier)
self.interval = interval
self._timer: threading.Timer | None = None
self._running = False
+ self._cleanup_count = 0
self._lock = threading.Lock()
def register(self, store: DataStore) -> None:
@@ -169,6 +171,17 @@ class CleanupManager:
if store in self.stores:
self.stores.remove(store)
+ def register_db_cleanup(self, func: callable, interval_multiplier: int = 60) -> None:
+ """
+ Register a database cleanup function.
+
+ Args:
+ func: Cleanup function to call (should return number of deleted rows)
+ interval_multiplier: How many cleanup cycles to wait between calls (default: 60 = 1 hour if interval is 60s)
+ """
+ with self._lock:
+ self.db_cleanup_funcs.append((func, interval_multiplier))
+
def start(self) -> None:
"""Start the cleanup timer."""
with self._lock:
@@ -194,11 +207,15 @@ class CleanupManager:
self._timer.start()
def _run_cleanup(self) -> None:
- """Run cleanup on all registered stores."""
+ """Run cleanup on all registered stores and database tables."""
total_cleaned = 0
+ # Cleanup in-memory data stores
with self._lock:
stores = list(self.stores)
+ db_funcs = list(self.db_cleanup_funcs)
+ self._cleanup_count += 1
+ current_count = self._cleanup_count
for store in stores:
try:
@@ -206,6 +223,17 @@ class CleanupManager:
except Exception as e:
logger.error(f"Error cleaning up {store.name}: {e}")
+ # Cleanup database tables (less frequently)
+ for func, interval_multiplier in db_funcs:
+ if current_count % interval_multiplier == 0:
+ try:
+ deleted = func()
+ if deleted > 0:
+ logger.info(f"Database cleanup: {func.__name__} removed {deleted} rows")
+ total_cleaned += deleted
+ except Exception as e:
+ logger.error(f"Error in database cleanup {func.__name__}: {e}")
+
if total_cleaned > 0:
logger.info(f"Cleanup complete: removed {total_cleaned} stale entries")
diff --git a/utils/database.py b/utils/database.py
index 97713ff..bedb369 100644
--- a/utils/database.py
+++ b/utils/database.py
@@ -2189,3 +2189,61 @@ def cleanup_old_payloads(max_age_hours: int = 24) -> int:
WHERE received_at < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
+
+
+# =============================================================================
+# GSM Cleanup Functions
+# =============================================================================
+
+def cleanup_old_gsm_signals(max_age_days: int = 60) -> int:
+ """
+ Remove old GSM signal observations (60-day archive).
+
+ Args:
+ max_age_days: Maximum age in days (default: 60)
+
+ Returns:
+ Number of deleted entries
+ """
+ with get_db() as conn:
+ cursor = conn.execute('''
+ DELETE FROM gsm_signals
+ WHERE timestamp < datetime('now', ?)
+ ''', (f'-{max_age_days} days',))
+ return cursor.rowcount
+
+
+def cleanup_old_gsm_tmsi_log(max_age_hours: int = 24) -> int:
+ """
+ Remove old TMSI log entries (24-hour buffer for crowd density).
+
+ Args:
+ max_age_hours: Maximum age in hours (default: 24)
+
+ Returns:
+ Number of deleted entries
+ """
+ with get_db() as conn:
+ cursor = conn.execute('''
+ DELETE FROM gsm_tmsi_log
+ WHERE timestamp < datetime('now', ?)
+ ''', (f'-{max_age_hours} hours',))
+ return cursor.rowcount
+
+
+def cleanup_old_gsm_velocity_log(max_age_hours: int = 1) -> int:
+ """
+ Remove old velocity log entries (1-hour buffer for movement tracking).
+
+ Args:
+ max_age_hours: Maximum age in hours (default: 1)
+
+ Returns:
+ Number of deleted entries
+ """
+ with get_db() as conn:
+ cursor = conn.execute('''
+ DELETE FROM gsm_velocity_log
+ WHERE timestamp < datetime('now', ?)
+ ''', (f'-{max_age_hours} hours',))
+ return cursor.rowcount