mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
Fix OpenCellID integration: CID=0 handling, API key check, tab parsing
- /lookup_cell and /detect_rogue rejected CID=0 towers because
`all([..., cid])` is falsy when cid=0; use `is not None` checks
- can_use_api() now returns False when GSM_OPENCELLID_API_KEY is empty,
preventing the geocoding worker from wasting daily quota on doomed calls
- /lookup_cell returns 503 with clear message when API key not configured
- parse_tshark_output uses rstrip('\n\r') instead of strip() to preserve
leading empty tab-separated fields (strip() ate leading tabs, shifting
all columns when the first field was empty)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -89,6 +89,8 @@ def increment_api_usage():
|
||||
|
||||
def can_use_api():
|
||||
"""Check if we can make an API call within daily limit."""
|
||||
if not config.GSM_OPENCELLID_API_KEY:
|
||||
return False
|
||||
current_usage = get_api_usage_today()
|
||||
return current_usage < config.GSM_API_DAILY_LIMIT
|
||||
|
||||
@@ -800,7 +802,7 @@ def lookup_cell():
|
||||
lac = data.get('lac')
|
||||
cid = data.get('cid')
|
||||
|
||||
if not all([mcc, mnc, lac, cid]):
|
||||
if any(v is None for v in [mcc, mnc, lac, cid]):
|
||||
return jsonify({'error': 'MCC, MNC, LAC, and CID required'}), 400
|
||||
|
||||
try:
|
||||
@@ -823,7 +825,9 @@ def lookup_cell():
|
||||
'radio': result['radio']
|
||||
})
|
||||
|
||||
# Check API usage limit
|
||||
# Check API key and usage limit
|
||||
if not config.GSM_OPENCELLID_API_KEY:
|
||||
return jsonify({'error': 'OpenCellID API key not configured'}), 503
|
||||
if not can_use_api():
|
||||
current_usage = get_api_usage_today()
|
||||
return jsonify({
|
||||
@@ -905,7 +909,7 @@ def detect_rogue():
|
||||
lac = tower_info.get('lac')
|
||||
cid = tower_info.get('cid')
|
||||
|
||||
if all([mcc, mnc, lac, cid]):
|
||||
if all(v is not None for v in [mcc, mnc, lac, cid]):
|
||||
with get_db() as conn:
|
||||
result = conn.execute('''
|
||||
SELECT id FROM gsm_cells
|
||||
@@ -1384,7 +1388,7 @@ def parse_tshark_output(line: str, field_order: list[str] | None = None) -> dict
|
||||
field_order = ['ta', 'tmsi', 'imsi', 'lac', 'cid']
|
||||
|
||||
try:
|
||||
parts = line.strip().split('\t')
|
||||
parts = line.rstrip('\n\r').split('\t')
|
||||
|
||||
if len(parts) < len(field_order):
|
||||
return None
|
||||
|
||||
@@ -120,25 +120,38 @@ class TestParseTsharkOutput:
|
||||
assert 'timestamp' in result
|
||||
|
||||
def test_missing_optional_fields(self):
|
||||
"""Test parsing with missing optional fields (empty tabs)."""
|
||||
"""Test parsing with missing optional fields (empty tabs).
|
||||
|
||||
A packet with TA but no TMSI/IMSI is discarded since there's
|
||||
no device identifier to track.
|
||||
"""
|
||||
line = "3\t\t\t1234\t31245"
|
||||
result = parse_tshark_output(line)
|
||||
assert result is None
|
||||
|
||||
def test_missing_optional_fields_with_tmsi(self):
|
||||
"""Test parsing with TMSI but missing TA, IMSI, CID."""
|
||||
line = "\t0xABCD\t\t1234\t"
|
||||
result = parse_tshark_output(line)
|
||||
|
||||
assert result is not None
|
||||
assert result['ta_value'] == 3
|
||||
assert result['tmsi'] is None
|
||||
assert result['ta_value'] is None
|
||||
assert result['tmsi'] == '0xABCD'
|
||||
assert result['imsi'] is None
|
||||
assert result['lac'] == 1234
|
||||
assert result['cid'] == 31245
|
||||
assert result['cid'] is None
|
||||
|
||||
def test_no_ta_value(self):
|
||||
"""Test parsing without TA value (empty field)."""
|
||||
# When TA is empty, int('') will fail, so the parse returns None
|
||||
# This is the current behavior - the function expects valid integers or valid empty handling
|
||||
"""Test parsing without TA value (empty first field)."""
|
||||
line = "\t0xABCD1234\t123456789012345\t1234\t31245"
|
||||
result = parse_tshark_output(line)
|
||||
# Current implementation will fail to parse this due to int('') failing
|
||||
assert result is None
|
||||
|
||||
assert result is not None
|
||||
assert result['ta_value'] is None
|
||||
assert result['tmsi'] == '0xABCD1234'
|
||||
assert result['imsi'] == '123456789012345'
|
||||
assert result['lac'] == 1234
|
||||
assert result['cid'] == 31245
|
||||
|
||||
def test_invalid_line(self):
|
||||
"""Test handling of invalid tshark output."""
|
||||
|
||||
Reference in New Issue
Block a user