fixing bands and how the gsm scanner loops with tshark

This commit is contained in:
Marc
2026-02-06 08:27:25 -06:00
parent 8e9588c4ff
commit e8a9afa221
6 changed files with 815 additions and 606 deletions

View File

@@ -6,6 +6,7 @@ import json
import logging
import queue
import re
import select
import subprocess
import threading
import time
@@ -33,7 +34,8 @@ REGIONAL_BANDS = {
'PCS1900': {'start': 1930e6, 'end': 1990e6, 'arfcn_start': 512, 'arfcn_end': 810}
},
'Europe': {
'EGSM900': {'start': 925e6, 'end': 960e6, 'arfcn_start': 0, 'arfcn_end': 124}
'EGSM900': {'start': 925e6, 'end': 960e6, 'arfcn_start': 0, 'arfcn_end': 124},
'DCS1800': {'start': 1805e6, 'end': 1880e6, 'arfcn_start': 512, 'arfcn_end': 885}
},
'Asia': {
'EGSM900': {'start': 925e6, 'end': 960e6, 'arfcn_start': 0, 'arfcn_end': 124},
@@ -47,6 +49,9 @@ gsm_connected = False
gsm_towers_found = 0
gsm_devices_tracked = 0
# Geocoding worker state
_geocoding_worker_thread = None
# ============================================
# API Usage Tracking Helper Functions
@@ -82,6 +87,100 @@ def can_use_api():
return current_usage < config.GSM_API_DAILY_LIMIT
# ============================================
# Background Geocoding Worker
# ============================================
def start_geocoding_worker():
"""Start background thread for async geocoding."""
global _geocoding_worker_thread
if _geocoding_worker_thread is None or not _geocoding_worker_thread.is_alive():
_geocoding_worker_thread = threading.Thread(
target=geocoding_worker,
daemon=True,
name='gsm-geocoding-worker'
)
_geocoding_worker_thread.start()
logger.info("Started geocoding worker thread")
def geocoding_worker():
"""Worker thread processes pending geocoding requests."""
from utils.gsm_geocoding import lookup_cell_from_api, get_geocoding_queue
geocoding_queue = get_geocoding_queue()
while True:
try:
# Wait for pending tower with timeout
tower_data = geocoding_queue.get(timeout=5)
# Check rate limit
if not can_use_api():
current_usage = get_api_usage_today()
logger.warning(f"OpenCellID API rate limit reached ({current_usage}/{config.GSM_API_DAILY_LIMIT})")
geocoding_queue.task_done()
continue
# Call API
mcc = tower_data.get('mcc')
mnc = tower_data.get('mnc')
lac = tower_data.get('lac')
cid = tower_data.get('cid')
logger.debug(f"Geocoding tower via API: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
coords = lookup_cell_from_api(mcc, mnc, lac, cid)
if coords:
# Update tower data with coordinates
tower_data['lat'] = coords['lat']
tower_data['lon'] = coords['lon']
tower_data['source'] = 'api'
tower_data['status'] = 'resolved'
tower_data['type'] = 'tower_update'
# Add optional fields if available
if coords.get('azimuth') is not None:
tower_data['azimuth'] = coords['azimuth']
if coords.get('range_meters') is not None:
tower_data['range_meters'] = coords['range_meters']
if coords.get('operator'):
tower_data['operator'] = coords['operator']
if coords.get('radio'):
tower_data['radio'] = coords['radio']
# Update DataStore
key = f"{mcc}_{mnc}_{lac}_{cid}"
app_module.gsm_spy_towers[key] = tower_data
# Send update to SSE stream
try:
app_module.gsm_spy_queue.put_nowait(tower_data)
logger.info(f"Resolved coordinates for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
except queue.Full:
logger.warning("SSE queue full, dropping tower update")
# Increment API usage counter
usage_count = increment_api_usage()
logger.info(f"OpenCellID API call #{usage_count} today")
else:
logger.warning(f"Could not resolve coordinates for tower: MCC={mcc} MNC={mnc} LAC={lac} CID={cid}")
geocoding_queue.task_done()
# Rate limiting between API calls (be nice to OpenCellID)
time.sleep(1)
except queue.Empty:
# No pending towers, continue waiting
continue
except Exception as e:
logger.error(f"Geocoding worker error: {e}", exc_info=True)
time.sleep(1)
def arfcn_to_frequency(arfcn):
"""Convert ARFCN to downlink frequency in Hz.
@@ -163,22 +262,18 @@ def start_scanner():
logger.info(f"Starting GSM scanner: {' '.join(cmd)}")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=1
)
app_module.gsm_spy_process = process
# Set a flag to indicate scanner should run
app_module.gsm_spy_active_device = device_index
app_module.gsm_spy_region = region
app_module.gsm_spy_process = True # Use as flag initially
# Start output parsing thread
# Start geocoding worker (if not already running)
start_geocoding_worker()
# Start scanning thread (will run grgsm_scanner in a loop)
scanner_thread_obj = threading.Thread(
target=scanner_thread,
args=(process,),
args=(cmd, device_index),
daemon=True
)
scanner_thread_obj.start()
@@ -242,14 +337,18 @@ def start_monitor():
logger.info(f"Starting GSM monitor: {' '.join(grgsm_cmd)} | {' '.join(tshark_cmd)}")
# Start grgsm_livemon
# Start grgsm_livemon (outputs to UDP port 4729 by default)
grgsm_proc = subprocess.Popen(
grgsm_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"Started grgsm_livemon (PID: {grgsm_proc.pid})")
# Start tshark
# Give grgsm_livemon time to initialize and start sending UDP packets
time.sleep(2)
# Start tshark (captures from loopback interface where UDP packets arrive)
tshark_proc = subprocess.Popen(
tshark_cmd,
stdout=subprocess.PIPE,
@@ -257,6 +356,7 @@ def start_monitor():
universal_newlines=True,
bufsize=1
)
logger.info(f"Started tshark (PID: {tshark_proc.pid})")
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
@@ -291,17 +391,10 @@ def stop_scanner():
with app_module.gsm_spy_lock:
killed = []
# Stop scanner (now just a flag, thread will see it and exit)
if app_module.gsm_spy_process:
try:
app_module.gsm_spy_process.terminate()
app_module.gsm_spy_process.wait(timeout=5)
killed.append('scanner')
except Exception:
try:
app_module.gsm_spy_process.kill()
except Exception:
pass
app_module.gsm_spy_process = None
killed.append('scanner')
if app_module.gsm_spy_livemon_process:
try:
@@ -917,33 +1010,45 @@ def traffic_correlation():
# ============================================
def parse_grgsm_scanner_output(line: str) -> dict[str, Any] | None:
"""Parse grgsm_scanner output line."""
"""Parse grgsm_scanner output line.
Actual output format is a table:
ARFCN | Freq (MHz) | CID | LAC | MCC | MNC | Power (dB)
--------------------------------------------------------------------
23 | 940.6 | 31245 | 1234 | 214 | 01 | -48
"""
try:
# Example output: "ARFCN: 123, Freq: 935.2MHz, CID: 1234, LAC: 567, MCC: 310, MNC: 260, PWR: -85dBm"
# This is a placeholder - actual format depends on grgsm_scanner output
# Skip progress, header, and separator lines
if 'Scanning:' in line or 'ARFCN' in line or '---' in line or 'Found' in line:
return None
# Simple regex patterns
arfcn_match = re.search(r'ARFCN[:\s]+(\d+)', line)
freq_match = re.search(r'Freq[:\s]+([\d.]+)', line)
cid_match = re.search(r'CID[:\s]+(\d+)', line)
lac_match = re.search(r'LAC[:\s]+(\d+)', line)
mcc_match = re.search(r'MCC[:\s]+(\d+)', line)
mnc_match = re.search(r'MNC[:\s]+(\d+)', line)
pwr_match = re.search(r'PWR[:\s]+([-\d.]+)', line)
# Parse table row: " 23 | 940.6 | 31245 | 1234 | 214 | 01 | -48"
# Split by pipe and clean whitespace
parts = [p.strip() for p in line.split('|')]
if arfcn_match:
data = {
'type': 'tower',
'arfcn': int(arfcn_match.group(1)),
'frequency': float(freq_match.group(1)) if freq_match else None,
'cid': int(cid_match.group(1)) if cid_match else None,
'lac': int(lac_match.group(1)) if lac_match else None,
'mcc': int(mcc_match.group(1)) if mcc_match else None,
'mnc': int(mnc_match.group(1)) if mnc_match else None,
'signal_strength': float(pwr_match.group(1)) if pwr_match else None,
'timestamp': datetime.now().isoformat()
}
return data
if len(parts) >= 7:
arfcn = parts[0]
freq = parts[1]
cid = parts[2]
lac = parts[3]
mcc = parts[4]
mnc = parts[5]
power = parts[6]
# Validate that we have numeric data (not header line)
if arfcn.isdigit():
data = {
'type': 'tower',
'arfcn': int(arfcn),
'frequency': float(freq),
'cid': int(cid),
'lac': int(lac),
'mcc': int(mcc),
'mnc': int(mnc),
'signal_strength': float(power),
'timestamp': datetime.now().isoformat()
}
return data
except Exception as e:
logger.debug(f"Failed to parse scanner line: {line} - {e}")
@@ -1025,14 +1130,18 @@ def auto_start_monitor(tower_data):
logger.info(f"Starting auto-monitor: {' '.join(grgsm_cmd)} | {' '.join(tshark_cmd)}")
# Start grgsm_livemon (we don't capture its output)
# Start grgsm_livemon (outputs to UDP port 4729 by default)
grgsm_proc = subprocess.Popen(
grgsm_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"Started grgsm_livemon for auto-monitor (PID: {grgsm_proc.pid})")
# Start tshark
# Give grgsm_livemon time to initialize and start sending UDP packets
time.sleep(2)
# Start tshark (captures from loopback interface where UDP packets arrive)
tshark_proc = subprocess.Popen(
tshark_cmd,
stdout=subprocess.PIPE,
@@ -1040,6 +1149,7 @@ def auto_start_monitor(tower_data):
universal_newlines=True,
bufsize=1
)
logger.info(f"Started tshark for auto-monitor (PID: {tshark_proc.pid})")
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
@@ -1069,66 +1179,192 @@ def auto_start_monitor(tower_data):
logger.error(f"Error in auto-monitoring: {e}")
def scanner_thread(process):
"""Thread to read grgsm_scanner output."""
def scanner_thread(cmd, device_index):
"""Thread to continuously run grgsm_scanner in a loop with non-blocking I/O.
grgsm_scanner scans once and exits, so we loop it to provide
continuous updates to the dashboard.
"""
global gsm_towers_found
strongest_tower = None
auto_monitor_triggered = False
auto_monitor_triggered = False # Moved outside loop - persists across scans
scan_count = 0
process = None
try:
for line in process.stdout:
if not line:
continue
while app_module.gsm_spy_process: # Flag check
scan_count += 1
logger.info(f"Starting GSM scan #{scan_count}")
parsed = parse_grgsm_scanner_output(line)
if parsed:
# Store in DataStore
key = f"{parsed.get('mcc')}_{parsed.get('mnc')}_{parsed.get('lac')}_{parsed.get('cid')}"
app_module.gsm_spy_towers[key] = parsed
try:
# Start scanner process
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=1
)
# Track strongest tower for auto-monitoring
signal_strength = parsed.get('signal_strength', -999)
if strongest_tower is None or signal_strength > strongest_tower.get('signal_strength', -999):
strongest_tower = parsed
# Non-blocking stderr reader
def read_stderr():
try:
for line in process.stderr:
if line:
logger.debug(f"grgsm_scanner: {line.strip()}")
except Exception as e:
logger.error(f"stderr read error: {e}")
# Queue for SSE stream
try:
app_module.gsm_spy_queue.put_nowait(parsed)
except queue.Full:
pass
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
stderr_thread.start()
gsm_towers_found += 1
# Non-blocking stdout reader with timeout
last_output = time.time()
scan_timeout = 120 # 2 minute maximum per scan
# Auto-monitor strongest tower after finding 3+ towers
if gsm_towers_found >= 3 and not auto_monitor_triggered and strongest_tower:
auto_monitor_triggered = True
threading.Thread(
target=auto_start_monitor,
args=(strongest_tower,),
daemon=True
).start()
while app_module.gsm_spy_process:
# Check if process died
if process.poll() is not None:
logger.info(f"Scanner exited (code: {process.returncode})")
break
# Check for output with 1-second timeout
ready, _, _ = select.select([process.stdout], [], [], 1.0)
if ready:
line = process.stdout.readline()
if not line:
break # EOF
last_output = time.time()
parsed = parse_grgsm_scanner_output(line)
if parsed:
# Enrich with coordinates
from utils.gsm_geocoding import enrich_tower_data
enriched = enrich_tower_data(parsed)
# Store in DataStore
key = f"{enriched['mcc']}_{enriched['mnc']}_{enriched['lac']}_{enriched['cid']}"
app_module.gsm_spy_towers[key] = enriched
# Track strongest tower
signal = enriched.get('signal_strength', -999)
if strongest_tower is None or signal > strongest_tower.get('signal_strength', -999):
strongest_tower = enriched
# Queue for SSE
try:
app_module.gsm_spy_queue.put_nowait(enriched)
except queue.Full:
logger.warning("Queue full, dropping tower update")
# Thread-safe counter update
with app_module.gsm_spy_lock:
gsm_towers_found += 1
current_count = gsm_towers_found
# Auto-monitor strongest tower (once per session)
if current_count >= 3 and not auto_monitor_triggered and strongest_tower:
auto_monitor_triggered = True
logger.info("Auto-starting monitor on strongest tower")
threading.Thread(
target=auto_start_monitor,
args=(strongest_tower,),
daemon=True
).start()
else:
# No output, check timeout
if time.time() - last_output > scan_timeout:
logger.warning(f"Scan timeout after {scan_timeout}s")
break
# Clean up process with timeout
if process.poll() is None:
logger.info("Terminating scanner process")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("Process didn't terminate, killing")
process.kill()
process.wait()
else:
process.wait() # Reap zombie
logger.info(f"Scan #{scan_count} complete")
except Exception as e:
logger.error(f"Scanner scan error: {e}", exc_info=True)
if process and process.poll() is None:
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
# Check if should continue
if not app_module.gsm_spy_process:
break
# Wait between scans with responsive flag checking
logger.info("Waiting 5 seconds before next scan")
for i in range(5):
if not app_module.gsm_spy_process:
break
time.sleep(1)
except Exception as e:
logger.error(f"Scanner thread error: {e}")
logger.error(f"Scanner thread fatal error: {e}", exc_info=True)
finally:
# Reap the process to prevent zombie (don't terminate, just wait)
try:
process.wait()
logger.info(f"Scanner process exited with code {process.returncode}")
except Exception as e:
logger.error(f"Error waiting for scanner process: {e}")
# Always cleanup
if process and process.poll() is None:
try:
process.terminate()
process.wait(timeout=5)
except Exception:
try:
process.kill()
process.wait()
except Exception:
pass
logger.info("Scanner thread terminated")
# Reset global state
with app_module.gsm_spy_lock:
app_module.gsm_spy_process = None
if app_module.gsm_spy_active_device is not None:
from app import release_sdr_device
release_sdr_device(app_module.gsm_spy_active_device)
app_module.gsm_spy_active_device = None
def monitor_thread(process):
"""Thread to read grgsm_livemon | tshark output."""
"""Thread to read tshark output with non-blocking I/O and timeouts."""
global gsm_devices_tracked
try:
for line in process.stdout:
while app_module.gsm_spy_monitor_process:
# Check if process died
if process.poll() is not None:
logger.info(f"Monitor process exited (code: {process.returncode})")
break
# Non-blocking read with timeout
ready, _, _ = select.select([process.stdout], [], [], 1.0)
if not ready:
continue # Timeout, check flag again
line = process.stdout.readline()
if not line:
continue
break # EOF
parsed = parse_tshark_output(line)
if parsed:
@@ -1218,15 +1454,28 @@ def monitor_thread(process):
except Exception as e:
logger.error(f"Error storing device data: {e}")
gsm_devices_tracked += 1
# Thread-safe counter
with app_module.gsm_spy_lock:
gsm_devices_tracked += 1
except Exception as e:
logger.error(f"Monitor thread error: {e}")
logger.error(f"Monitor thread error: {e}", exc_info=True)
finally:
# Reap the process to prevent zombie (don't terminate, just wait)
# Reap process with timeout
try:
process.wait()
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("Monitor process didn't terminate, killing")
process.kill()
process.wait()
else:
process.wait()
logger.info(f"Monitor process exited with code {process.returncode}")
except Exception as e:
logger.error(f"Error waiting for monitor process: {e}")
logger.error(f"Error reaping monitor process: {e}")
logger.info("Monitor thread terminated")