diff --git a/routes/gsm_spy.py b/routes/gsm_spy.py index 4540382..9197d71 100644 --- a/routes/gsm_spy.py +++ b/routes/gsm_spy.py @@ -245,9 +245,9 @@ _tshark_fields_cache: dict[str, str] | None = None def _discover_tshark_fields() -> dict[str, str]: """Discover correct tshark field names for GSM A protocol. - Different Wireshark versions use different field names: - - Wireshark 3.x: gsm_a.rr.timing_advance, gsm_a.tmsi, gsm_a.imsi, gsm_a.lac, gsm_a.cellid - - Wireshark 4.x: gsm_a_rr.timing_adv, gsm_a_dtap.tmsi, e212.imsi, e212.lac, e212.ci + Different Wireshark versions use different field names for -e extraction. + We validate candidates by actually testing them with tshark, since + `tshark -G fields` can list fields that aren't valid for -T fields -e. Returns: Dict mapping logical names to actual tshark field names: @@ -270,8 +270,8 @@ def _discover_tshark_fields() -> dict[str, str]: 'gsm_a.dtap.tmsi', ], 'imsi': [ - 'gsm_a.imsi', 'e212.imsi', + 'gsm_a.imsi', 'gsm_a_dtap.imsi', ], 'lac': [ @@ -287,31 +287,55 @@ def _discover_tshark_fields() -> dict[str, str]: ], } - # Query tshark for all available GSM fields - available_fields = set() - try: - result = subprocess.run( - ['tshark', '-G', 'fields'], - capture_output=True, text=True, timeout=10 - ) - for line in result.stdout.splitlines(): - parts = line.split('\t') - if len(parts) >= 3: - available_fields.add(parts[2]) # Field name is 3rd column - except Exception as e: - logger.warning(f"Could not query tshark fields: {e}") + def _test_fields(field_list: list[str]) -> set[str]: + """Test which fields tshark accepts for -e extraction.""" + cmd = ['tshark', '-T', 'fields'] + for f in field_list: + cmd.extend(['-e', f]) + cmd.extend(['-r', '/dev/null']) + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + return set(field_list) + # Parse stderr for invalid fields + invalid = set() + for line in result.stderr.splitlines(): + line = line.strip() + # tshark lists invalid fields one per line after the error header + for f in field_list: + if f in line: + invalid.add(f) + return set(field_list) - invalid + except Exception as e: + logger.warning(f"Could not validate tshark fields: {e}") + return set() - # Match each logical field to the first available candidate + # Collect all unique candidate field names and test them in one call + all_candidates = [] + for field_list in candidates.values(): + all_candidates.extend(field_list) + valid_fields = _test_fields(list(set(all_candidates))) + logger.info(f"Valid tshark -e fields: {sorted(valid_fields)}") + + # Match each logical field to the first valid candidate resolved = {} for logical_name, field_candidates in candidates.items(): - resolved[logical_name] = field_candidates[0] # default fallback - if available_fields: - for candidate in field_candidates: - if candidate in available_fields: - resolved[logical_name] = candidate - break + resolved[logical_name] = None + for candidate in field_candidates: + if candidate in valid_fields: + resolved[logical_name] = candidate + break + if resolved[logical_name] is None: + # No candidate was valid - use first as fallback + resolved[logical_name] = field_candidates[0] + logger.warning( + f"No valid tshark field found for '{logical_name}', " + f"using fallback: {field_candidates[0]}" + ) - logger.info(f"Discovered tshark fields: {resolved}") + logger.info(f"Resolved tshark fields: {resolved}") _tshark_fields_cache = resolved return resolved