test: use threading to correctly exercise cleanup re-validation guard

This commit is contained in:
James Smith
2026-05-19 13:08:32 +01:00
parent 6ed24b758d
commit efc14b4de0
+44 -25
View File
@@ -1,7 +1,7 @@
"""Tests for utility modules."""
import threading
import time
from unittest.mock import patch
from data.oui import get_manufacturer
from utils.cleanup import DataStore
@@ -93,35 +93,54 @@ class TestDataStoreCleanup:
assert "new" in store
def test_cleanup_does_not_delete_refreshed_entry(self):
"""An entry refreshed after the cleanup snapshot must survive cleanup()."""
"""Entry refreshed after the snapshot is taken must survive deletion re-validation."""
store = DataStore(max_age_seconds=0.05, name="test")
store.set("key", "old")
time.sleep(0.06) # expire it
# Mock time.time to return different values on successive calls.
# This simulates the scenario where cleanup() snapshots the timestamp,
# then between snapshot and deletion a refresh happens (timestamp updates),
# and the re-validation check uses a different time value.
call_sequence = iter(
[
time.time() - 1.0, # cleanup() first call: now = old time, so "key" appears expired
time.time() - 1.0, # re-validation: now = same time, still expired... but wait
]
)
real_lock = store._lock
snapshot_done = threading.Event()
ok_to_delete = threading.Event()
exit_count = [0]
original_time = time.time
class PauseLock:
def acquire(self, *a, **kw):
return real_lock.acquire(*a, **kw)
def mocked_time():
try:
return next(call_sequence)
except StopIteration:
# After we've used the mock sequence, return current time
return original_time()
def release(self):
real_lock.release()
with patch("utils.cleanup.time.time", mocked_time):
# Just before cleanup, refresh the key so it has a fresh timestamp
store.set("key", "refreshed")
removed = store.cleanup()
def __enter__(self):
self.acquire()
return self
assert removed == 0, "Entry refreshed before cleanup must survive"
assert "key" in store
def __exit__(self, *a):
self.release()
exit_count[0] += 1
if exit_count[0] == 1:
snapshot_done.set()
ok_to_delete.wait(timeout=2.0)
store._lock = PauseLock()
result = [None]
def run():
result[0] = store.cleanup()
t = threading.Thread(target=run, daemon=True)
t.start()
snapshot_done.wait(timeout=2.0)
# Inject refresh using real_lock directly (bypasses PauseLock)
with real_lock:
store.data["key"] = "refreshed"
store.timestamps["key"] = time.time()
ok_to_delete.set()
t.join(timeout=2.0)
store._lock = real_lock
assert result[0] == 0, "Re-validation guard must prevent deletion of refreshed entry"
assert store.get("key") == "refreshed"