diff --git a/tests/test_utils.py b/tests/test_utils.py index 1e394be..f9ba56d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,7 +1,7 @@ """Tests for utility modules.""" +import threading import time -from unittest.mock import patch from data.oui import get_manufacturer from utils.cleanup import DataStore @@ -93,35 +93,54 @@ class TestDataStoreCleanup: assert "new" in store def test_cleanup_does_not_delete_refreshed_entry(self): - """An entry refreshed after the cleanup snapshot must survive cleanup().""" + """Entry refreshed after the snapshot is taken must survive deletion re-validation.""" store = DataStore(max_age_seconds=0.05, name="test") store.set("key", "old") time.sleep(0.06) # expire it - # Mock time.time to return different values on successive calls. - # This simulates the scenario where cleanup() snapshots the timestamp, - # then between snapshot and deletion a refresh happens (timestamp updates), - # and the re-validation check uses a different time value. - call_sequence = iter( - [ - time.time() - 1.0, # cleanup() first call: now = old time, so "key" appears expired - time.time() - 1.0, # re-validation: now = same time, still expired... but wait - ] - ) + real_lock = store._lock + snapshot_done = threading.Event() + ok_to_delete = threading.Event() + exit_count = [0] - original_time = time.time + class PauseLock: + def acquire(self, *a, **kw): + return real_lock.acquire(*a, **kw) - def mocked_time(): - try: - return next(call_sequence) - except StopIteration: - # After we've used the mock sequence, return current time - return original_time() + def release(self): + real_lock.release() - with patch("utils.cleanup.time.time", mocked_time): - # Just before cleanup, refresh the key so it has a fresh timestamp - store.set("key", "refreshed") - removed = store.cleanup() + def __enter__(self): + self.acquire() + return self - assert removed == 0, "Entry refreshed before cleanup must survive" - assert "key" in store + def __exit__(self, *a): + self.release() + exit_count[0] += 1 + if exit_count[0] == 1: + snapshot_done.set() + ok_to_delete.wait(timeout=2.0) + + store._lock = PauseLock() + + result = [None] + + def run(): + result[0] = store.cleanup() + + t = threading.Thread(target=run, daemon=True) + t.start() + + snapshot_done.wait(timeout=2.0) + + # Inject refresh using real_lock directly (bypasses PauseLock) + with real_lock: + store.data["key"] = "refreshed" + store.timestamps["key"] = time.time() + + ok_to_delete.set() + t.join(timeout=2.0) + store._lock = real_lock + + assert result[0] == 0, "Re-validation guard must prevent deletion of refreshed entry" + assert store.get("key") == "refreshed"