Remove GSM spy functionality for legal compliance

Remove all GSM cellular intelligence features including tower scanning,
signal monitoring, rogue detection, crowd density analysis, and
OpenCellID integration across routes, templates, utils, tests, and
build configuration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-08 22:04:12 +00:00
parent 2bed35dd64
commit c2891938ab
17 changed files with 21 additions and 6516 deletions

View File

@@ -48,39 +48,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
airspy \
limesuite \
hackrf \
# GSM Intelligence (tshark for packet parsing)
tshark \
# Utilities
curl \
procps \
&& rm -rf /var/lib/apt/lists/*
# GSM Intelligence: gr-gsm (grgsm_scanner, grgsm_livemon)
# Install from apt if available, otherwise build from source
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
gnuradio gr-osmosdr gr-gsm 2>/dev/null \
|| ( \
apt-get install -y --no-install-recommends \
gnuradio gnuradio-dev gr-osmosdr \
git cmake libboost-all-dev libcppunit-dev swig \
doxygen liblog4cpp5-dev python3-scipy python3-numpy \
libvolk-dev libfftw3-dev build-essential \
&& cd /tmp \
&& git clone --depth 1 https://github.com/bkerler/gr-gsm.git \
&& cd gr-gsm \
&& mkdir build && cd build \
&& cmake .. \
&& make -j$(nproc) \
&& make install \
&& ldconfig \
&& rm -rf /tmp/gr-gsm \
&& apt-get remove -y gnuradio-dev libcppunit-dev swig doxygen \
liblog4cpp5-dev libvolk-dev build-essential git cmake \
&& apt-get autoremove -y \
) \
&& rm -rf /var/lib/apt/lists/*
# Build dump1090-fa and acarsdec from source (packages not available in slim repos)
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \

59
app.py
View File

@@ -39,7 +39,6 @@ from utils.constants import (
MAX_VESSEL_AGE_SECONDS,
MAX_DSC_MESSAGE_AGE_SECONDS,
MAX_DEAUTH_ALERTS_AGE_SECONDS,
MAX_GSM_AGE_SECONDS,
QUEUE_MAX_SIZE,
)
import logging
@@ -188,16 +187,6 @@ deauth_detector = None
deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
deauth_detector_lock = threading.Lock()
# GSM Spy
gsm_spy_scanner_running = False # Flag: scanner thread active
gsm_spy_livemon_process = None # For grgsm_livemon process
gsm_spy_monitor_process = None # For tshark monitoring process
gsm_spy_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)
gsm_spy_lock = threading.Lock()
gsm_spy_active_device = None
gsm_spy_selected_arfcn = None
gsm_spy_region = 'Americas' # Default band
# ============================================
# GLOBAL STATE DICTIONARIES
# ============================================
@@ -230,16 +219,6 @@ dsc_messages = DataStore(max_age_seconds=MAX_DSC_MESSAGE_AGE_SECONDS, name='dsc_
# Deauth alerts - using DataStore for automatic cleanup
deauth_alerts = DataStore(max_age_seconds=MAX_DEAUTH_ALERTS_AGE_SECONDS, name='deauth_alerts')
# GSM Spy data stores
gsm_spy_towers = DataStore(
max_age_seconds=MAX_GSM_AGE_SECONDS,
name='gsm_spy_towers'
)
gsm_spy_devices = DataStore(
max_age_seconds=MAX_GSM_AGE_SECONDS,
name='gsm_spy_devices'
)
# Satellite state
satellite_passes = [] # Predicted satellite passes (not auto-cleaned, calculated)
@@ -252,8 +231,6 @@ cleanup_manager.register(adsb_aircraft)
cleanup_manager.register(ais_vessels)
cleanup_manager.register(dsc_messages)
cleanup_manager.register(deauth_alerts)
cleanup_manager.register(gsm_spy_towers)
cleanup_manager.register(gsm_spy_devices)
# ============================================
# SDR DEVICE REGISTRY
@@ -687,8 +664,6 @@ def kill_all() -> Response:
global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process
global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process
global dmr_process, dmr_rtl_process
global gsm_spy_livemon_process, gsm_spy_monitor_process
global gsm_spy_scanner_running, gsm_spy_active_device, gsm_spy_selected_arfcn, gsm_spy_region
# Import adsb and ais modules to reset their state
from routes import adsb as adsb_module
@@ -701,8 +676,7 @@ def kill_all() -> Response:
'airodump-ng', 'aireplay-ng', 'airmon-ng',
'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher',
'hcitool', 'bluetoothctl', 'dsd',
'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg',
'grgsm_scanner', 'grgsm_livemon', 'tshark'
'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg'
]
for proc in processes_to_kill:
@@ -771,29 +745,6 @@ def kill_all() -> Response:
except Exception:
pass
# Reset GSM Spy state
with gsm_spy_lock:
gsm_spy_scanner_running = False
gsm_spy_active_device = None
gsm_spy_selected_arfcn = None
gsm_spy_region = 'Americas'
if gsm_spy_livemon_process:
try:
if safe_terminate(gsm_spy_livemon_process):
killed.append('grgsm_livemon')
except Exception:
pass
gsm_spy_livemon_process = None
if gsm_spy_monitor_process:
try:
if safe_terminate(gsm_spy_monitor_process):
killed.append('tshark')
except Exception:
pass
gsm_spy_monitor_process = None
# Clear SDR device registry
with sdr_device_registry_lock:
sdr_device_registry.clear()
@@ -885,19 +836,11 @@ def main() -> None:
# Register database cleanup functions
from utils.database import (
cleanup_old_gsm_signals,
cleanup_old_gsm_tmsi_log,
cleanup_old_gsm_velocity_log,
cleanup_old_signal_history,
cleanup_old_timeline_entries,
cleanup_old_dsc_alerts,
cleanup_old_payloads
)
# GSM cleanups: signals (60 days), TMSI log (24 hours), velocity (1 hour)
# Interval multiplier: cleanup every N cycles (60s interval = 1 cleanup per hour at multiplier 60)
cleanup_manager.register_db_cleanup(cleanup_old_gsm_tmsi_log, interval_multiplier=60) # Every hour
cleanup_manager.register_db_cleanup(cleanup_old_gsm_velocity_log, interval_multiplier=60) # Every hour
cleanup_manager.register_db_cleanup(cleanup_old_gsm_signals, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_signal_history, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_timeline_entries, interval_multiplier=1440) # Every 24 hours
cleanup_manager.register_db_cleanup(cleanup_old_dsc_alerts, interval_multiplier=1440) # Every 24 hours

View File

@@ -204,25 +204,20 @@ SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
# Update checking
GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
UPDATE_CHECK_ENABLED = _get_env_bool('UPDATE_CHECK_ENABLED', True)
UPDATE_CHECK_INTERVAL_HOURS = _get_env_int('UPDATE_CHECK_INTERVAL_HOURS', 6)
# Alerting
ALERT_WEBHOOK_URL = _get_env('ALERT_WEBHOOK_URL', '')
ALERT_WEBHOOK_SECRET = _get_env('ALERT_WEBHOOK_SECRET', '')
ALERT_WEBHOOK_TIMEOUT = _get_env_int('ALERT_WEBHOOK_TIMEOUT', 5)
# Admin credentials
ADMIN_USERNAME = _get_env('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = _get_env('ADMIN_PASSWORD', 'admin')
# Update checking
GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
UPDATE_CHECK_ENABLED = _get_env_bool('UPDATE_CHECK_ENABLED', True)
UPDATE_CHECK_INTERVAL_HOURS = _get_env_int('UPDATE_CHECK_INTERVAL_HOURS', 6)
# Alerting
ALERT_WEBHOOK_URL = _get_env('ALERT_WEBHOOK_URL', '')
ALERT_WEBHOOK_SECRET = _get_env('ALERT_WEBHOOK_SECRET', '')
ALERT_WEBHOOK_TIMEOUT = _get_env_int('ALERT_WEBHOOK_TIMEOUT', 5)
# Admin credentials
ADMIN_USERNAME = _get_env('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = _get_env('ADMIN_PASSWORD', 'admin')
# GSM Spy settings
GSM_OPENCELLID_API_KEY = _get_env('GSM_OPENCELLID_API_KEY', '')
GSM_OPENCELLID_API_URL = _get_env('GSM_OPENCELLID_API_URL', 'https://opencellid.org/cell/get')
GSM_API_DAILY_LIMIT = _get_env_int('GSM_API_DAILY_LIMIT', 1000)
GSM_TA_METERS_PER_UNIT = _get_env_int('GSM_TA_METERS_PER_UNIT', 554)
def configure_logging() -> None:
"""Configure application logging."""

View File

@@ -31,7 +31,6 @@ def register_blueprints(app):
from .websdr import websdr_bp
from .alerts import alerts_bp
from .recordings import recordings_bp
from .gsm_spy import gsm_spy_bp
app.register_blueprint(pager_bp)
app.register_blueprint(sensor_bp)
@@ -62,7 +61,6 @@ def register_blueprints(app):
app.register_blueprint(websdr_bp) # HF/Shortwave WebSDR
app.register_blueprint(alerts_bp) # Cross-mode alerts
app.register_blueprint(recordings_bp) # Session recordings
app.register_blueprint(gsm_spy_bp) # GSM cellular intelligence
# Initialize TSCM state with queue and lock from app
import app as app_module

File diff suppressed because it is too large Load Diff

127
setup.sh
View File

@@ -243,12 +243,6 @@ check_tools() {
check_required "hcitool" "Bluetooth scan utility" hcitool
check_required "hciconfig" "Bluetooth adapter config" hciconfig
echo
info "GSM Intelligence:"
check_recommended "grgsm_scanner" "GSM tower scanner (gr-gsm)" grgsm_scanner
check_recommended "grgsm_livemon" "GSM live monitor (gr-gsm)" grgsm_livemon
check_recommended "tshark" "Packet analysis (Wireshark)" tshark
echo
info "SoapySDR:"
check_required "SoapySDRUtil" "SoapySDR CLI utility" SoapySDRUtil
@@ -713,47 +707,6 @@ install_macos_packages() {
progress "Installing gpsd"
brew_install gpsd
# gr-gsm for GSM Intelligence
progress "Installing gr-gsm"
if ! cmd_exists grgsm_scanner; then
brew_install gnuradio
(brew_install gr-gsm) || {
warn "gr-gsm not available in Homebrew, building from source..."
(
tmp_dir="$(mktemp -d)"
trap 'rm -rf "$tmp_dir"' EXIT
info "Cloning gr-gsm repository..."
git clone --depth 1 https://github.com/bkerler/gr-gsm.git "$tmp_dir/gr-gsm" >/dev/null 2>&1 \
|| { warn "Failed to clone gr-gsm. GSM Spy feature will not work."; exit 1; }
cd "$tmp_dir/gr-gsm"
mkdir -p build && cd build
info "Compiling gr-gsm (this may take several minutes)..."
if cmake .. >/dev/null 2>&1 && make -j$(sysctl -n hw.ncpu) >/dev/null 2>&1; then
if [[ -w /usr/local/lib ]]; then
make install >/dev/null 2>&1
else
sudo make install >/dev/null 2>&1
fi
ok "gr-gsm installed successfully from source"
else
warn "Failed to build gr-gsm. GSM Spy feature will not work."
fi
)
}
else
ok "gr-gsm already installed"
fi
# Wireshark (tshark) for GSM packet analysis
progress "Installing tshark"
if ! cmd_exists tshark; then
brew_install wireshark
else
ok "tshark already installed"
fi
progress "Installing Ubertooth tools (optional)"
if ! cmd_exists ubertooth-btle; then
echo
@@ -1164,82 +1117,6 @@ install_debian_packages() {
progress "Installing gpsd"
apt_install gpsd gpsd-clients || true
# gr-gsm for GSM Intelligence
progress "Installing GNU Radio and gr-gsm"
if ! cmd_exists grgsm_scanner; then
# Try to install gr-gsm directly from package repositories
apt_install gnuradio gnuradio-dev gr-osmosdr gr-gsm || {
warn "gr-gsm package not available in repositories. Attempting source build..."
# Fallback: Build from source
progress "Building gr-gsm from source"
apt_install git cmake libboost-all-dev libcppunit-dev swig \
doxygen liblog4cpp5-dev python3-scipy python3-numpy \
libvolk-dev libuhd-dev libfftw3-dev || true
info "Cloning gr-gsm repository..."
if [ -d /tmp/gr-gsm ]; then
rm -rf /tmp/gr-gsm
fi
git clone https://github.com/bkerler/gr-gsm.git /tmp/gr-gsm || {
warn "Failed to clone gr-gsm repository. GSM Spy will not be available."
return 0
}
cd /tmp/gr-gsm
mkdir -p build && cd build
# Try to find GNU Radio cmake files
if [ -d /usr/lib/x86_64-linux-gnu/cmake/gnuradio ]; then
export CMAKE_PREFIX_PATH="/usr/lib/x86_64-linux-gnu/cmake/gnuradio:$CMAKE_PREFIX_PATH"
fi
info "Running CMake configuration..."
if cmake .. 2>/dev/null; then
info "Compiling gr-gsm (this may take several minutes)..."
if make -j$(nproc) 2>/dev/null; then
$SUDO make install
$SUDO ldconfig
cd ~
rm -rf /tmp/gr-gsm
ok "gr-gsm built and installed successfully"
else
warn "gr-gsm compilation failed. GSM Spy feature will not work."
cd ~
rm -rf /tmp/gr-gsm
fi
else
warn "gr-gsm CMake configuration failed. GNU Radio 3.8+ may not be available."
cd ~
rm -rf /tmp/gr-gsm
fi
}
# Verify installation
if cmd_exists grgsm_scanner; then
ok "gr-gsm installed successfully"
else
warn "gr-gsm installation incomplete. GSM Spy feature will not work."
fi
else
ok "gr-gsm already installed"
fi
# Wireshark (tshark) for GSM packet analysis
progress "Installing tshark"
if ! cmd_exists tshark; then
# Pre-accept non-root capture prompt for non-interactive install
echo 'wireshark-common wireshark-common/install-setuid boolean true' | $SUDO debconf-set-selections
apt_install tshark || true
# Allow non-root capture
$SUDO dpkg-reconfigure wireshark-common 2>/dev/null || true
$SUDO usermod -a -G wireshark $USER 2>/dev/null || true
ok "tshark installed. You may need to re-login for wireshark group permissions."
else
ok "tshark already installed"
fi
progress "Installing Python packages"
apt_install python3-venv python3-pip || true
# Install Python packages via apt (more reliable than pip on modern Debian/Ubuntu)
@@ -1327,7 +1204,7 @@ final_summary_and_hard_fail() {
warn "Missing RECOMMENDED tools (some features will not work):"
for t in "${missing_recommended[@]}"; do echo " - $t"; done
echo
warn "Install these for full functionality (GSM Intelligence, etc.)"
warn "Install these for full functionality"
fi
}
@@ -1376,7 +1253,7 @@ main() {
install_python_deps
# Download leaflet-heat plugin for GSM heatmap (offline mode)
# Download leaflet-heat plugin (offline mode)
if [ ! -f "static/vendor/leaflet-heat/leaflet-heat.js" ]; then
info "Downloading leaflet-heat plugin..."
mkdir -p static/vendor/leaflet-heat

View File

@@ -946,32 +946,9 @@ function loadApiKeyStatus() {
if (!badge) return;
fetch('/gsm_spy/settings/api_key')
.then(r => r.json())
.then(data => {
if (data.configured) {
badge.textContent = 'Configured';
badge.className = 'asset-badge available';
desc.textContent = 'Source: ' + (data.source === 'env' ? 'Environment variable' : 'Database');
} else {
badge.textContent = 'Not configured';
badge.className = 'asset-badge missing';
desc.textContent = 'No API key set';
}
if (usage) {
usage.textContent = (data.usage_today || 0) + ' / ' + (data.api_limit || 1000);
}
if (bar) {
const pct = Math.min(100, ((data.usage_today || 0) / (data.api_limit || 1000)) * 100);
bar.style.width = pct + '%';
bar.style.background = pct > 90 ? 'var(--accent-red)' : pct > 70 ? 'var(--accent-yellow)' : 'var(--accent-cyan)';
}
})
.catch(() => {
badge.textContent = 'Error';
badge.className = 'asset-badge missing';
desc.textContent = 'Could not load status';
});
badge.textContent = 'Not available';
badge.className = 'asset-badge missing';
desc.textContent = 'GSM feature removed';
}
/**
@@ -994,30 +971,8 @@ function saveApiKey() {
result.style.color = 'var(--text-dim)';
result.textContent = 'Saving...';
fetch('/gsm_spy/settings/api_key', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ key: key })
})
.then(r => r.json())
.then(data => {
if (data.error) {
result.style.color = 'var(--accent-red)';
result.textContent = data.error;
} else {
result.style.color = 'var(--accent-green)';
result.textContent = 'API key saved successfully.';
input.value = '';
loadApiKeyStatus();
// Hide the banner if visible
const banner = document.getElementById('apiKeyBanner');
if (banner) banner.style.display = 'none';
}
})
.catch(() => {
result.style.color = 'var(--accent-red)';
result.textContent = 'Error saving API key.';
});
result.style.color = 'var(--accent-red)';
result.textContent = 'GSM feature has been removed.';
}
/**

File diff suppressed because it is too large Load Diff

View File

@@ -171,10 +171,6 @@
<span class="mode-icon icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M3 18l2 2h14l2-2"/><path d="M5 18v-4a2 2 0 0 1 2-2h10a2 2 0 0 1 2 2v4"/><path d="M12 12V6"/></svg></span>
<span class="mode-name">Vessels</span>
</a>
<a href="/gsm_spy/dashboard" class="mode-card mode-card-sm" style="text-decoration: none;">
<span class="mode-icon icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><rect x="5" y="2" width="14" height="20" rx="2" ry="2"/><line x1="12" y1="18" x2="12.01" y2="18"/><path d="M8 6h8M8 10h8M8 14h8"/></svg></span>
<span class="mode-name">GSM SPY</span>
</a>
<button class="mode-card mode-card-sm" onclick="selectMode('aprs')">
<span class="mode-icon icon"><svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M20 10c0 6-8 12-8 12s-8-6-8-12a8 8 0 0 1 16 0Z"/><circle cx="12" cy="10" r="3"/></svg></span>
<span class="mode-name">APRS</span>

View File

@@ -67,7 +67,6 @@
{{ mode_item('rtlamr', 'Meters', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M22 12h-4l-3 9L9 3l-3 9H2"/></svg>') }}
{{ mode_item('adsb', 'Aircraft', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M21 16v-2l-8-5V3.5a1.5 1.5 0 0 0-3 0V9l-8 5v2l8-2.5V19l-2 1.5V22l3.5-1 3.5 1v-1.5L13 19v-5.5l8 2.5z"/></svg>', '/adsb/dashboard') }}
{{ mode_item('ais', 'Vessels', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M3 18l2 2h14l2-2"/><path d="M5 18v-4a2 2 0 0 1 2-2h10a2 2 0 0 1 2 2v4"/><path d="M12 12V6"/><path d="M12 6l4 3"/></svg>', '/ais/dashboard') }}
{{ mode_item('gsm', 'GSM SPY', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><rect x="5" y="2" width="14" height="20" rx="2" ry="2"/><line x1="12" y1="18" x2="12.01" y2="18"/><path d="M8 6h8M8 10h8M8 14h8"/></svg>', '/gsm_spy/dashboard') }}
{{ mode_item('aprs', 'APRS', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M20 10c0 6-8 12-8 12s-8-6-8-12a8 8 0 0 1 16 0Z"/><circle cx="12" cy="10" r="3"/></svg>') }}
{{ mode_item('listening', 'Listening Post', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><rect x="3" y="3" width="18" height="18" rx="2"/><path d="M3 9h18"/><path d="M9 21V9"/></svg>') }}
{{ mode_item('spystations', 'Spy Stations', '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M4.9 19.1C1 15.2 1 8.8 4.9 4.9"/><path d="M7.8 16.2c-2.3-2.3-2.3-6.1 0-8.5"/><circle cx="12" cy="12" r="2"/><path d="M16.2 7.8c2.3 2.3 2.3 6.1 0 8.5"/><path d="M19.1 4.9C23 8.8 23 15.1 19.1 19"/></svg>') }}

View File

@@ -17,7 +17,6 @@
<button class="settings-tab" data-tab="tools" onclick="switchSettingsTab('tools')">Tools</button>
<button class="settings-tab" data-tab="alerts" onclick="switchSettingsTab('alerts')">Alerts</button>
<button class="settings-tab" data-tab="recording" onclick="switchSettingsTab('recording')">Recording</button>
<button class="settings-tab" data-tab="apikeys" onclick="switchSettingsTab('apikeys')">API Keys</button>
<button class="settings-tab" data-tab="about" onclick="switchSettingsTab('about')">About</button>
</div>
@@ -360,70 +359,6 @@
</div>
</div>
<!-- API Keys Section -->
<div id="settings-apikeys" class="settings-section">
<div class="settings-group">
<div class="settings-group-title">OpenCellID API Key</div>
<p style="color: var(--text-dim); margin-bottom: 15px; font-size: 12px;">
Required for GSM cell tower geolocation. Get a free key at
<a href="https://opencellid.org/register" target="_blank" style="color: var(--accent-cyan);">opencellid.org/register</a>
(1,000 lookups/day).
</p>
<div class="settings-row">
<div class="settings-label">
<span class="settings-label-text">Status</span>
<span class="settings-label-desc" id="apiKeyStatusDesc">Checking...</span>
</div>
<span id="apiKeyStatusBadge" class="asset-badge checking">Checking...</span>
</div>
<div class="settings-row" style="flex-wrap: wrap; gap: 8px;">
<div class="settings-label" style="width: 100%;">
<span class="settings-label-text">API Key</span>
<span class="settings-label-desc">Paste your OpenCellID API token</span>
</div>
<div style="display: flex; gap: 8px; width: 100%;">
<input type="password" id="apiKeyInput" class="settings-input"
placeholder="Enter your OpenCellID API key"
style="flex: 1; font-family: var(--font-mono); font-size: 11px;">
<button class="check-assets-btn" onclick="toggleApiKeyVisibility()" style="padding: 6px 10px; font-size: 11px;" title="Show/Hide">
<svg id="apiKeyEyeIcon" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" style="width: 14px; height: 14px;">
<path d="M1 12s4-8 11-8 11 8 11 8-4 8-11 8-11-8-11-8z"/>
<circle cx="12" cy="12" r="3"/>
</svg>
</button>
</div>
</div>
<div style="display: flex; gap: 10px; margin-top: 12px;">
<button class="check-assets-btn" onclick="saveApiKey()" style="flex: 1; background: var(--accent-cyan); color: #000;">
Save Key
</button>
</div>
<div id="apiKeySaveResult" style="margin-top: 10px; font-size: 11px; display: none;"></div>
</div>
<div class="settings-group">
<div class="settings-group-title">Usage Today</div>
<div style="padding: 12px; background: var(--bg-tertiary); border-radius: 6px; font-family: var(--font-mono); font-size: 12px;">
<div style="display: flex; justify-content: space-between; margin-bottom: 6px;">
<span style="color: var(--text-dim);">API Calls</span>
<span id="apiKeyUsageCount" style="color: var(--accent-cyan);">-- / --</span>
</div>
<div style="background: var(--bg-dark); border-radius: 3px; height: 6px; overflow: hidden; margin-top: 4px;">
<div id="apiKeyUsageBar" style="height: 100%; background: var(--accent-cyan); width: 0%; transition: width 0.3s;"></div>
</div>
</div>
</div>
<div class="settings-info">
<strong>Note:</strong> The environment variable <code>INTERCEPT_GSM_OPENCELLID_API_KEY</code> takes priority over the saved key.
Keys saved here persist across restarts.
</div>
</div>
<!-- About Section -->
<div id="settings-about" class="settings-section">
<div class="settings-group">

View File

@@ -1,360 +0,0 @@
"""Unit tests for GSM Spy parsing and validation functions."""
import pytest
from routes.gsm_spy import (
parse_grgsm_scanner_output,
parse_tshark_output,
arfcn_to_frequency,
validate_band_names,
REGIONAL_BANDS
)
class TestParseGrgsmScannerOutput:
"""Tests for parse_grgsm_scanner_output()."""
def test_valid_output_line(self):
"""Test parsing a valid grgsm_scanner output line."""
line = "ARFCN: 23, Freq: 940.6M, CID: 31245, LAC: 1234, MCC: 214, MNC: 01, Pwr: -48"
result = parse_grgsm_scanner_output(line)
assert result is not None
assert result['type'] == 'tower'
assert result['arfcn'] == 23
assert result['frequency'] == 940.6
assert result['cid'] == 31245
assert result['lac'] == 1234
assert result['mcc'] == 214
assert result['mnc'] == 1
assert result['signal_strength'] == -48.0
assert 'timestamp' in result
def test_freq_without_suffix(self):
"""Test parsing frequency without M suffix."""
line = "ARFCN: 975, Freq: 925.2, CID: 13522, LAC: 38722, MCC: 262, MNC: 1, Pwr: -58"
result = parse_grgsm_scanner_output(line)
assert result is not None
assert result['frequency'] == 925.2
def test_config_line(self):
"""Test that configuration lines are skipped."""
line = " Configuration: 1 CCCH, not combined"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_neighbour_line(self):
"""Test that neighbour cell lines are skipped."""
line = " Neighbour Cells: 57, 61, 70, 71, 72, 86"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_cell_arfcn_line(self):
"""Test that cell ARFCN lines are skipped."""
line = " Cell ARFCNs: 63, 76"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_progress_line(self):
"""Test that progress/status lines are skipped."""
line = "Scanning GSM900 band..."
result = parse_grgsm_scanner_output(line)
assert result is None
def test_empty_line(self):
"""Test handling of empty lines."""
result = parse_grgsm_scanner_output("")
assert result is None
def test_invalid_data(self):
"""Test handling of non-numeric values."""
line = "ARFCN: abc, Freq: xyz, CID: bad, LAC: data, MCC: bad, MNC: bad, Pwr: bad"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_no_identity_filtered(self):
"""Test that MCC=0/MNC=0 entries (no network identity) are filtered out."""
line = "ARFCN: 115, Freq: 925.0M, CID: 0, LAC: 0, MCC: 0, MNC: 0, Pwr: -100"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_mcc_zero_mnc_zero_filtered(self):
"""Test that MCC=0/MNC=0 even with valid CID is filtered out."""
line = "ARFCN: 113, Freq: 924.6M, CID: 1234, LAC: 5678, MCC: 0, MNC: 0, Pwr: -90"
result = parse_grgsm_scanner_output(line)
assert result is None
def test_cid_zero_valid_mcc_passes(self):
"""Test that CID=0 with valid MCC/MNC passes (partially decoded cell)."""
line = "ARFCN: 115, Freq: 958.0M, CID: 0, LAC: 21864, MCC: 234, MNC: 10, Pwr: -51"
result = parse_grgsm_scanner_output(line)
assert result is not None
assert result['cid'] == 0
assert result['mcc'] == 234
assert result['signal_strength'] == -51.0
def test_valid_cid_nonzero(self):
"""Test that valid non-zero CID/MCC entries pass through."""
line = "ARFCN: 115, Freq: 925.0M, CID: 19088, LAC: 21864, MCC: 234, MNC: 10, Pwr: -58"
result = parse_grgsm_scanner_output(line)
assert result is not None
assert result['cid'] == 19088
assert result['signal_strength'] == -58.0
class TestParseTsharkOutput:
"""Tests for parse_tshark_output()."""
def test_valid_full_output(self):
"""Test parsing tshark output with all fields."""
line = "5\t0xABCD1234\t123456789012345\t1234\t31245"
result = parse_tshark_output(line)
assert result is not None
assert result['type'] == 'device'
assert result['ta_value'] == 5
assert result['tmsi'] == '0xABCD1234'
assert result['imsi'] == '123456789012345'
assert result['lac'] == 1234
assert result['cid'] == 31245
assert result['distance_meters'] == 5 * 554 # TA * 554 meters
assert 'timestamp' in result
def test_missing_optional_fields(self):
"""Test parsing with missing optional fields (empty tabs).
A packet with TA but no TMSI/IMSI is discarded since there's
no device identifier to track.
"""
line = "3\t\t\t1234\t31245"
result = parse_tshark_output(line)
assert result is None
def test_missing_optional_fields_with_tmsi(self):
"""Test parsing with TMSI but missing TA, IMSI, CID."""
line = "\t0xABCD\t\t1234\t"
result = parse_tshark_output(line)
assert result is not None
assert result['ta_value'] is None
assert result['tmsi'] == '0xABCD'
assert result['imsi'] is None
assert result['lac'] == 1234
assert result['cid'] is None
def test_no_ta_value(self):
"""Test parsing without TA value (empty first field)."""
line = "\t0xABCD1234\t123456789012345\t1234\t31245"
result = parse_tshark_output(line)
assert result is not None
assert result['ta_value'] is None
assert result['tmsi'] == '0xABCD1234'
assert result['imsi'] == '123456789012345'
assert result['lac'] == 1234
assert result['cid'] == 31245
def test_invalid_line(self):
"""Test handling of invalid tshark output."""
line = "invalid data"
result = parse_tshark_output(line)
assert result is None
def test_empty_line(self):
"""Test handling of empty lines."""
result = parse_tshark_output("")
assert result is None
def test_partial_fields(self):
"""Test with fewer than 5 fields."""
line = "5\t0xABCD1234" # Only 2 fields
result = parse_tshark_output(line)
assert result is None
class TestArfcnToFrequency:
"""Tests for arfcn_to_frequency()."""
def test_gsm850_arfcn(self):
"""Test ARFCN in GSM850 band."""
# GSM850: ARFCN 128-251, 869-894 MHz
arfcn = 128
freq = arfcn_to_frequency(arfcn)
assert freq == 869000000 # 869 MHz
arfcn = 251
freq = arfcn_to_frequency(arfcn)
assert freq == 893600000 # 893.6 MHz
def test_egsm900_arfcn(self):
"""Test ARFCN in EGSM900 band."""
# EGSM900: ARFCN 0-124, DL = 935 + 0.2*ARFCN MHz
arfcn = 0
freq = arfcn_to_frequency(arfcn)
assert freq == 935000000 # 935.0 MHz
arfcn = 22
freq = arfcn_to_frequency(arfcn)
assert freq == 939400000 # 939.4 MHz
arfcn = 124
freq = arfcn_to_frequency(arfcn)
assert freq == 959800000 # 959.8 MHz
def test_egsm900_ext_arfcn(self):
"""Test ARFCN in EGSM900 extension band."""
# EGSM900_EXT: ARFCN 975-1023, DL = 925.2 + 0.2*(ARFCN-975) MHz
arfcn = 975
freq = arfcn_to_frequency(arfcn)
assert freq == 925200000 # 925.2 MHz
arfcn = 1023
freq = arfcn_to_frequency(arfcn)
assert freq == 934800000 # 934.8 MHz
def test_dcs1800_arfcn(self):
"""Test ARFCN in DCS1800 band."""
# DCS1800: ARFCN 512-885, 1805-1880 MHz
# Note: ARFCN 512 also exists in PCS1900 and will match that first
# Use ARFCN 811+ which is only in DCS1800
arfcn = 811 # Beyond PCS1900 range (512-810)
freq = arfcn_to_frequency(arfcn)
# 811 is ARFCN offset from 512, so freq = 1805MHz + (811-512)*200kHz
expected = 1805000000 + (811 - 512) * 200000
assert freq == expected
arfcn = 885
freq = arfcn_to_frequency(arfcn)
assert freq == 1879600000 # 1879.6 MHz
def test_pcs1900_arfcn(self):
"""Test ARFCN in PCS1900 band."""
# PCS1900: ARFCN 512-810, 1930-1990 MHz
# Note: overlaps with DCS1800 ARFCN range, but different frequencies
arfcn = 512
freq = arfcn_to_frequency(arfcn)
# Will match first band (DCS1800 in Europe config)
assert freq > 0
def test_invalid_arfcn(self):
"""Test ARFCN outside known ranges."""
with pytest.raises(ValueError, match="not found in any known GSM band"):
arfcn_to_frequency(9999)
with pytest.raises(ValueError):
arfcn_to_frequency(-1)
def test_arfcn_200khz_spacing(self):
"""Test that ARFCNs are 200kHz apart."""
arfcn1 = 128
arfcn2 = 129
freq1 = arfcn_to_frequency(arfcn1)
freq2 = arfcn_to_frequency(arfcn2)
assert freq2 - freq1 == 200000 # 200 kHz
class TestValidateBandNames:
"""Tests for validate_band_names()."""
def test_valid_americas_bands(self):
"""Test valid band names for Americas region."""
bands = ['GSM850', 'PCS1900']
result, error = validate_band_names(bands, 'Americas')
assert result == bands
assert error is None
def test_valid_europe_bands(self):
"""Test valid band names for Europe region."""
# Note: Europe uses EGSM900, not GSM900
bands = ['EGSM900', 'DCS1800', 'GSM850', 'GSM800']
result, error = validate_band_names(bands, 'Europe')
assert result == bands
assert error is None
def test_valid_asia_bands(self):
"""Test valid band names for Asia region."""
# Note: Asia uses EGSM900, not GSM900
bands = ['EGSM900', 'DCS1800']
result, error = validate_band_names(bands, 'Asia')
assert result == bands
assert error is None
def test_invalid_band_for_region(self):
"""Test invalid band name for a region."""
bands = ['GSM900', 'INVALID_BAND']
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
assert 'Invalid bands' in error
assert 'INVALID_BAND' in error
def test_invalid_region(self):
"""Test invalid region name."""
bands = ['GSM900']
result, error = validate_band_names(bands, 'InvalidRegion')
assert result == []
assert error is not None
assert 'Invalid region' in error
def test_empty_bands_list(self):
"""Test with empty bands list."""
result, error = validate_band_names([], 'Americas')
assert result == []
assert error is None
def test_single_valid_band(self):
"""Test with single valid band."""
bands = ['GSM850']
result, error = validate_band_names(bands, 'Americas')
assert result == ['GSM850']
assert error is None
def test_case_sensitive_band_names(self):
"""Test that band names are case-sensitive."""
bands = ['gsm850'] # lowercase
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
def test_multiple_invalid_bands(self):
"""Test with multiple invalid bands."""
bands = ['INVALID1', 'GSM850', 'INVALID2']
result, error = validate_band_names(bands, 'Americas')
assert result == []
assert error is not None
assert 'INVALID1' in error
assert 'INVALID2' in error
class TestRegionalBandsConfig:
"""Tests for REGIONAL_BANDS configuration."""
def test_all_regions_defined(self):
"""Test that all expected regions are defined."""
assert 'Americas' in REGIONAL_BANDS
assert 'Europe' in REGIONAL_BANDS
assert 'Asia' in REGIONAL_BANDS
def test_all_bands_have_required_fields(self):
"""Test that all bands have required configuration fields."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert 'start' in band_config
assert 'end' in band_config
assert 'arfcn_start' in band_config
assert 'arfcn_end' in band_config
def test_frequency_ranges_valid(self):
"""Test that frequency ranges are positive and start < end."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert band_config['start'] > 0
assert band_config['end'] > 0
assert band_config['start'] < band_config['end']
def test_arfcn_ranges_valid(self):
"""Test that ARFCN ranges are valid."""
for region, bands in REGIONAL_BANDS.items():
for band_name, band_config in bands.items():
assert band_config['arfcn_start'] >= 0
assert band_config['arfcn_end'] >= 0
assert band_config['arfcn_start'] <= band_config['arfcn_end']

View File

@@ -275,13 +275,3 @@ MAX_DEAUTH_ALERTS_AGE_SECONDS = 300 # 5 minutes
# Deauth detector sniff timeout (seconds)
DEAUTH_SNIFF_TIMEOUT = 0.5
# =============================================================================
# GSM SPY (Cellular Intelligence)
# =============================================================================
# Maximum age for GSM tower/device data in DataStore (seconds)
MAX_GSM_AGE_SECONDS = 300 # 5 minutes
# Timing Advance conversion to meters
GSM_TA_METERS_PER_UNIT = 554

View File

@@ -453,134 +453,6 @@ def init_db() -> None:
ON tscm_cases(status, created_at)
''')
# =====================================================================
# GSM (Global System for Mobile) Intelligence Tables
# =====================================================================
# gsm_cells - Known cell towers (OpenCellID cache)
conn.execute('''
CREATE TABLE IF NOT EXISTS gsm_cells (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mcc INTEGER NOT NULL,
mnc INTEGER NOT NULL,
lac INTEGER NOT NULL,
cid INTEGER NOT NULL,
lat REAL,
lon REAL,
azimuth INTEGER,
range_meters INTEGER,
samples INTEGER,
radio TEXT,
operator TEXT,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_verified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
UNIQUE(mcc, mnc, lac, cid)
)
''')
# gsm_rogues - Detected rogue towers / IMSI catchers
conn.execute('''
CREATE TABLE IF NOT EXISTS gsm_rogues (
id INTEGER PRIMARY KEY AUTOINCREMENT,
arfcn INTEGER NOT NULL,
mcc INTEGER,
mnc INTEGER,
lac INTEGER,
cid INTEGER,
signal_strength REAL,
reason TEXT NOT NULL,
threat_level TEXT DEFAULT 'medium',
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
location_lat REAL,
location_lon REAL,
acknowledged BOOLEAN DEFAULT 0,
notes TEXT,
metadata TEXT
)
''')
# gsm_signals - 60-day archive of signal observations
conn.execute('''
CREATE TABLE IF NOT EXISTS gsm_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
imsi TEXT,
tmsi TEXT,
mcc INTEGER,
mnc INTEGER,
lac INTEGER,
cid INTEGER,
ta_value INTEGER,
signal_strength REAL,
arfcn INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT
)
''')
# gsm_tmsi_log - 24-hour raw pings for crowd density
conn.execute('''
CREATE TABLE IF NOT EXISTS gsm_tmsi_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmsi TEXT NOT NULL,
lac INTEGER,
cid INTEGER,
ta_value INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# gsm_velocity_log - 1-hour buffer for movement tracking
conn.execute('''
CREATE TABLE IF NOT EXISTS gsm_velocity_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
prev_ta INTEGER,
curr_ta INTEGER,
prev_cid INTEGER,
curr_cid INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
estimated_velocity REAL,
metadata TEXT
)
''')
# GSM indexes for performance
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_cells_location
ON gsm_cells(lat, lon)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_cells_identity
ON gsm_cells(mcc, mnc, lac, cid)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_rogues_severity
ON gsm_rogues(threat_level, detected_at)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_signals_cell_time
ON gsm_signals(cid, lac, timestamp)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_signals_device
ON gsm_signals(imsi, tmsi, timestamp)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_tmsi_log_time
ON gsm_tmsi_log(timestamp)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_gsm_velocity_log_device
ON gsm_velocity_log(device_id, timestamp)
''')
# =====================================================================
# DSC (Digital Selective Calling) Tables
# =====================================================================
@@ -2298,60 +2170,3 @@ def cleanup_old_payloads(max_age_hours: int = 24) -> int:
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
# =============================================================================
# GSM Cleanup Functions
# =============================================================================
def cleanup_old_gsm_signals(max_age_days: int = 60) -> int:
"""
Remove old GSM signal observations (60-day archive).
Args:
max_age_days: Maximum age in days (default: 60)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_signals
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_days} days',))
return cursor.rowcount
def cleanup_old_gsm_tmsi_log(max_age_hours: int = 24) -> int:
"""
Remove old TMSI log entries (24-hour buffer for crowd density).
Args:
max_age_hours: Maximum age in hours (default: 24)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_tmsi_log
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount
def cleanup_old_gsm_velocity_log(max_age_hours: int = 1) -> int:
"""
Remove old velocity log entries (1-hour buffer for movement tracking).
Args:
max_age_hours: Maximum age in hours (default: 1)
Returns:
Number of deleted entries
"""
with get_db() as conn:
cursor = conn.execute('''
DELETE FROM gsm_velocity_log
WHERE timestamp < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount

View File

@@ -444,38 +444,6 @@ TOOL_DEPENDENCIES = {
}
}
},
'gsm': {
'name': 'GSM Intelligence',
'tools': {
'grgsm_scanner': {
'required': True,
'description': 'gr-gsm scanner for finding GSM towers',
'install': {
'apt': 'Build gr-gsm from source: https://github.com/bkerler/gr-gsm',
'brew': 'brew install gr-gsm (may require manual build)',
'manual': 'https://github.com/bkerler/gr-gsm'
}
},
'grgsm_livemon': {
'required': True,
'description': 'gr-gsm live monitor for decoding GSM signals',
'install': {
'apt': 'Included with gr-gsm package',
'brew': 'Included with gr-gsm',
'manual': 'Included with gr-gsm'
}
},
'tshark': {
'required': True,
'description': 'Wireshark CLI for parsing GSM packets',
'install': {
'apt': 'sudo apt-get install tshark',
'brew': 'brew install wireshark',
'manual': 'https://www.wireshark.org/download.html'
}
}
}
}
}

View File

@@ -1,226 +0,0 @@
"""GSM Cell Tower Geocoding Service.
Provides hybrid cache-first geocoding with async API fallback for cell towers.
"""
from __future__ import annotations
import logging
import queue
from typing import Any
import requests
import config
from utils.database import get_db
logger = logging.getLogger('intercept.gsm_geocoding')
# Queue for pending geocoding requests
_geocoding_queue = queue.Queue(maxsize=100)
def lookup_cell_coordinates(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
"""
Lookup cell tower coordinates with cache-first strategy.
Strategy:
1. Check gsm_cells table (cache) - fast synchronous lookup
2. If not found, return None (caller decides whether to use API)
Args:
mcc: Mobile Country Code
mnc: Mobile Network Code
lac: Location Area Code
cid: Cell ID
Returns:
dict with keys: lat, lon, source='cache', azimuth (optional),
range_meters (optional), operator (optional), radio (optional)
Returns None if not found in cache.
"""
try:
with get_db() as conn:
result = conn.execute('''
SELECT lat, lon, azimuth, range_meters, operator, radio
FROM gsm_cells
WHERE mcc = ? AND mnc = ? AND lac = ? AND cid = ?
''', (mcc, mnc, lac, cid)).fetchone()
if result and result['lat'] is not None and result['lon'] is not None:
return {
'lat': result['lat'],
'lon': result['lon'],
'source': 'cache',
'azimuth': result['azimuth'],
'range_meters': result['range_meters'],
'operator': result['operator'],
'radio': result['radio']
}
return None
except Exception as e:
logger.error(f"Error looking up coordinates from cache: {e}")
return None
def _get_api_key() -> str:
"""Get OpenCellID API key at runtime (env var first, then database)."""
env_key = config.GSM_OPENCELLID_API_KEY
if env_key:
return env_key
from utils.database import get_setting
return get_setting('gsm.opencellid.api_key', '')
def lookup_cell_from_api(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
"""
Lookup cell tower from OpenCellID API and cache result.
Args:
mcc: Mobile Country Code
mnc: Mobile Network Code
lac: Location Area Code
cid: Cell ID
Returns:
dict with keys: lat, lon, source='api', azimuth (optional),
range_meters (optional), operator (optional), radio (optional)
Returns None if API call fails or cell not found.
"""
try:
api_key = _get_api_key()
if not api_key:
logger.warning("OpenCellID API key not configured")
return None
api_url = config.GSM_OPENCELLID_API_URL
params = {
'key': api_key,
'mcc': mcc,
'mnc': mnc,
'lac': lac,
'cellid': cid,
'format': 'json'
}
response = requests.get(api_url, params=params, timeout=10)
if response.status_code == 200:
cell_data = response.json()
lat = cell_data.get('lat')
lon = cell_data.get('lon')
# Validate response has actual coordinates
if lat is None or lon is None:
logger.warning(
f"OpenCellID API returned 200 but no coordinates for "
f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {cell_data}"
)
return None
# Cache the result
with get_db() as conn:
conn.execute('''
INSERT OR REPLACE INTO gsm_cells
(mcc, mnc, lac, cid, lat, lon, azimuth, range_meters, samples, radio, operator, last_verified)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
mcc, mnc, lac, cid,
lat, lon,
cell_data.get('azimuth'),
cell_data.get('range'),
cell_data.get('samples'),
cell_data.get('radio'),
cell_data.get('operator')
))
conn.commit()
logger.info(f"Cached cell tower from API: MCC={mcc} MNC={mnc} LAC={lac} CID={cid} -> ({lat}, {lon})")
return {
'lat': lat,
'lon': lon,
'source': 'api',
'azimuth': cell_data.get('azimuth'),
'range_meters': cell_data.get('range'),
'operator': cell_data.get('operator'),
'radio': cell_data.get('radio')
}
else:
logger.warning(
f"OpenCellID API returned {response.status_code} for "
f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {response.text[:200]}"
)
return None
except Exception as e:
logger.error(f"Error calling OpenCellID API: {e}")
return None
def enrich_tower_data(tower_data: dict[str, Any]) -> dict[str, Any]:
"""
Enrich tower data with coordinates using cache-first strategy.
If coordinates found in cache, adds them immediately.
If not found, marks as 'pending' and queues for background API lookup.
Args:
tower_data: Dictionary with keys mcc, mnc, lac, cid (and other tower data)
Returns:
Enriched tower_data dict with added fields:
- lat, lon (if found in cache)
- status='pending' (if needs API lookup)
- source='cache' (if from cache)
"""
mcc = tower_data.get('mcc')
mnc = tower_data.get('mnc')
lac = tower_data.get('lac')
cid = tower_data.get('cid')
# Validate required fields
if not all([mcc is not None, mnc is not None, lac is not None, cid is not None]):
logger.warning(f"Tower data missing required fields: {tower_data}")
return tower_data
# Try cache lookup
coords = lookup_cell_coordinates(mcc, mnc, lac, cid)
if coords:
# Found in cache - add coordinates immediately
tower_data['lat'] = coords['lat']
tower_data['lon'] = coords['lon']
tower_data['source'] = 'cache'
# Add optional fields if available
if coords.get('azimuth') is not None:
tower_data['azimuth'] = coords['azimuth']
if coords.get('range_meters') is not None:
tower_data['range_meters'] = coords['range_meters']
if coords.get('operator'):
tower_data['operator'] = coords['operator']
if coords.get('radio'):
tower_data['radio'] = coords['radio']
logger.debug(f"Cache hit for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
else:
# Not in cache - mark as pending and queue for API lookup
tower_data['status'] = 'pending'
tower_data['source'] = 'unknown'
# Queue for background geocoding (non-blocking)
try:
_geocoding_queue.put_nowait(tower_data.copy())
logger.debug(f"Queued tower for geocoding: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
except queue.Full:
logger.warning("Geocoding queue full, dropping tower")
return tower_data
def get_geocoding_queue() -> queue.Queue:
"""Get the geocoding queue for the background worker."""
return _geocoding_queue

View File

@@ -28,4 +28,3 @@ wifi_logger = get_logger('intercept.wifi')
bluetooth_logger = get_logger('intercept.bluetooth')
adsb_logger = get_logger('intercept.adsb')
satellite_logger = get_logger('intercept.satellite')
gsm_spy_logger = get_logger('intercept.gsm_spy')