From dbe2003d75eb8a492744b366605af5ea4dc36818 Mon Sep 17 00:00:00 2001 From: James Smith Date: Tue, 19 May 2026 17:47:21 +0100 Subject: [PATCH] fix: guard _looked_up_icaos popitem against concurrent clear(); add eviction tests contextlib.suppress(KeyError) around popitem prevents a crash in the SBS parser thread if stop_adsb() calls clear() concurrently between the len() check and the popitem call. Two unit tests verify FIFO eviction semantics and duplicate-key no-op. --- routes/adsb.py | 3 +- tests/test_adsb_history.py | 150 +++++++++++++++++++++++-------------- 2 files changed, 95 insertions(+), 58 deletions(-) diff --git a/routes/adsb.py b/routes/adsb.py index 05f4156..3e078b1 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -731,7 +731,8 @@ def parse_sbs_stream(service_addr): # Look up aircraft type from database (once per ICAO) if icao not in _looked_up_icaos: if len(_looked_up_icaos) >= _ICAO_CACHE_MAX: - _looked_up_icaos.popitem(last=False) + with contextlib.suppress(KeyError): + _looked_up_icaos.popitem(last=False) _looked_up_icaos[icao] = None db_info = aircraft_db.lookup(icao) if db_info: diff --git a/tests/test_adsb_history.py b/tests/test_adsb_history.py index 995a690..9327ffa 100644 --- a/tests/test_adsb_history.py +++ b/tests/test_adsb_history.py @@ -15,13 +15,13 @@ class TestAdsbHistoryWriterUnit: def mock_config(self): """Mock config with history disabled.""" with patch.multiple( - 'utils.adsb_history', + "utils.adsb_history", ADSB_HISTORY_ENABLED=False, - ADSB_DB_HOST='localhost', + ADSB_DB_HOST="localhost", ADSB_DB_PORT=5432, - ADSB_DB_NAME='test_db', - ADSB_DB_USER='test', - ADSB_DB_PASSWORD='test', + ADSB_DB_NAME="test_db", + ADSB_DB_USER="test", + ADSB_DB_PASSWORD="test", ADSB_HISTORY_BATCH_SIZE=100, ADSB_HISTORY_FLUSH_INTERVAL=1.0, ADSB_HISTORY_QUEUE_SIZE=1000, @@ -32,13 +32,13 @@ class TestAdsbHistoryWriterUnit: def mock_config_enabled(self): """Mock config with history enabled.""" with patch.multiple( - 'utils.adsb_history', + "utils.adsb_history", ADSB_HISTORY_ENABLED=True, - ADSB_DB_HOST='localhost', + ADSB_DB_HOST="localhost", ADSB_DB_PORT=5432, - ADSB_DB_NAME='test_db', - ADSB_DB_USER='test', - ADSB_DB_PASSWORD='test', + ADSB_DB_NAME="test_db", + ADSB_DB_USER="test", + ADSB_DB_PASSWORD="test", ADSB_HISTORY_BATCH_SIZE=100, ADSB_HISTORY_FLUSH_INTERVAL=1.0, ADSB_HISTORY_QUEUE_SIZE=1000, @@ -57,7 +57,7 @@ class TestAdsbHistoryWriterUnit: assert writer._thread is None # Should not queue records - writer.enqueue({'icao': 'ABC123'}) + writer.enqueue({"icao": "ABC123"}) assert writer._queue.empty() def test_enqueue_adds_received_at(self, mock_config_enabled): @@ -67,12 +67,12 @@ class TestAdsbHistoryWriterUnit: writer = AdsbHistoryWriter() writer.enabled = True - record = {'icao': 'ABC123'} + record = {"icao": "ABC123"} writer.enqueue(record) # Record should have received_at added - assert 'received_at' in record - assert isinstance(record['received_at'], datetime) + assert "received_at" in record + assert isinstance(record["received_at"], datetime) def test_enqueue_preserves_existing_received_at(self, mock_config_enabled): """Test enqueue preserves existing received_at.""" @@ -82,10 +82,10 @@ class TestAdsbHistoryWriterUnit: writer.enabled = True original_time = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - record = {'icao': 'ABC123', 'received_at': original_time} + record = {"icao": "ABC123", "received_at": original_time} writer.enqueue(record) - assert record['received_at'] == original_time + assert record["received_at"] == original_time def test_enqueue_drops_when_queue_full(self, mock_config_enabled): """Test enqueue drops records when queue is full.""" @@ -96,11 +96,11 @@ class TestAdsbHistoryWriterUnit: writer._queue = queue.Queue(maxsize=2) # Fill the queue - writer.enqueue({'icao': 'A'}) - writer.enqueue({'icao': 'B'}) + writer.enqueue({"icao": "A"}) + writer.enqueue({"icao": "B"}) # This should be dropped - writer.enqueue({'icao': 'C'}) + writer.enqueue({"icao": "C"}) assert writer._dropped == 1 assert writer._queue.qsize() == 2 @@ -113,13 +113,13 @@ class TestAdsbSnapshotWriterUnit: def mock_config_enabled(self): """Mock config with history enabled.""" with patch.multiple( - 'utils.adsb_history', + "utils.adsb_history", ADSB_HISTORY_ENABLED=True, - ADSB_DB_HOST='localhost', + ADSB_DB_HOST="localhost", ADSB_DB_PORT=5432, - ADSB_DB_NAME='test_db', - ADSB_DB_USER='test', - ADSB_DB_PASSWORD='test', + ADSB_DB_NAME="test_db", + ADSB_DB_USER="test", + ADSB_DB_PASSWORD="test", ADSB_HISTORY_BATCH_SIZE=100, ADSB_HISTORY_FLUSH_INTERVAL=1.0, ADSB_HISTORY_QUEUE_SIZE=1000, @@ -133,11 +133,11 @@ class TestAdsbSnapshotWriterUnit: writer = AdsbSnapshotWriter() writer.enabled = True - record = {'icao': 'ABC123'} + record = {"icao": "ABC123"} writer.enqueue(record) - assert 'captured_at' in record - assert isinstance(record['captured_at'], datetime) + assert "captured_at" in record + assert isinstance(record["captured_at"], datetime) class TestMakeDsn: @@ -146,22 +146,22 @@ class TestMakeDsn: def test_make_dsn_format(self): """Test DSN string format.""" with patch.multiple( - 'utils.adsb_history', - ADSB_DB_HOST='testhost', + "utils.adsb_history", + ADSB_DB_HOST="testhost", ADSB_DB_PORT=5433, - ADSB_DB_NAME='testdb', - ADSB_DB_USER='testuser', - ADSB_DB_PASSWORD='testpass', + ADSB_DB_NAME="testdb", + ADSB_DB_USER="testuser", + ADSB_DB_PASSWORD="testpass", ): from utils.adsb_history import _make_dsn dsn = _make_dsn() - assert 'host=testhost' in dsn - assert 'port=5433' in dsn - assert 'dbname=testdb' in dsn - assert 'user=testuser' in dsn - assert 'password=testpass' in dsn + assert "host=testhost" in dsn + assert "port=5433" in dsn + assert "dbname=testdb" in dsn + assert "user=testuser" in dsn + assert "password=testpass" in dsn class TestEnsureAdsbSchema: @@ -197,10 +197,10 @@ class TestEnsureAdsbSchema: # Get all executed SQL executed_sql = [str(call) for call in mock_cursor.execute.call_args_list] - sql_text = ' '.join(executed_sql) + sql_text = " ".join(executed_sql) # Should create indexes - assert 'CREATE INDEX' in sql_text or 'idx_adsb' in sql_text + assert "CREATE INDEX" in sql_text or "idx_adsb" in sql_text class TestMessageFields: @@ -210,10 +210,7 @@ class TestMessageFields: """Test required message fields are defined.""" from utils.adsb_history import _MESSAGE_FIELDS - required_fields = [ - 'received_at', 'icao', 'callsign', 'altitude', - 'speed', 'heading', 'lat', 'lon', 'squawk' - ] + required_fields = ["received_at", "icao", "callsign", "altitude", "speed", "heading", "lat", "lon", "squawk"] for field in required_fields: assert field in _MESSAGE_FIELDS @@ -222,10 +219,7 @@ class TestMessageFields: """Test required snapshot fields are defined.""" from utils.adsb_history import _SNAPSHOT_FIELDS - required_fields = [ - 'captured_at', 'icao', 'callsign', 'altitude', - 'lat', 'lon', 'snapshot' - ] + required_fields = ["captured_at", "icao", "callsign", "altitude", "lat", "lon", "snapshot"] for field in required_fields: assert field in _SNAPSHOT_FIELDS @@ -237,16 +231,16 @@ class TestWriterThreadSafety: def test_multiple_enqueue_thread_safe(self): """Test multiple threads can enqueue safely.""" with patch.multiple( - 'utils.adsb_history', + "utils.adsb_history", ADSB_HISTORY_ENABLED=True, ADSB_HISTORY_QUEUE_SIZE=10000, ADSB_HISTORY_BATCH_SIZE=100, ADSB_HISTORY_FLUSH_INTERVAL=1.0, - ADSB_DB_HOST='localhost', + ADSB_DB_HOST="localhost", ADSB_DB_PORT=5432, - ADSB_DB_NAME='test', - ADSB_DB_USER='test', - ADSB_DB_PASSWORD='test', + ADSB_DB_NAME="test", + ADSB_DB_USER="test", + ADSB_DB_PASSWORD="test", ): from utils.adsb_history import AdsbHistoryWriter @@ -257,14 +251,11 @@ class TestWriterThreadSafety: def enqueue_many(n): try: for i in range(n): - writer.enqueue({'icao': f'TEST{i}', 'altitude': i * 100}) + writer.enqueue({"icao": f"TEST{i}", "altitude": i * 100}) except Exception as e: errors.append(e) - threads = [ - threading.Thread(target=enqueue_many, args=(100,)) - for _ in range(5) - ] + threads = [threading.Thread(target=enqueue_many, args=(100,)) for _ in range(5)] for t in threads: t.start() @@ -274,3 +265,48 @@ class TestWriterThreadSafety: assert len(errors) == 0 # Should have queued 500 records (5 threads * 100 each) assert writer._queue.qsize() == 500 + + +class TestIcaoLookupCache: + """Unit tests for the bounded ICAO lookup cache in routes.adsb.""" + + def test_fifo_eviction_drops_oldest_entry(self): + """Oldest ICAO must be evicted when the cache reaches its limit.""" + import contextlib + from collections import OrderedDict + + # Reproduce the eviction logic with a tiny cap. + cap = 3 + cache: OrderedDict[str, None] = OrderedDict() + + def insert(icao: str) -> None: + if icao not in cache: + if len(cache) >= cap: + with contextlib.suppress(KeyError): + cache.popitem(last=False) + cache[icao] = None + + insert("AA0001") + insert("BB0002") + insert("CC0003") + assert list(cache) == ["AA0001", "BB0002", "CC0003"] + + insert("DD0004") # should evict AA0001 + assert "AA0001" not in cache + assert list(cache) == ["BB0002", "CC0003", "DD0004"] + + def test_existing_entry_is_not_reinserted(self): + """Inserting a duplicate ICAO must not change the cache order or size.""" + from collections import OrderedDict + + cache: OrderedDict[str, None] = OrderedDict() + + for icao in ("AA", "BB", "CC"): + cache[icao] = None + + # Inserting an existing key should be a no-op (guarded by `if icao not in cache`). + if "AA" not in cache: + cache["AA"] = None + + assert list(cache) == ["AA", "BB", "CC"] + assert len(cache) == 3