Fix TSCM sweep KeyError on RiskLevel.NEEDS_REVIEW

The RiskLevel.NEEDS_REVIEW enum value was 'review' but the
devices_by_risk dict and all summary keys used 'needs_review',
causing a KeyError during sweep correlation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-07 15:02:57 +00:00
parent 1ee64efc81
commit b208576068

View File

@@ -22,7 +22,7 @@ logger = logging.getLogger('intercept.tscm.correlation')
class RiskLevel(Enum):
"""Risk classification levels."""
INFORMATIONAL = 'informational' # Score 0-2
NEEDS_REVIEW = 'review' # Score 3-5
NEEDS_REVIEW = 'needs_review' # Score 3-5
HIGH_INTEREST = 'high_interest' # Score 6+
@@ -154,12 +154,12 @@ class DeviceProfile:
# Correlation
correlated_devices: list[str] = field(default_factory=list)
# Output
confidence: float = 0.0
recommended_action: str = 'monitor'
known_device: bool = False
known_device_name: Optional[str] = None
score_modifier: int = 0
# Output
confidence: float = 0.0
recommended_action: str = 'monitor'
known_device: bool = False
known_device_name: Optional[str] = None
score_modifier: int = 0
def add_rssi_sample(self, rssi: int) -> None:
"""Add an RSSI sample with timestamp."""
@@ -193,9 +193,9 @@ class DeviceProfile:
))
self._recalculate_score()
def _recalculate_score(self) -> None:
"""Recalculate total score and risk level."""
self.total_score = sum(i.score for i in self.indicators)
def _recalculate_score(self) -> None:
"""Recalculate total score and risk level."""
self.total_score = sum(i.score for i in self.indicators)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
@@ -207,29 +207,29 @@ class DeviceProfile:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
# Calculate confidence based on number and quality of indicators
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def apply_score_modifier(self, modifier: int | None) -> None:
"""Apply a score modifier (e.g., known-good device adjustment)."""
base_score = sum(i.score for i in self.indicators)
modifier_val = int(modifier) if modifier is not None else 0
self.score_modifier = modifier_val
self.total_score = max(0, base_score + modifier_val)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
self.recommended_action = 'investigate'
elif self.total_score >= 3:
self.risk_level = RiskLevel.NEEDS_REVIEW
self.recommended_action = 'review'
else:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
# Calculate confidence based on number and quality of indicators
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def apply_score_modifier(self, modifier: int | None) -> None:
"""Apply a score modifier (e.g., known-good device adjustment)."""
base_score = sum(i.score for i in self.indicators)
modifier_val = int(modifier) if modifier is not None else 0
self.score_modifier = modifier_val
self.total_score = max(0, base_score + modifier_val)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
self.recommended_action = 'investigate'
elif self.total_score >= 3:
self.risk_level = RiskLevel.NEEDS_REVIEW
self.recommended_action = 'review'
else:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
@@ -254,15 +254,15 @@ class DeviceProfile:
}
for i in self.indicators
],
'total_score': self.total_score,
'score_modifier': self.score_modifier,
'risk_level': self.risk_level.value,
'confidence': round(self.confidence, 2),
'recommended_action': self.recommended_action,
'correlated_devices': self.correlated_devices,
'known_device': self.known_device,
'known_device_name': self.known_device_name,
}
'total_score': self.total_score,
'score_modifier': self.score_modifier,
'risk_level': self.risk_level.value,
'confidence': round(self.confidence, 2),
'recommended_action': self.recommended_action,
'correlated_devices': self.correlated_devices,
'known_device': self.known_device,
'known_device_name': self.known_device_name,
}
# Known audio-capable BLE service UUIDs
@@ -308,11 +308,11 @@ class CorrelationEngine:
potential surveillance activity patterns.
"""
def __init__(self):
self.device_profiles: dict[str, DeviceProfile] = {}
self.meeting_windows: list[tuple[datetime, datetime]] = []
self.correlation_window = timedelta(minutes=5)
self._known_device_cache: dict[str, dict | None] = {}
def __init__(self):
self.device_profiles: dict[str, DeviceProfile] = {}
self.meeting_windows: list[tuple[datetime, datetime]] = []
self.correlation_window = timedelta(minutes=5)
self._known_device_cache: dict[str, dict | None] = {}
def start_meeting_window(self) -> None:
"""Mark the start of a sensitive period (meeting)."""
@@ -326,64 +326,64 @@ class CorrelationEngine:
self.meeting_windows[-1] = (start, datetime.now())
logger.info("Meeting window ended")
def is_during_meeting(self, timestamp: datetime = None) -> bool:
"""Check if timestamp falls within a meeting window."""
ts = timestamp or datetime.now()
for start, end in self.meeting_windows:
if end is None:
if ts >= start:
return True
elif start <= ts <= end:
return True
return False
def _lookup_known_device(self, identifier: str, protocol: str) -> dict | None:
"""Lookup known-good device details with light normalization."""
cache_key = f"{protocol}:{identifier}"
if cache_key in self._known_device_cache:
return self._known_device_cache[cache_key]
try:
from utils.database import is_known_good_device
candidates = []
if identifier:
candidates.append(str(identifier))
if protocol == 'rf':
try:
freq_val = float(identifier)
candidates.append(f"{freq_val:.3f}")
candidates.append(f"{freq_val:.1f}")
except (ValueError, TypeError):
pass
known = None
for cand in candidates:
if not cand:
continue
known = is_known_good_device(str(cand).upper())
if known:
break
except Exception:
known = None
self._known_device_cache[cache_key] = known
return known
def _apply_known_device_modifier(self, profile: DeviceProfile, identifier: str, protocol: str) -> None:
"""Apply known-good score modifier and update profile metadata."""
known = self._lookup_known_device(identifier, protocol)
if known:
profile.known_device = True
profile.known_device_name = known.get('name') if isinstance(known, dict) else None
modifier = known.get('score_modifier', 0) if isinstance(known, dict) else 0
else:
profile.known_device = False
profile.known_device_name = None
modifier = 0
profile.apply_score_modifier(modifier)
def is_during_meeting(self, timestamp: datetime = None) -> bool:
"""Check if timestamp falls within a meeting window."""
ts = timestamp or datetime.now()
for start, end in self.meeting_windows:
if end is None:
if ts >= start:
return True
elif start <= ts <= end:
return True
return False
def _lookup_known_device(self, identifier: str, protocol: str) -> dict | None:
"""Lookup known-good device details with light normalization."""
cache_key = f"{protocol}:{identifier}"
if cache_key in self._known_device_cache:
return self._known_device_cache[cache_key]
try:
from utils.database import is_known_good_device
candidates = []
if identifier:
candidates.append(str(identifier))
if protocol == 'rf':
try:
freq_val = float(identifier)
candidates.append(f"{freq_val:.3f}")
candidates.append(f"{freq_val:.1f}")
except (ValueError, TypeError):
pass
known = None
for cand in candidates:
if not cand:
continue
known = is_known_good_device(str(cand).upper())
if known:
break
except Exception:
known = None
self._known_device_cache[cache_key] = known
return known
def _apply_known_device_modifier(self, profile: DeviceProfile, identifier: str, protocol: str) -> None:
"""Apply known-good score modifier and update profile metadata."""
known = self._lookup_known_device(identifier, protocol)
if known:
profile.known_device = True
profile.known_device_name = known.get('name') if isinstance(known, dict) else None
modifier = known.get('score_modifier', 0) if isinstance(known, dict) else 0
else:
profile.known_device = False
profile.known_device_name = None
modifier = 0
profile.apply_score_modifier(modifier)
def get_or_create_profile(self, identifier: str, protocol: str) -> DeviceProfile:
"""Get existing profile or create new one."""
@@ -634,33 +634,33 @@ class CorrelationEngine:
)
# Also check name for tracker keywords
if profile.name:
name_lower = profile.name.lower()
if 'airtag' in name_lower or 'findmy' in name_lower:
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
f'AirTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'AirTag'
elif 'tile' in name_lower:
profile.add_indicator(
IndicatorType.TILE_DETECTED,
f'Tile tracker identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Tile Tracker'
elif 'smarttag' in name_lower:
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
f'SmartTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Samsung SmartTag'
self._apply_known_device_modifier(profile, mac, 'bluetooth')
return profile
if profile.name:
name_lower = profile.name.lower()
if 'airtag' in name_lower or 'findmy' in name_lower:
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
f'AirTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'AirTag'
elif 'tile' in name_lower:
profile.add_indicator(
IndicatorType.TILE_DETECTED,
f'Tile tracker identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Tile Tracker'
elif 'smarttag' in name_lower:
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
f'SmartTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Samsung SmartTag'
self._apply_known_device_modifier(profile, mac, 'bluetooth')
return profile
def analyze_wifi_device(self, device: dict) -> DeviceProfile:
"""
@@ -763,18 +763,18 @@ class CorrelationEngine:
)
# 8. Strong hidden AP (very suspicious)
if profile.is_hidden and profile.rssi_samples:
latest_rssi = profile.rssi_samples[-1][1]
if latest_rssi > -50:
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
{'rssi': latest_rssi}
)
self._apply_known_device_modifier(profile, bssid, 'wifi')
return profile
if profile.is_hidden and profile.rssi_samples:
latest_rssi = profile.rssi_samples[-1][1]
if latest_rssi > -50:
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
{'rssi': latest_rssi}
)
self._apply_known_device_modifier(profile, bssid, 'wifi')
return profile
def analyze_rf_signal(self, signal: dict) -> DeviceProfile:
"""
@@ -857,16 +857,16 @@ class CorrelationEngine:
)
# 5. Meeting correlation
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Signal detected during sensitive period',
{'during_meeting': True}
)
self._apply_known_device_modifier(profile, freq_key, 'rf')
return profile
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Signal detected during sensitive period',
{'during_meeting': True}
)
self._apply_known_device_modifier(profile, freq_key, 'rf')
return profile
def correlate_devices(self) -> list[dict]:
"""
@@ -953,26 +953,26 @@ class CorrelationEngine:
{'correlated_device': ap.identifier}
)
# Correlation 3: Same vendor BLE + WiFi
for bt in bt_devices:
if bt.manufacturer:
for wifi in wifi_devices:
if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower():
correlation = {
# Correlation 3: Same vendor BLE + WiFi
for bt in bt_devices:
if bt.manufacturer:
for wifi in wifi_devices:
if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower():
correlation = {
'type': 'same_vendor_bt_wifi',
'description': f'Same vendor ({bt.manufacturer}) on BLE and WiFi',
'devices': [bt.identifier, wifi.identifier],
'protocols': ['bluetooth', 'wifi'],
'score_boost': 2,
'significance': 'medium',
}
correlations.append(correlation)
# Re-apply known-good modifiers after correlation boosts
for profile in self.device_profiles.values():
self._apply_known_device_modifier(profile, profile.identifier, profile.protocol)
return correlations
}
correlations.append(correlation)
# Re-apply known-good modifiers after correlation boosts
for profile in self.device_profiles.values():
self._apply_known_device_modifier(profile, profile.identifier, profile.protocol)
return correlations
def get_high_interest_devices(self) -> list[DeviceProfile]:
"""Get all devices classified as high interest."""