mirror of
https://github.com/smittix/intercept.git
synced 2026-04-30 01:29:59 -07:00
Parse tshark GSM field values with int(value, 0) instead of int(value) to auto-detect hex 0x-prefixed output (e.g. 0x039e for TMSI/LAC/CID). Without this, every tshark line with hex values fails to parse, causing 0 devices to be captured during monitoring. Also add API Keys tab to Settings modal for configuring OpenCellID key via the UI (in addition to env var), with status display and usage bar. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
227 lines
7.6 KiB
Python
227 lines
7.6 KiB
Python
"""GSM Cell Tower Geocoding Service.
|
|
|
|
Provides hybrid cache-first geocoding with async API fallback for cell towers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import queue
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
import config
|
|
from utils.database import get_db
|
|
|
|
logger = logging.getLogger('intercept.gsm_geocoding')
|
|
|
|
# Queue for pending geocoding requests
|
|
_geocoding_queue = queue.Queue(maxsize=100)
|
|
|
|
|
|
def lookup_cell_coordinates(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
|
|
"""
|
|
Lookup cell tower coordinates with cache-first strategy.
|
|
|
|
Strategy:
|
|
1. Check gsm_cells table (cache) - fast synchronous lookup
|
|
2. If not found, return None (caller decides whether to use API)
|
|
|
|
Args:
|
|
mcc: Mobile Country Code
|
|
mnc: Mobile Network Code
|
|
lac: Location Area Code
|
|
cid: Cell ID
|
|
|
|
Returns:
|
|
dict with keys: lat, lon, source='cache', azimuth (optional),
|
|
range_meters (optional), operator (optional), radio (optional)
|
|
Returns None if not found in cache.
|
|
"""
|
|
try:
|
|
with get_db() as conn:
|
|
result = conn.execute('''
|
|
SELECT lat, lon, azimuth, range_meters, operator, radio
|
|
FROM gsm_cells
|
|
WHERE mcc = ? AND mnc = ? AND lac = ? AND cid = ?
|
|
''', (mcc, mnc, lac, cid)).fetchone()
|
|
|
|
if result and result['lat'] is not None and result['lon'] is not None:
|
|
return {
|
|
'lat': result['lat'],
|
|
'lon': result['lon'],
|
|
'source': 'cache',
|
|
'azimuth': result['azimuth'],
|
|
'range_meters': result['range_meters'],
|
|
'operator': result['operator'],
|
|
'radio': result['radio']
|
|
}
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error looking up coordinates from cache: {e}")
|
|
return None
|
|
|
|
|
|
def _get_api_key() -> str:
|
|
"""Get OpenCellID API key at runtime (env var first, then database)."""
|
|
env_key = config.GSM_OPENCELLID_API_KEY
|
|
if env_key:
|
|
return env_key
|
|
from utils.database import get_setting
|
|
return get_setting('gsm.opencellid.api_key', '')
|
|
|
|
|
|
def lookup_cell_from_api(mcc: int, mnc: int, lac: int, cid: int) -> dict[str, Any] | None:
|
|
"""
|
|
Lookup cell tower from OpenCellID API and cache result.
|
|
|
|
Args:
|
|
mcc: Mobile Country Code
|
|
mnc: Mobile Network Code
|
|
lac: Location Area Code
|
|
cid: Cell ID
|
|
|
|
Returns:
|
|
dict with keys: lat, lon, source='api', azimuth (optional),
|
|
range_meters (optional), operator (optional), radio (optional)
|
|
Returns None if API call fails or cell not found.
|
|
"""
|
|
try:
|
|
api_key = _get_api_key()
|
|
if not api_key:
|
|
logger.warning("OpenCellID API key not configured")
|
|
return None
|
|
|
|
api_url = config.GSM_OPENCELLID_API_URL
|
|
params = {
|
|
'key': api_key,
|
|
'mcc': mcc,
|
|
'mnc': mnc,
|
|
'lac': lac,
|
|
'cellid': cid,
|
|
'format': 'json'
|
|
}
|
|
|
|
response = requests.get(api_url, params=params, timeout=10)
|
|
|
|
if response.status_code == 200:
|
|
cell_data = response.json()
|
|
lat = cell_data.get('lat')
|
|
lon = cell_data.get('lon')
|
|
|
|
# Validate response has actual coordinates
|
|
if lat is None or lon is None:
|
|
logger.warning(
|
|
f"OpenCellID API returned 200 but no coordinates for "
|
|
f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {cell_data}"
|
|
)
|
|
return None
|
|
|
|
# Cache the result
|
|
with get_db() as conn:
|
|
conn.execute('''
|
|
INSERT OR REPLACE INTO gsm_cells
|
|
(mcc, mnc, lac, cid, lat, lon, azimuth, range_meters, samples, radio, operator, last_verified)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
''', (
|
|
mcc, mnc, lac, cid,
|
|
lat, lon,
|
|
cell_data.get('azimuth'),
|
|
cell_data.get('range'),
|
|
cell_data.get('samples'),
|
|
cell_data.get('radio'),
|
|
cell_data.get('operator')
|
|
))
|
|
conn.commit()
|
|
|
|
logger.info(f"Cached cell tower from API: MCC={mcc} MNC={mnc} LAC={lac} CID={cid} -> ({lat}, {lon})")
|
|
|
|
return {
|
|
'lat': lat,
|
|
'lon': lon,
|
|
'source': 'api',
|
|
'azimuth': cell_data.get('azimuth'),
|
|
'range_meters': cell_data.get('range'),
|
|
'operator': cell_data.get('operator'),
|
|
'radio': cell_data.get('radio')
|
|
}
|
|
else:
|
|
logger.warning(
|
|
f"OpenCellID API returned {response.status_code} for "
|
|
f"MCC={mcc} MNC={mnc} LAC={lac} CID={cid}: {response.text[:200]}"
|
|
)
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calling OpenCellID API: {e}")
|
|
return None
|
|
|
|
|
|
def enrich_tower_data(tower_data: dict[str, Any]) -> dict[str, Any]:
|
|
"""
|
|
Enrich tower data with coordinates using cache-first strategy.
|
|
|
|
If coordinates found in cache, adds them immediately.
|
|
If not found, marks as 'pending' and queues for background API lookup.
|
|
|
|
Args:
|
|
tower_data: Dictionary with keys mcc, mnc, lac, cid (and other tower data)
|
|
|
|
Returns:
|
|
Enriched tower_data dict with added fields:
|
|
- lat, lon (if found in cache)
|
|
- status='pending' (if needs API lookup)
|
|
- source='cache' (if from cache)
|
|
"""
|
|
mcc = tower_data.get('mcc')
|
|
mnc = tower_data.get('mnc')
|
|
lac = tower_data.get('lac')
|
|
cid = tower_data.get('cid')
|
|
|
|
# Validate required fields
|
|
if not all([mcc is not None, mnc is not None, lac is not None, cid is not None]):
|
|
logger.warning(f"Tower data missing required fields: {tower_data}")
|
|
return tower_data
|
|
|
|
# Try cache lookup
|
|
coords = lookup_cell_coordinates(mcc, mnc, lac, cid)
|
|
|
|
if coords:
|
|
# Found in cache - add coordinates immediately
|
|
tower_data['lat'] = coords['lat']
|
|
tower_data['lon'] = coords['lon']
|
|
tower_data['source'] = 'cache'
|
|
|
|
# Add optional fields if available
|
|
if coords.get('azimuth') is not None:
|
|
tower_data['azimuth'] = coords['azimuth']
|
|
if coords.get('range_meters') is not None:
|
|
tower_data['range_meters'] = coords['range_meters']
|
|
if coords.get('operator'):
|
|
tower_data['operator'] = coords['operator']
|
|
if coords.get('radio'):
|
|
tower_data['radio'] = coords['radio']
|
|
|
|
logger.debug(f"Cache hit for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
|
|
else:
|
|
# Not in cache - mark as pending and queue for API lookup
|
|
tower_data['status'] = 'pending'
|
|
tower_data['source'] = 'unknown'
|
|
|
|
# Queue for background geocoding (non-blocking)
|
|
try:
|
|
_geocoding_queue.put_nowait(tower_data.copy())
|
|
logger.debug(f"Queued tower for geocoding: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
|
|
except queue.Full:
|
|
logger.warning("Geocoding queue full, dropping tower")
|
|
|
|
return tower_data
|
|
|
|
|
|
def get_geocoding_queue() -> queue.Queue:
|
|
"""Get the geocoding queue for the background worker."""
|
|
return _geocoding_queue
|