Fix tshark crash by skipping invalid fields instead of using fallbacks

When tshark field discovery finds no valid candidate for a logical field
(e.g. timing_advance, cellid), the old code fell back to the first
candidate name even though it was known to be invalid. This caused tshark
to exit immediately with "Some fields aren't valid".

Now fields resolve to None when no valid candidate exists, and the tshark
command is built using only validated fields. The parser dynamically maps
columns via field_order instead of assuming a fixed 5-column layout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-08 19:14:09 +00:00
parent 182e1f3239
commit 4d7be047da
+163 -102
View File
@@ -242,97 +242,117 @@ def validate_band_names(bands: list[str], region: str) -> tuple[list[str], str |
_tshark_fields_cache: dict[str, str] | None = None
def _discover_tshark_fields() -> dict[str, str]:
def _discover_tshark_fields() -> dict[str, str | None]:
"""Discover correct tshark field names for GSM A protocol.
Different Wireshark versions use different field names for -e extraction.
We validate candidates by actually testing them with tshark, since
`tshark -G fields` can list fields that aren't valid for -T fields -e.
Searches tshark's registered fields for keywords to find the actual
names used by the installed Wireshark version, then validates them.
Returns:
Dict mapping logical names to actual tshark field names:
Dict mapping logical names to actual tshark field names (or None):
{'ta': ..., 'tmsi': ..., 'imsi': ..., 'lac': ..., 'cid': ...}
"""
global _tshark_fields_cache
if _tshark_fields_cache is not None:
return _tshark_fields_cache
# Candidate field names for each logical field, in preference order
candidates = {
'ta': [
'gsm_a.rr.timing_advance',
'gsm_a_rr.timing_adv',
'gsm_a_rr.timing_advance',
],
'tmsi': [
'gsm_a.tmsi',
'gsm_a_dtap.tmsi',
'gsm_a.dtap.tmsi',
],
'imsi': [
'e212.imsi',
'gsm_a.imsi',
'gsm_a_dtap.imsi',
],
'lac': [
'gsm_a.lac',
'e212.lac',
'gsm_a_bssmap.lac',
],
'cid': [
'gsm_a.cellid',
'e212.ci',
'gsm_a_bssmap.cell_ci',
'gsm_a.cell_ci',
],
# Search patterns for each logical field (applied to tshark -G fields output)
# Each entry: (keyword_patterns, exclusion_patterns)
# We search the field filter name column for matches
search_config = {
'ta': {
'keywords': ['timing_adv', 'timing_advance'],
'prefer': ['gsm_a'], # prefer GSM A protocol fields
},
'tmsi': {
'keywords': ['.tmsi'],
'prefer': ['gsm_a'],
},
'imsi': {
'keywords': ['.imsi'],
'prefer': ['e212', 'gsm_a'],
},
'lac': {
'keywords': ['.lac'],
'prefer': ['gsm_a', 'e212'],
},
'cid': {
'keywords': ['cellid', 'cell_ci', '.cell_id', 'e212.ci'],
'prefer': ['gsm_a', 'e212'],
},
}
def _test_fields(field_list: list[str]) -> set[str]:
"""Test which fields tshark accepts for -e extraction."""
cmd = ['tshark', '-T', 'fields']
for f in field_list:
cmd.extend(['-e', f])
cmd.extend(['-r', '/dev/null'])
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
return set(field_list)
# Parse stderr for invalid fields
invalid = set()
for line in result.stderr.splitlines():
line = line.strip()
# tshark lists invalid fields one per line after the error header
for f in field_list:
if f in line:
invalid.add(f)
return set(field_list) - invalid
except Exception as e:
logger.warning(f"Could not validate tshark fields: {e}")
return set()
# Step 1: Get all field names from tshark (F lines only, not P protocol lines)
all_field_names = []
try:
result = subprocess.run(
['tshark', '-G', 'fields'],
capture_output=True, text=True, timeout=15
)
for line in result.stdout.splitlines():
if not line.startswith('F\t'):
continue # Only actual fields, not protocols
parts = line.split('\t')
if len(parts) >= 3:
all_field_names.append(parts[2])
except Exception as e:
logger.warning(f"Could not query tshark fields: {e}")
# Collect all unique candidate field names and test them in one call
all_candidates = []
# Step 2: Search for candidate fields matching each logical name
candidates: dict[str, list[str]] = {}
for logical_name, config in search_config.items():
matches = []
for field_name in all_field_names:
for keyword in config['keywords']:
if keyword in field_name:
matches.append(field_name)
break
# Sort: preferred protocol prefixes first
def sort_key(name):
for i, pref in enumerate(config['prefer']):
if name.startswith(pref):
return i
return 100
matches.sort(key=sort_key)
candidates[logical_name] = matches
logger.info(f"tshark field candidates from -G fields: {candidates}")
# Step 3: Validate candidates by testing with tshark -r /dev/null
# Collect all unique candidate names
all_candidate_names = set()
for field_list in candidates.values():
all_candidates.extend(field_list)
valid_fields = _test_fields(list(set(all_candidates)))
logger.info(f"Valid tshark -e fields: {sorted(valid_fields)}")
all_candidate_names.update(field_list)
# Match each logical field to the first valid candidate
resolved = {}
valid_fields = set()
if all_candidate_names:
# Test in batches to identify which are valid
# Test each individually since batch testing makes it hard to identify valid ones
for field_name in all_candidate_names:
try:
result = subprocess.run(
['tshark', '-T', 'fields', '-e', field_name, '-r', '/dev/null'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 or 'aren\'t valid' not in result.stderr:
valid_fields.add(field_name)
except Exception:
pass
logger.info(f"Validated tshark -e fields: {sorted(valid_fields)}")
# Step 4: Resolve each logical field to the first valid candidate
resolved: dict[str, str | None] = {}
for logical_name, field_candidates in candidates.items():
resolved[logical_name] = None
for candidate in field_candidates:
if candidate in valid_fields:
resolved[logical_name] = candidate
break
if resolved[logical_name] is None:
# No candidate was valid - use first as fallback
resolved[logical_name] = field_candidates[0]
if resolved[logical_name] is None and field_candidates:
logger.warning(
f"No valid tshark field found for '{logical_name}', "
f"using fallback: {field_candidates[0]}"
f"No valid tshark field for '{logical_name}'. "
f"Candidates were: {field_candidates}"
)
logger.info(f"Resolved tshark fields: {resolved}")
@@ -340,15 +360,16 @@ def _discover_tshark_fields() -> dict[str, str]:
return resolved
def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subprocess.Popen, subprocess.Popen]:
def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subprocess.Popen, subprocess.Popen, list[str]]:
"""Start grgsm_livemon and tshark processes for monitoring an ARFCN.
Returns:
Tuple of (grgsm_process, tshark_process)
Tuple of (grgsm_process, tshark_process, field_order)
field_order is the list of logical field names in tshark column order.
Raises:
FileNotFoundError: If grgsm_livemon or tshark not found
RuntimeError: If grgsm_livemon exits immediately
RuntimeError: If grgsm_livemon or tshark exits immediately
"""
frequency_hz = arfcn_to_frequency(arfcn)
frequency_mhz = frequency_hz / 1e6
@@ -415,20 +436,39 @@ def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subproce
raise FileNotFoundError('tshark not found. Please install wireshark/tshark.')
fields = _discover_tshark_fields()
display_filter = f"{fields['ta']} || {fields['tmsi']} || {fields['imsi']}"
# Build field list from only valid (non-None) fields
# Track order so parser knows which column is which
field_order = [] # list of logical names in column order
tshark_cmd = [
'tshark',
'-i', 'lo',
'-l', # Line-buffered output for live capture
'-f', 'udp port 4729', # Capture filter: only GSMTAP packets
'-Y', display_filter,
'-T', 'fields',
'-e', fields['ta'],
'-e', fields['tmsi'],
'-e', fields['imsi'],
'-e', fields['lac'],
'-e', fields['cid']
]
# Build display filter from available fields
filter_parts = []
for logical_name in ['ta', 'tmsi', 'imsi']:
if fields.get(logical_name):
filter_parts.append(fields[logical_name])
if filter_parts:
tshark_cmd.extend(['-Y', ' || '.join(filter_parts)])
tshark_cmd.extend(['-T', 'fields'])
# Add -e for each available field in known order
for logical_name in ['ta', 'tmsi', 'imsi', 'lac', 'cid']:
if fields.get(logical_name):
tshark_cmd.extend(['-e', fields[logical_name]])
field_order.append(logical_name)
if not field_order:
safe_terminate(grgsm_proc)
unregister_process(grgsm_proc)
raise RuntimeError('No valid tshark fields found for GSM capture')
logger.info(f"tshark field order: {field_order}")
logger.info(f"Starting tshark: {' '.join(tshark_cmd)}")
tshark_proc = subprocess.Popen(
tshark_cmd,
@@ -456,7 +496,7 @@ def _start_monitoring_processes(arfcn: int, device_index: int) -> tuple[subproce
unregister_process(tshark_proc)
raise RuntimeError(f'tshark failed (exit code {exit_code}): {stderr_output[:200]}')
return grgsm_proc, tshark_proc
return grgsm_proc, tshark_proc, field_order
def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
@@ -470,7 +510,7 @@ def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
device_index: SDR device index
"""
# Start monitoring processes
grgsm_proc, tshark_proc = _start_monitoring_processes(arfcn, device_index)
grgsm_proc, tshark_proc, field_order = _start_monitoring_processes(arfcn, device_index)
app_module.gsm_spy_livemon_process = grgsm_proc
app_module.gsm_spy_monitor_process = tshark_proc
app_module.gsm_spy_selected_arfcn = arfcn
@@ -478,7 +518,7 @@ def _start_and_register_monitor(arfcn: int, device_index: int) -> None:
# Start monitoring thread
monitor_thread_obj = threading.Thread(
target=monitor_thread,
args=(tshark_proc,),
args=(tshark_proc, field_order),
daemon=True
)
monitor_thread_obj.start()
@@ -1336,28 +1376,49 @@ def parse_grgsm_scanner_output(line: str) -> dict[str, Any] | None:
return None
def parse_tshark_output(line: str) -> dict[str, Any] | None:
"""Parse tshark filtered GSM output."""
def parse_tshark_output(line: str, field_order: list[str] | None = None) -> dict[str, Any] | None:
"""Parse tshark filtered GSM output.
Args:
line: Tab-separated tshark output line
field_order: List of logical field names in column order.
If None, assumes legacy order: ['ta', 'tmsi', 'imsi', 'lac', 'cid']
"""
if field_order is None:
field_order = ['ta', 'tmsi', 'imsi', 'lac', 'cid']
try:
# tshark output format: ta_value\ttmsi\timsi\tlac\tcid
parts = line.strip().split('\t')
if len(parts) >= 5:
data = {
'type': 'device',
'ta_value': int(parts[0]) if parts[0] else None,
'tmsi': parts[1] if parts[1] else None,
'imsi': parts[2] if parts[2] else None,
'lac': int(parts[3]) if parts[3] else None,
'cid': int(parts[4]) if parts[4] else None,
'timestamp': datetime.now().isoformat()
}
if len(parts) < len(field_order):
return None
# Calculate distance from TA
if data['ta_value'] is not None:
data['distance_meters'] = data['ta_value'] * config.GSM_TA_METERS_PER_UNIT
# Map logical names to column values
field_map = {}
for i, logical_name in enumerate(field_order):
field_map[logical_name] = parts[i] if parts[i] else None
return data
# Convert types
ta_raw = field_map.get('ta')
data = {
'type': 'device',
'ta_value': int(ta_raw) if ta_raw else None,
'tmsi': field_map.get('tmsi'),
'imsi': field_map.get('imsi'),
'lac': int(field_map['lac']) if field_map.get('lac') else None,
'cid': int(field_map['cid']) if field_map.get('cid') else None,
'timestamp': datetime.now().isoformat()
}
# Need at least one identifier
if not data['tmsi'] and not data['imsi']:
return None
# Calculate distance from TA
if data['ta_value'] is not None:
data['distance_meters'] = data['ta_value'] * config.GSM_TA_METERS_PER_UNIT
return data
except Exception as e:
logger.debug(f"Failed to parse tshark line: {line} - {e}")
@@ -1702,7 +1763,7 @@ def scanner_thread(cmd, device_index):
logger.info("Monitor is running, keeping SDR device allocated")
def monitor_thread(process):
def monitor_thread(process, field_order=None):
"""Thread to read tshark output using standard iter pattern."""
global gsm_devices_tracked
@@ -1767,7 +1828,7 @@ def monitor_thread(process):
if msg_type == 'eof':
break # EOF
parsed = parse_tshark_output(line)
parsed = parse_tshark_output(line, field_order)
if parsed:
packets_captured += 1