Fix TSCM sweep scan resilience and add per-device error isolation

The sweep loop's WiFi/BT/RF scan processing had unprotected
timeline_manager.add_observation() calls that could crash an entire
scan iteration, silently preventing all device events from reaching
the frontend. Additionally, scan interval timestamps were only updated
at the end of processing, causing tight retry loops on persistent errors.

- Wrap timeline observation calls in try/except for all three protocols
- Move last_*_scan timestamp updates immediately after scan completes
- Add per-device try/except so one bad device doesn't block others
- Emit sweep_progress after WiFi scan for real-time status visibility
- Log warning when WiFi scan returns 0 networks for easier diagnosis
- Add known_device and score_modifier fields to correlation engine
- Add TSCM scheduling, cases, known devices, and advanced WiFi indicators

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-05 16:07:34 +00:00
parent 11941bedad
commit 5d4b19aef2
11 changed files with 6202 additions and 3555 deletions

View File

@@ -154,9 +154,12 @@ class DeviceProfile:
# Correlation
correlated_devices: list[str] = field(default_factory=list)
# Output
confidence: float = 0.0
recommended_action: str = 'monitor'
# Output
confidence: float = 0.0
recommended_action: str = 'monitor'
known_device: bool = False
known_device_name: Optional[str] = None
score_modifier: int = 0
def add_rssi_sample(self, rssi: int) -> None:
"""Add an RSSI sample with timestamp."""
@@ -190,9 +193,9 @@ class DeviceProfile:
))
self._recalculate_score()
def _recalculate_score(self) -> None:
"""Recalculate total score and risk level."""
self.total_score = sum(i.score for i in self.indicators)
def _recalculate_score(self) -> None:
"""Recalculate total score and risk level."""
self.total_score = sum(i.score for i in self.indicators)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
@@ -204,9 +207,29 @@ class DeviceProfile:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
# Calculate confidence based on number and quality of indicators
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
# Calculate confidence based on number and quality of indicators
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def apply_score_modifier(self, modifier: int | None) -> None:
"""Apply a score modifier (e.g., known-good device adjustment)."""
base_score = sum(i.score for i in self.indicators)
modifier_val = int(modifier) if modifier is not None else 0
self.score_modifier = modifier_val
self.total_score = max(0, base_score + modifier_val)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
self.recommended_action = 'investigate'
elif self.total_score >= 3:
self.risk_level = RiskLevel.NEEDS_REVIEW
self.recommended_action = 'review'
else:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
@@ -231,12 +254,15 @@ class DeviceProfile:
}
for i in self.indicators
],
'total_score': self.total_score,
'risk_level': self.risk_level.value,
'confidence': round(self.confidence, 2),
'recommended_action': self.recommended_action,
'correlated_devices': self.correlated_devices,
}
'total_score': self.total_score,
'score_modifier': self.score_modifier,
'risk_level': self.risk_level.value,
'confidence': round(self.confidence, 2),
'recommended_action': self.recommended_action,
'correlated_devices': self.correlated_devices,
'known_device': self.known_device,
'known_device_name': self.known_device_name,
}
# Known audio-capable BLE service UUIDs
@@ -282,10 +308,11 @@ class CorrelationEngine:
potential surveillance activity patterns.
"""
def __init__(self):
self.device_profiles: dict[str, DeviceProfile] = {}
self.meeting_windows: list[tuple[datetime, datetime]] = []
self.correlation_window = timedelta(minutes=5)
def __init__(self):
self.device_profiles: dict[str, DeviceProfile] = {}
self.meeting_windows: list[tuple[datetime, datetime]] = []
self.correlation_window = timedelta(minutes=5)
self._known_device_cache: dict[str, dict | None] = {}
def start_meeting_window(self) -> None:
"""Mark the start of a sensitive period (meeting)."""
@@ -299,16 +326,64 @@ class CorrelationEngine:
self.meeting_windows[-1] = (start, datetime.now())
logger.info("Meeting window ended")
def is_during_meeting(self, timestamp: datetime = None) -> bool:
"""Check if timestamp falls within a meeting window."""
ts = timestamp or datetime.now()
for start, end in self.meeting_windows:
if end is None:
if ts >= start:
return True
elif start <= ts <= end:
return True
return False
def is_during_meeting(self, timestamp: datetime = None) -> bool:
"""Check if timestamp falls within a meeting window."""
ts = timestamp or datetime.now()
for start, end in self.meeting_windows:
if end is None:
if ts >= start:
return True
elif start <= ts <= end:
return True
return False
def _lookup_known_device(self, identifier: str, protocol: str) -> dict | None:
"""Lookup known-good device details with light normalization."""
cache_key = f"{protocol}:{identifier}"
if cache_key in self._known_device_cache:
return self._known_device_cache[cache_key]
try:
from utils.database import is_known_good_device
candidates = []
if identifier:
candidates.append(str(identifier))
if protocol == 'rf':
try:
freq_val = float(identifier)
candidates.append(f"{freq_val:.3f}")
candidates.append(f"{freq_val:.1f}")
except (ValueError, TypeError):
pass
known = None
for cand in candidates:
if not cand:
continue
known = is_known_good_device(str(cand).upper())
if known:
break
except Exception:
known = None
self._known_device_cache[cache_key] = known
return known
def _apply_known_device_modifier(self, profile: DeviceProfile, identifier: str, protocol: str) -> None:
"""Apply known-good score modifier and update profile metadata."""
known = self._lookup_known_device(identifier, protocol)
if known:
profile.known_device = True
profile.known_device_name = known.get('name') if isinstance(known, dict) else None
modifier = known.get('score_modifier', 0) if isinstance(known, dict) else 0
else:
profile.known_device = False
profile.known_device_name = None
modifier = 0
profile.apply_score_modifier(modifier)
def get_or_create_profile(self, identifier: str, protocol: str) -> DeviceProfile:
"""Get existing profile or create new one."""
@@ -559,31 +634,33 @@ class CorrelationEngine:
)
# Also check name for tracker keywords
if profile.name:
name_lower = profile.name.lower()
if 'airtag' in name_lower or 'findmy' in name_lower:
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
f'AirTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'AirTag'
elif 'tile' in name_lower:
profile.add_indicator(
IndicatorType.TILE_DETECTED,
f'Tile tracker identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Tile Tracker'
elif 'smarttag' in name_lower:
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
f'SmartTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Samsung SmartTag'
return profile
if profile.name:
name_lower = profile.name.lower()
if 'airtag' in name_lower or 'findmy' in name_lower:
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
f'AirTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'AirTag'
elif 'tile' in name_lower:
profile.add_indicator(
IndicatorType.TILE_DETECTED,
f'Tile tracker identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Tile Tracker'
elif 'smarttag' in name_lower:
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
f'SmartTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Samsung SmartTag'
self._apply_known_device_modifier(profile, mac, 'bluetooth')
return profile
def analyze_wifi_device(self, device: dict) -> DeviceProfile:
"""
@@ -686,16 +763,18 @@ class CorrelationEngine:
)
# 8. Strong hidden AP (very suspicious)
if profile.is_hidden and profile.rssi_samples:
latest_rssi = profile.rssi_samples[-1][1]
if latest_rssi > -50:
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
{'rssi': latest_rssi}
)
return profile
if profile.is_hidden and profile.rssi_samples:
latest_rssi = profile.rssi_samples[-1][1]
if latest_rssi > -50:
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
{'rssi': latest_rssi}
)
self._apply_known_device_modifier(profile, bssid, 'wifi')
return profile
def analyze_rf_signal(self, signal: dict) -> DeviceProfile:
"""
@@ -778,14 +857,16 @@ class CorrelationEngine:
)
# 5. Meeting correlation
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Signal detected during sensitive period',
{'during_meeting': True}
)
return profile
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Signal detected during sensitive period',
{'during_meeting': True}
)
self._apply_known_device_modifier(profile, freq_key, 'rf')
return profile
def correlate_devices(self) -> list[dict]:
"""
@@ -872,22 +953,26 @@ class CorrelationEngine:
{'correlated_device': ap.identifier}
)
# Correlation 3: Same vendor BLE + WiFi
for bt in bt_devices:
if bt.manufacturer:
for wifi in wifi_devices:
if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower():
correlation = {
# Correlation 3: Same vendor BLE + WiFi
for bt in bt_devices:
if bt.manufacturer:
for wifi in wifi_devices:
if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower():
correlation = {
'type': 'same_vendor_bt_wifi',
'description': f'Same vendor ({bt.manufacturer}) on BLE and WiFi',
'devices': [bt.identifier, wifi.identifier],
'protocols': ['bluetooth', 'wifi'],
'score_boost': 2,
'significance': 'medium',
}
correlations.append(correlation)
return correlations
}
correlations.append(correlation)
# Re-apply known-good modifiers after correlation boosts
for profile in self.device_profiles.values():
self._apply_known_device_modifier(profile, profile.identifier, profile.protocol)
return correlations
def get_high_interest_devices(self) -> list[DeviceProfile]:
"""Get all devices classified as high interest."""