PR #124 fixed major and minor issues

This commit is contained in:
Marc
2026-02-08 07:04:10 -06:00
parent 44b1a74838
commit bdba56bef1
7 changed files with 535 additions and 79 deletions
+25 -13
View File
@@ -675,7 +675,7 @@ def kill_all() -> Response:
"""Kill all decoder, WiFi, and Bluetooth processes."""
global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process
global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process
global gsm_spy_process, gsm_spy_livemon_process, gsm_spy_monitor_process
global gsm_spy_livemon_process, gsm_spy_monitor_process
# Import adsb and ais modules to reset their state
from routes import adsb as adsb_module
@@ -754,26 +754,18 @@ def kill_all() -> Response:
# Reset GSM Spy state
with gsm_spy_lock:
if gsm_spy_process:
try:
safe_terminate(gsm_spy_process, 'grgsm_scanner')
killed.append('grgsm_scanner')
except Exception:
pass
gsm_spy_process = None
if gsm_spy_livemon_process:
try:
safe_terminate(gsm_spy_livemon_process, 'grgsm_livemon')
killed.append('grgsm_livemon')
if safe_terminate(gsm_spy_livemon_process):
killed.append('grgsm_livemon')
except Exception:
pass
gsm_spy_livemon_process = None
if gsm_spy_monitor_process:
try:
safe_terminate(gsm_spy_monitor_process, 'tshark')
killed.append('tshark')
if safe_terminate(gsm_spy_monitor_process):
killed.append('tshark')
except Exception:
pass
gsm_spy_monitor_process = None
@@ -867,6 +859,26 @@ def main() -> None:
from utils.database import init_db
init_db()
# Register database cleanup functions
from utils.database import (
cleanup_old_gsm_signals,
cleanup_old_gsm_tmsi_log,
cleanup_old_gsm_velocity_log,
cleanup_old_signal_history,
cleanup_old_timeline_entries,
cleanup_old_dsc_alerts,
cleanup_old_payloads
)
# GSM cleanups: signals (60 days), TMSI log (24 hours), velocity (1 hour)
# Interval multiplier: cleanup every N cycles (60s interval = 1 cleanup per hour at multiplier 60)
cleanup_manager.register_db_cleanup(cleanup_old_gsm_tmsi_log, interval_multiplier=60) # Every hour
cleanup_manager.register_db_cleanup(cleanup_old_gsm_velocity_log, interval_multiplier=60) # Every hour
cleanup_manager.register_db_cleanup(cleanup_old_gsm_signals, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_signal_history, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_timeline_entries, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_dsc_alerts, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_payloads, interval_multiplier=1440) # Every 24 hours
# Start automatic cleanup of stale data entries
cleanup_manager.start()
-2
View File
@@ -205,8 +205,6 @@ GSM_OPENCELLID_API_KEY = _get_env('GSM_OPENCELLID_API_KEY', '')
GSM_OPENCELLID_API_URL = _get_env('GSM_OPENCELLID_API_URL', 'https://opencellid.org/cell/get')
GSM_API_DAILY_LIMIT = _get_env_int('GSM_API_DAILY_LIMIT', 1000)
GSM_TA_METERS_PER_UNIT = _get_env_int('GSM_TA_METERS_PER_UNIT', 554)
GSM_UPDATE_INTERVAL = _get_env_float('GSM_UPDATE_INTERVAL', 2.0)
GSM_MAX_AGE_SECONDS = _get_env_int('GSM_MAX_AGE_SECONDS', 300)
def configure_logging() -> None:
"""Configure application logging."""
+90 -50
View File
@@ -6,7 +6,6 @@ import json
import logging
import queue
import re
import select
import subprocess
import threading
import time
@@ -284,6 +283,31 @@ def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subproce
return grgsm_proc, tshark_proc
def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
"""Start monitoring processes and register them in global state.
This is shared logic between start_monitor() and auto_start_monitor().
Must be called within gsm_spy_lock context.
Args:
arfcn: ARFCN to monitor
device_index: SDR device index
"""
# Start monitoring processes
grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
app_module.gsm_spy_selected_arfcn = arfcn
# Start monitoring thread
monitor_thread_obj = threading.Thread(
target=monitor_thread,
args=(tshark_proc,),
daemon=True
)
monitor_thread_obj.start()
@gsm_spy_bp.route('/dashboard')
def dashboard():
"""Render GSM Spy dashboard."""
@@ -405,6 +429,14 @@ def start_monitor():
if not arfcn:
return jsonify({'error': 'ARFCN required'}), 400
# Validate ARFCN is valid integer and in known GSM band ranges
try:
arfcn = int(arfcn)
# This will raise ValueError if ARFCN is not in any known band
arfcn_to_frequency(arfcn)
except (ValueError, TypeError) as e:
return jsonify({'error': f'Invalid ARFCN: {e}'}), 400
# Validate device index
try:
device_index = validate_device_index(device_index)
@@ -412,19 +444,8 @@ def start_monitor():
return jsonify({'error': str(e)}), 400
try:
# Start monitoring processes
grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
app_module.gsm_spy_selected_arfcn = arfcn
# Start monitoring thread
monitor_thread_obj = threading.Thread(
target=monitor_thread,
args=(tshark_proc,),
daemon=True
)
monitor_thread_obj.start()
# Start and register monitoring (shared logic)
_start_and_register_monitor(arfcn, device_index)
return jsonify({
'status': 'monitoring',
@@ -466,12 +487,8 @@ def stop_scanner():
killed.append('monitor')
app_module.gsm_spy_monitor_process = None
# Release SDR device
if app_module.gsm_spy_active_device is not None:
from app import release_sdr_device
release_sdr_device(app_module.gsm_spy_active_device)
logger.info(f"Released SDR device {app_module.gsm_spy_active_device}")
# Note: SDR device is released by scanner thread's finally block
# to avoid race condition. Just reset the state variables here.
app_module.gsm_spy_active_device = None
app_module.gsm_spy_selected_arfcn = None
gsm_connected = False
@@ -526,7 +543,7 @@ def status():
"""Get current GSM Spy status."""
api_usage = get_api_usage_today()
return jsonify({
'running': app_module.gsm_spy_scanner_running is not None,
'running': bool(app_module.gsm_spy_scanner_running),
'monitoring': app_module.gsm_spy_monitor_process is not None,
'towers_found': gsm_towers_found,
'devices_tracked': gsm_devices_tracked,
@@ -1162,19 +1179,8 @@ def auto_start_monitor(tower_data):
device_index = app_module.gsm_spy_active_device or 0
# Start monitoring processes
grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
app_module.gsm_spy_selected_arfcn = arfcn
# Start monitoring thread
monitor_thread_obj = threading.Thread(
target=monitor_thread,
args=(tshark_proc,),
daemon=True
)
monitor_thread_obj.start()
# Start and register monitoring (shared logic)
_start_and_register_monitor(arfcn, device_index)
# Send SSE notification
try:
@@ -1219,20 +1225,36 @@ def scanner_thread(cmd, device_index):
universal_newlines=True,
bufsize=1
)
register_process(process)
logger.info(f"Started grgsm_scanner (PID: {process.pid})")
# Standard pattern: reader threads with queue
output_queue_local = queue.Queue()
def read_stdout():
try:
for line in iter(process.stdout.readline, ''):
if line:
output_queue_local.put(('stdout', line))
except Exception as e:
logger.error(f"stdout read error: {e}")
finally:
output_queue_local.put(('eof', None))
# Non-blocking stderr reader
def read_stderr():
try:
for line in process.stderr:
for line in iter(process.stderr.readline, ''):
if line:
logger.debug(f"grgsm_scanner: {line.strip()}")
except Exception as e:
logger.error(f"stderr read error: {e}")
stdout_thread = threading.Thread(target=read_stdout, daemon=True)
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
stdout_thread.start()
stderr_thread.start()
# Non-blocking stdout reader with timeout
# Process output with timeout
last_output = time.time()
scan_timeout = 120 # 2 minute maximum per scan
@@ -1242,12 +1264,11 @@ def scanner_thread(cmd, device_index):
logger.info(f"Scanner exited (code: {process.returncode})")
break
# Check for output with 1-second timeout
ready, _, _ = select.select([process.stdout], [], [], 1.0)
# Get output from queue with timeout
try:
msg_type, line = output_queue_local.get(timeout=1.0)
if ready:
line = process.stdout.readline()
if not line:
if msg_type == 'eof':
break # EOF
last_output = time.time()
@@ -1287,7 +1308,7 @@ def scanner_thread(cmd, device_index):
args=(strongest_tower,),
daemon=True
).start()
else:
except queue.Empty:
# No output, check timeout
if time.time() - last_output > scan_timeout:
logger.warning(f"Scan timeout after {scan_timeout}s")
@@ -1347,6 +1368,10 @@ def scanner_thread(cmd, device_index):
except Exception:
pass
# Unregister process from cleanup list
if process:
unregister_process(process)
logger.info("Scanner thread terminated")
# Reset global state
@@ -1359,9 +1384,25 @@ def scanner_thread(cmd, device_index):
def monitor_thread(process):
"""Thread to read tshark output with non-blocking I/O and timeouts."""
"""Thread to read tshark output using standard iter pattern."""
global gsm_devices_tracked
# Standard pattern: reader thread with queue
output_queue_local = queue.Queue()
def read_stdout():
try:
for line in iter(process.stdout.readline, ''):
if line:
output_queue_local.put(('stdout', line))
except Exception as e:
logger.error(f"tshark read error: {e}")
finally:
output_queue_local.put(('eof', None))
stdout_thread = threading.Thread(target=read_stdout, daemon=True)
stdout_thread.start()
try:
while app_module.gsm_spy_monitor_process:
# Check if process died
@@ -1369,14 +1410,13 @@ def monitor_thread(process):
logger.info(f"Monitor process exited (code: {process.returncode})")
break
# Non-blocking read with timeout
ready, _, _ = select.select([process.stdout], [], [], 1.0)
if not ready:
# Get output from queue with timeout
try:
msg_type, line = output_queue_local.get(timeout=1.0)
except queue.Empty:
continue # Timeout, check flag again
line = process.stdout.readline()
if not line:
if msg_type == 'eof':
break # EOF
parsed = parse_tshark_output(line)
+30 -12
View File
@@ -1332,6 +1332,14 @@
totalSignals: 0
};
// XSS protection: Escape HTML special characters
function escapeHtml(text) {
if (text === null || text === undefined) return '';
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
// Band configurations by region
const BAND_CONFIG = {
'Europe': [
@@ -1832,23 +1840,23 @@
<div class="tower-info">
<div class="tower-info-row">
<span class="tower-info-label">Cell ID</span>
<span class="tower-info-value">${tower.cid} ${tower.rogue ? '<span class="tower-rogue-badge">ROGUE</span>' : ''}</span>
<span class="tower-info-value">${escapeHtml(tower.cid)} ${tower.rogue ? '<span class="tower-rogue-badge">ROGUE</span>' : ''}</span>
</div>
<div class="tower-info-row">
<span class="tower-info-label">MCC / MNC</span>
<span class="tower-info-value">${tower.mcc} / ${tower.mnc}</span>
<span class="tower-info-value">${escapeHtml(tower.mcc)} / ${escapeHtml(tower.mnc)}</span>
</div>
<div class="tower-info-row">
<span class="tower-info-label">LAC</span>
<span class="tower-info-value">${tower.lac}</span>
<span class="tower-info-value">${escapeHtml(tower.lac)}</span>
</div>
<div class="tower-info-row">
<span class="tower-info-label">ARFCN</span>
<span class="tower-info-value">${tower.arfcn}</span>
<span class="tower-info-value">${escapeHtml(tower.arfcn)}</span>
</div>
<div class="tower-info-row">
<span class="tower-info-label">Signal (dBm)</span>
<span class="tower-info-value">${tower.signal || 'N/A'}</span>
<span class="tower-info-value">${escapeHtml(tower.signal || 'N/A')}</span>
</div>
<div class="tower-info-row">
<span class="tower-info-label">Location</span>
@@ -1877,14 +1885,14 @@
for (const [key, tower] of Object.entries(towers)) {
const selected = key === selectedTowerKey ? 'selected' : '';
html += `
<div class="list-item ${selected}" onclick="selectTower('${key}')">
<div class="list-item ${selected}" onclick="selectTower('${escapeHtml(key)}')">
<div class="list-item-header">
<span class="list-item-id">CID ${tower.cid}</span>
<span class="list-item-meta">${tower.mcc}-${tower.mnc}</span>
<span class="list-item-id">CID ${escapeHtml(tower.cid)}</span>
<span class="list-item-meta">${escapeHtml(tower.mcc)}-${escapeHtml(tower.mnc)}</span>
${tower.rogue ? '<span class="rogue-indicator"></span>' : ''}
</div>
<div class="list-item-details">
LAC ${tower.lac} | ARFCN ${tower.arfcn} | ${tower.signal || 'N/A'} dBm
LAC ${escapeHtml(tower.lac)} | ARFCN ${escapeHtml(tower.arfcn)} | ${escapeHtml(tower.signal || 'N/A')} dBm
</div>
</div>
`;
@@ -1900,6 +1908,13 @@
const key = data.imsi || data.tmsi || `device_${Date.now()}`;
devices[key] = data;
// Check if device has valid coordinates before creating marker
if (!data.lat || !data.lon) {
console.warn('[GSM SPY] Device has no coordinates, skipping map marker:', key);
updateDevicesList();
return;
}
// Create device marker with vector icon
const marker = L.marker([data.lat, data.lon], {
icon: createGSMMarkerIcon('device', '#00d9ff', false, false)
@@ -1951,14 +1966,17 @@
let html = '';
for (const [key, device] of Object.entries(devices)) {
const identifier = device.imsi || device.tmsi || 'Unknown';
const location = (device.lat && device.lon)
? `${device.lat.toFixed(6)}, ${device.lon.toFixed(6)}`
: 'Location unknown';
html += `
<div class="list-item">
<div class="list-item-header">
<span class="list-item-id">${identifier}</span>
<span class="list-item-id">${escapeHtml(identifier)}</span>
<span class="list-item-meta">${new Date(device.timestamp).toLocaleTimeString()}</span>
</div>
<div class="list-item-details">
Tower CID ${device.cid} | ${device.lat.toFixed(6)}, ${device.lon.toFixed(6)}
Tower CID ${escapeHtml(device.cid)} | ${escapeHtml(location)}
</div>
</div>
`;
@@ -1983,7 +2001,7 @@
<div class="alert-time">${new Date(data.timestamp).toLocaleTimeString()}</div>
<div class="alert-message">
⚠ ROGUE TOWER DETECTED<br>
CID ${data.cid} | MCC ${data.mcc} MNC ${data.mnc} | ${data.reason || 'Unknown threat'}
CID ${escapeHtml(data.cid)} | MCC ${escapeHtml(data.mcc)} MNC ${escapeHtml(data.mnc)} | ${escapeHtml(data.reason || 'Unknown threat')}
</div>
`;
+302
View File
@@ -0,0 +1,302 @@
"""Unit tests for GSM Spy parsing and validation functions."""
import pytest
from routes.gsm_spy import (
parse_grgsm_scanner_output,
parse_tshark_output,
arfcn_to_frequency,
validate_band_names,
REGIONAL_BANDS
)
class TestParseGrgsmScannerOutput:
"""Tests for parse_grgsm_scanner_output()."""
def test_valid_table_row(self):
"""Test parsing a valid scanner output table row."""
line = " 23 | 940.6 | 31245 | 1234 | 214 | 01 | -48"
result = parse_grgsm_scanner_output(line)
assert result is not None
assert result['type'] == 'tower'
assert result['arfcn'] == 23
assert result['frequency'] == 940.6
assert result['cid'] == 31245
assert result['lac'] == 1234
assert result['mcc'] == 214
assert result['mnc'] == 1
assert result['signal_strength'] == -48.0
assert 'timestamp' in result
def test_header_line(self):
"""Test that header lines are skipped."""
line = "ARFCN | Freq (MHz) | CID | LAC | MCC | MNC | Power (dB)"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_separator_line(self):
"""Test that separator lines are skipped."""
line = "--------------------------------------------------------------------"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_progress_line(self):
"""Test that progress lines are skipped."""
line = "Scanning: 50% complete"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_found_line(self):
"""Test that 'Found X towers' lines are skipped."""
line = "Found 5 towers"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_invalid_data(self):
"""Test handling of invalid data."""
line = " abc | xyz | invalid | data | bad | bad | bad"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_empty_line(self):
"""Test handling of empty lines."""
result = parse_grgsm_scanner_output("")
assert result is None
def test_partial_data(self):
"""Test handling of incomplete table rows."""
line = " 23 | 940.6 | 31245" # Missing fields
result = parse_grgsm_scanner_output(line)
assert result is None
class TestParseTsharkOutput:
"""Tests for parse_tshark_output()."""
def test_valid_full_output(self):
"""Test parsing tshark output with all fields."""
line = "5\t0xABCD1234\t123456789012345\t1234\t31245"
result = parse_tshark_output(line)
assert result is not None
assert result['type'] == 'device'
assert result['ta_value'] == 5
assert result['tmsi'] == '0xABCD1234'
assert result['imsi'] == '123456789012345'
assert result['lac'] == 1234
assert result['cid'] == 31245
assert result['distance_meters'] == 5 * 554 # TA * 554 meters
assert 'timestamp' in result
def test_missing_optional_fields(self):
"""Test parsing with missing optional fields (empty tabs)."""
line = "3\t\t\t1234\t31245"
result = parse_tshark_output(line)
assert result is not None
assert result['ta_value'] == 3
assert result['tmsi'] is None
assert result['imsi'] is None
assert result['lac'] == 1234
assert result['cid'] == 31245
def test_no_ta_value(self):
"""Test parsing without TA value (empty field)."""
# When TA is empty, int('') will fail, so the parse returns None
# This is the current behavior - the function expects valid integers or valid empty handling
line = "\t0xABCD1234\t123456789012345\t1234\t31245"
result = parse_tshark_output(line)
# Current implementation will fail to parse this due to int('') failing
assert result is None
def test_invalid_line(self):
"""Test handling of invalid tshark output."""
line = "invalid data"
result = parse_tshark_output(line)
assert result is None
def test_empty_line(self):
"""Test handling of empty lines."""
result = parse_tshark_output("")
assert result is None
def test_partial_fields(self):
"""Test with fewer than 5 fields."""
line = "5\t0xABCD1234" # Only 2 fields
result = parse_tshark_output(line)
assert result is None
class TestArfcnToFrequency:
"""Tests for arfcn_to_frequency()."""
def test_gsm850_arfcn(self):
"""Test ARFCN in GSM850 band."""
# GSM850: ARFCN 128-251, 869-894 MHz
arfcn = 128
freq = arfcn_to_frequency(arfcn)
assert freq == 869000000 # 869 MHz
arfcn = 251
freq = arfcn_to_frequency(arfcn)
assert freq == 893600000 # 893.6 MHz
def test_egsm900_arfcn(self):
"""Test ARFCN in EGSM900 band."""
# EGSM900: ARFCN 0-124, 925-960 MHz
arfcn = 0
freq = arfcn_to_frequency(arfcn)
assert freq == 925000000 # 925 MHz
arfcn = 124
freq = arfcn_to_frequency(arfcn)
assert freq == 949800000 # 949.8 MHz
def test_dcs1800_arfcn(self):
"""Test ARFCN in DCS1800 band."""
# DCS1800: ARFCN 512-885, 1805-1880 MHz
# Note: ARFCN 512 also exists in PCS1900 and will match that first
# Use ARFCN 811+ which is only in DCS1800
arfcn = 811 # Beyond PCS1900 range (512-810)
freq = arfcn_to_frequency(arfcn)
# 811 is ARFCN offset from 512, so freq = 1805MHz + (811-512)*200kHz
expected = 1805000000 + (811 - 512) * 200000
assert freq == expected
arfcn = 885
freq = arfcn_to_frequency(arfcn)
assert freq == 1879600000 # 1879.6 MHz
def test_pcs1900_arfcn(self):
"""Test ARFCN in PCS1900 band."""
# PCS1900: ARFCN 512-810, 1930-1990 MHz
# Note: overlaps with DCS1800 ARFCN range, but different frequencies
arfcn = 512
freq = arfcn_to_frequency(arfcn)
# Will match first band (DCS1800 in Europe config)
assert freq > 0
def test_invalid_arfcn(self):
"""Test ARFCN outside known ranges."""
with pytest.raises(ValueError, match="not found in any known GSM band"):
arfcn_to_frequency(9999)
with pytest.raises(ValueError):
arfcn_to_frequency(-1)
def test_arfcn_200khz_spacing(self):
"""Test that ARFCNs are 200kHz apart."""
arfcn1 = 128
arfcn2 = 129
freq1 = arfcn_to_frequency(arfcn1)
freq2 = arfcn_to_frequency(arfcn2)
assert freq2 - freq1 == 200000 # 200 kHz
class TestValidateBandNames:
"""Tests for validate_band_names()."""
def test_valid_americas_bands(self):
"""Test valid band names for Americas region."""
bands = ['GSM850', 'PCS1900']
result, error = validate_band_names(bands, 'Americas')
assert result == bands
assert error is None
def test_valid_europe_bands(self):
"""Test valid band names for Europe region."""
# Note: Europe uses EGSM900, not GSM900
bands = ['EGSM900', 'DCS1800', 'GSM850', 'GSM800']
result, error = validate_band_names(bands, 'Europe')
assert result == bands
assert error is None
def test_valid_asia_bands(self):
"""Test valid band names for Asia region."""
# Note: Asia uses EGSM900, not GSM900
bands = ['EGSM900', 'DCS1800']
result, error = validate_band_names(bands, 'Asia')
assert result == bands
assert error is None
def test_invalid_band_for_region(self):
"""Test invalid band name for a region."""
bands = ['GSM900', 'INVALID_BAND']
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
assert 'Invalid bands' in error
assert 'INVALID_BAND' in error
def test_invalid_region(self):
"""Test invalid region name."""
bands = ['GSM900']
result, error = validate_band_names(bands, 'InvalidRegion')
assert result == []
assert error is not None
assert 'Invalid region' in error
def test_empty_bands_list(self):
"""Test with empty bands list."""
result, error = validate_band_names([], 'Americas')
assert result == []
assert error is None
def test_single_valid_band(self):
"""Test with single valid band."""
bands = ['GSM850']
result, error = validate_band_names(bands, 'Americas')
assert result == ['GSM850']
assert error is None
def test_case_sensitive_band_names(self):
"""Test that band names are case-sensitive."""
bands = ['gsm850'] # lowercase
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
def test_multiple_invalid_bands(self):
"""Test with multiple invalid bands."""
bands = ['INVALID1', 'GSM850', 'INVALID2']
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
assert 'INVALID1' in error
assert 'INVALID2' in error
class TestRegionalBandsConfig:
"""Tests for REGIONAL_BANDS configuration."""
def test_all_regions_defined(self):
"""Test that all expected regions are defined."""
assert 'Americas' in REGIONAL_BANDS
assert 'Europe' in REGIONAL_BANDS
assert 'Asia' in REGIONAL_BANDS
def test_all_bands_have_required_fields(self):
"""Test that all bands have required configuration fields."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert 'start' in band_config
assert 'end' in band_config
assert 'arfcn_start' in band_config
assert 'arfcn_end' in band_config
def test_frequency_ranges_valid(self):
"""Test that frequency ranges are positive and start < end."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert band_config['start'] > 0
assert band_config['end'] > 0
assert band_config['start'] < band_config['end']
def test_arfcn_ranges_valid(self):
"""Test that ARFCN ranges are valid."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert band_config['arfcn_start'] >= 0
assert band_config['arfcn_end'] >= 0
assert band_config['arfcn_start'] <= band_config['arfcn_end']
+30 -2
View File
@@ -142,7 +142,7 @@ class DataStore:
class CleanupManager:
"""Manages periodic cleanup of multiple data stores."""
"""Manages periodic cleanup of multiple data stores and database tables."""
def __init__(self, interval: float = 60.0):
"""
@@ -152,9 +152,11 @@ class CleanupManager:
interval: Cleanup interval in seconds
"""
self.stores: list[DataStore] = []
self.db_cleanup_funcs: list[tuple[callable, int]] = [] # (func, interval_multiplier)
self.interval = interval
self._timer: threading.Timer | None = None
self._running = False
self._cleanup_count = 0
self._lock = threading.Lock()
def register(self, store: DataStore) -> None:
@@ -169,6 +171,17 @@ class CleanupManager:
if store in self.stores:
self.stores.remove(store)
def register_db_cleanup(self, func: callable, interval_multiplier: int = 60) -> None:
"""
Register a database cleanup function.
Args:
func: Cleanup function to call (should return number of deleted rows)
interval_multiplier: How many cleanup cycles to wait between calls (default: 60 = 1 hour if interval is 60s)
"""
with self._lock:
self.db_cleanup_funcs.append((func, interval_multiplier))
def start(self) -> None:
"""Start the cleanup timer."""
with self._lock:
@@ -194,11 +207,15 @@ class CleanupManager:
self._timer.start()
def _run_cleanup(self) -> None:
"""Run cleanup on all registered stores."""
"""Run cleanup on all registered stores and database tables."""
total_cleaned = 0
# Cleanup in-memory data stores
with self._lock:
stores = list(self.stores)
db_funcs = list(self.db_cleanup_funcs)
self._cleanup_count += 1
current_count = self._cleanup_count
for store in stores:
try:
@@ -206,6 +223,17 @@ class CleanupManager:
except Exception as e:
logger.error(f"Error cleaning up {store.name}: {e}")
# Cleanup database tables (less frequently)
for func, interval_multiplier in db_funcs:
if current_count % interval_multiplier == 0:
try:
deleted = func()
if deleted > 0:
logger.info(f"Database cleanup: {func.__name__} removed {deleted} rows")
total_cleaned += deleted
except Exception as e:
logger.error(f"Error in database cleanup {func.__name__}: {e}")
if total_cleaned > 0:
logger.info(f"Cleanup complete: removed {total_cleaned} stale entries")
+58
View File
@@ -2189,3 +2189,61 @@ def cleanup_old_payloads(max_age_hours: int = 24) -> int:
WHERE received_at < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
# =============================================================================
# GSM Cleanup Functions
# =============================================================================
def cleanup_old_gsm_signals(max_age_days: int = 60) -> int:
"""
Remove old GSM signal observations (60-day archive).
Args:
max_age_days: Maximum age in days (default: 60)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_signals
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_days} days',))
return cursor.rowcount
def cleanup_old_gsm_tmsi_log(max_age_hours: int = 24) -> int:
"""
Remove old TMSI log entries (24-hour buffer for crowd density).
Args:
max_age_hours: Maximum age in hours (default: 24)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_tmsi_log
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
def cleanup_old_gsm_velocity_log(max_age_hours: int = 1) -> int:
"""
Remove old velocity log entries (1-hour buffer for movement tracking).
Args:
max_age_hours: Maximum age in hours (default: 1)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_velocity_log
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount