diff --git a/utils/tscm/correlation.py b/utils/tscm/correlation.py index e72e34b..1e6e795 100644 --- a/utils/tscm/correlation.py +++ b/utils/tscm/correlation.py @@ -22,7 +22,7 @@ logger = logging.getLogger('intercept.tscm.correlation') class RiskLevel(Enum): """Risk classification levels.""" INFORMATIONAL = 'informational' # Score 0-2 - NEEDS_REVIEW = 'review' # Score 3-5 + NEEDS_REVIEW = 'needs_review' # Score 3-5 HIGH_INTEREST = 'high_interest' # Score 6+ @@ -154,12 +154,12 @@ class DeviceProfile: # Correlation correlated_devices: list[str] = field(default_factory=list) - # Output - confidence: float = 0.0 - recommended_action: str = 'monitor' - known_device: bool = False - known_device_name: Optional[str] = None - score_modifier: int = 0 + # Output + confidence: float = 0.0 + recommended_action: str = 'monitor' + known_device: bool = False + known_device_name: Optional[str] = None + score_modifier: int = 0 def add_rssi_sample(self, rssi: int) -> None: """Add an RSSI sample with timestamp.""" @@ -193,9 +193,9 @@ class DeviceProfile: )) self._recalculate_score() - def _recalculate_score(self) -> None: - """Recalculate total score and risk level.""" - self.total_score = sum(i.score for i in self.indicators) + def _recalculate_score(self) -> None: + """Recalculate total score and risk level.""" + self.total_score = sum(i.score for i in self.indicators) if self.total_score >= 6: self.risk_level = RiskLevel.HIGH_INTEREST @@ -207,29 +207,29 @@ class DeviceProfile: self.risk_level = RiskLevel.INFORMATIONAL self.recommended_action = 'monitor' - # Calculate confidence based on number and quality of indicators - indicator_count = len(self.indicators) - self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05)) - - def apply_score_modifier(self, modifier: int | None) -> None: - """Apply a score modifier (e.g., known-good device adjustment).""" - base_score = sum(i.score for i in self.indicators) - modifier_val = int(modifier) if modifier is not None else 0 - self.score_modifier = modifier_val - self.total_score = max(0, base_score + modifier_val) - - if self.total_score >= 6: - self.risk_level = RiskLevel.HIGH_INTEREST - self.recommended_action = 'investigate' - elif self.total_score >= 3: - self.risk_level = RiskLevel.NEEDS_REVIEW - self.recommended_action = 'review' - else: - self.risk_level = RiskLevel.INFORMATIONAL - self.recommended_action = 'monitor' - - indicator_count = len(self.indicators) - self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05)) + # Calculate confidence based on number and quality of indicators + indicator_count = len(self.indicators) + self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05)) + + def apply_score_modifier(self, modifier: int | None) -> None: + """Apply a score modifier (e.g., known-good device adjustment).""" + base_score = sum(i.score for i in self.indicators) + modifier_val = int(modifier) if modifier is not None else 0 + self.score_modifier = modifier_val + self.total_score = max(0, base_score + modifier_val) + + if self.total_score >= 6: + self.risk_level = RiskLevel.HIGH_INTEREST + self.recommended_action = 'investigate' + elif self.total_score >= 3: + self.risk_level = RiskLevel.NEEDS_REVIEW + self.recommended_action = 'review' + else: + self.risk_level = RiskLevel.INFORMATIONAL + self.recommended_action = 'monitor' + + indicator_count = len(self.indicators) + self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05)) def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" @@ -254,15 +254,15 @@ class DeviceProfile: } for i in self.indicators ], - 'total_score': self.total_score, - 'score_modifier': self.score_modifier, - 'risk_level': self.risk_level.value, - 'confidence': round(self.confidence, 2), - 'recommended_action': self.recommended_action, - 'correlated_devices': self.correlated_devices, - 'known_device': self.known_device, - 'known_device_name': self.known_device_name, - } + 'total_score': self.total_score, + 'score_modifier': self.score_modifier, + 'risk_level': self.risk_level.value, + 'confidence': round(self.confidence, 2), + 'recommended_action': self.recommended_action, + 'correlated_devices': self.correlated_devices, + 'known_device': self.known_device, + 'known_device_name': self.known_device_name, + } # Known audio-capable BLE service UUIDs @@ -308,11 +308,11 @@ class CorrelationEngine: potential surveillance activity patterns. """ - def __init__(self): - self.device_profiles: dict[str, DeviceProfile] = {} - self.meeting_windows: list[tuple[datetime, datetime]] = [] - self.correlation_window = timedelta(minutes=5) - self._known_device_cache: dict[str, dict | None] = {} + def __init__(self): + self.device_profiles: dict[str, DeviceProfile] = {} + self.meeting_windows: list[tuple[datetime, datetime]] = [] + self.correlation_window = timedelta(minutes=5) + self._known_device_cache: dict[str, dict | None] = {} def start_meeting_window(self) -> None: """Mark the start of a sensitive period (meeting).""" @@ -326,64 +326,64 @@ class CorrelationEngine: self.meeting_windows[-1] = (start, datetime.now()) logger.info("Meeting window ended") - def is_during_meeting(self, timestamp: datetime = None) -> bool: - """Check if timestamp falls within a meeting window.""" - ts = timestamp or datetime.now() - for start, end in self.meeting_windows: - if end is None: - if ts >= start: - return True - elif start <= ts <= end: - return True - return False - - def _lookup_known_device(self, identifier: str, protocol: str) -> dict | None: - """Lookup known-good device details with light normalization.""" - cache_key = f"{protocol}:{identifier}" - if cache_key in self._known_device_cache: - return self._known_device_cache[cache_key] - - try: - from utils.database import is_known_good_device - - candidates = [] - if identifier: - candidates.append(str(identifier)) - - if protocol == 'rf': - try: - freq_val = float(identifier) - candidates.append(f"{freq_val:.3f}") - candidates.append(f"{freq_val:.1f}") - except (ValueError, TypeError): - pass - - known = None - for cand in candidates: - if not cand: - continue - known = is_known_good_device(str(cand).upper()) - if known: - break - except Exception: - known = None - - self._known_device_cache[cache_key] = known - return known - - def _apply_known_device_modifier(self, profile: DeviceProfile, identifier: str, protocol: str) -> None: - """Apply known-good score modifier and update profile metadata.""" - known = self._lookup_known_device(identifier, protocol) - if known: - profile.known_device = True - profile.known_device_name = known.get('name') if isinstance(known, dict) else None - modifier = known.get('score_modifier', 0) if isinstance(known, dict) else 0 - else: - profile.known_device = False - profile.known_device_name = None - modifier = 0 - - profile.apply_score_modifier(modifier) + def is_during_meeting(self, timestamp: datetime = None) -> bool: + """Check if timestamp falls within a meeting window.""" + ts = timestamp or datetime.now() + for start, end in self.meeting_windows: + if end is None: + if ts >= start: + return True + elif start <= ts <= end: + return True + return False + + def _lookup_known_device(self, identifier: str, protocol: str) -> dict | None: + """Lookup known-good device details with light normalization.""" + cache_key = f"{protocol}:{identifier}" + if cache_key in self._known_device_cache: + return self._known_device_cache[cache_key] + + try: + from utils.database import is_known_good_device + + candidates = [] + if identifier: + candidates.append(str(identifier)) + + if protocol == 'rf': + try: + freq_val = float(identifier) + candidates.append(f"{freq_val:.3f}") + candidates.append(f"{freq_val:.1f}") + except (ValueError, TypeError): + pass + + known = None + for cand in candidates: + if not cand: + continue + known = is_known_good_device(str(cand).upper()) + if known: + break + except Exception: + known = None + + self._known_device_cache[cache_key] = known + return known + + def _apply_known_device_modifier(self, profile: DeviceProfile, identifier: str, protocol: str) -> None: + """Apply known-good score modifier and update profile metadata.""" + known = self._lookup_known_device(identifier, protocol) + if known: + profile.known_device = True + profile.known_device_name = known.get('name') if isinstance(known, dict) else None + modifier = known.get('score_modifier', 0) if isinstance(known, dict) else 0 + else: + profile.known_device = False + profile.known_device_name = None + modifier = 0 + + profile.apply_score_modifier(modifier) def get_or_create_profile(self, identifier: str, protocol: str) -> DeviceProfile: """Get existing profile or create new one.""" @@ -634,33 +634,33 @@ class CorrelationEngine: ) # Also check name for tracker keywords - if profile.name: - name_lower = profile.name.lower() - if 'airtag' in name_lower or 'findmy' in name_lower: - profile.add_indicator( - IndicatorType.AIRTAG_DETECTED, - f'AirTag identified by name: {profile.name}', - {'name': profile.name} - ) - profile.device_type = 'AirTag' - elif 'tile' in name_lower: - profile.add_indicator( - IndicatorType.TILE_DETECTED, - f'Tile tracker identified by name: {profile.name}', - {'name': profile.name} - ) - profile.device_type = 'Tile Tracker' - elif 'smarttag' in name_lower: - profile.add_indicator( - IndicatorType.SMARTTAG_DETECTED, - f'SmartTag identified by name: {profile.name}', - {'name': profile.name} - ) - profile.device_type = 'Samsung SmartTag' - - self._apply_known_device_modifier(profile, mac, 'bluetooth') - - return profile + if profile.name: + name_lower = profile.name.lower() + if 'airtag' in name_lower or 'findmy' in name_lower: + profile.add_indicator( + IndicatorType.AIRTAG_DETECTED, + f'AirTag identified by name: {profile.name}', + {'name': profile.name} + ) + profile.device_type = 'AirTag' + elif 'tile' in name_lower: + profile.add_indicator( + IndicatorType.TILE_DETECTED, + f'Tile tracker identified by name: {profile.name}', + {'name': profile.name} + ) + profile.device_type = 'Tile Tracker' + elif 'smarttag' in name_lower: + profile.add_indicator( + IndicatorType.SMARTTAG_DETECTED, + f'SmartTag identified by name: {profile.name}', + {'name': profile.name} + ) + profile.device_type = 'Samsung SmartTag' + + self._apply_known_device_modifier(profile, mac, 'bluetooth') + + return profile def analyze_wifi_device(self, device: dict) -> DeviceProfile: """ @@ -763,18 +763,18 @@ class CorrelationEngine: ) # 8. Strong hidden AP (very suspicious) - if profile.is_hidden and profile.rssi_samples: - latest_rssi = profile.rssi_samples[-1][1] - if latest_rssi > -50: - profile.add_indicator( - IndicatorType.ROGUE_AP, - f'Strong hidden AP (RSSI: {latest_rssi} dBm)', - {'rssi': latest_rssi} - ) - - self._apply_known_device_modifier(profile, bssid, 'wifi') - - return profile + if profile.is_hidden and profile.rssi_samples: + latest_rssi = profile.rssi_samples[-1][1] + if latest_rssi > -50: + profile.add_indicator( + IndicatorType.ROGUE_AP, + f'Strong hidden AP (RSSI: {latest_rssi} dBm)', + {'rssi': latest_rssi} + ) + + self._apply_known_device_modifier(profile, bssid, 'wifi') + + return profile def analyze_rf_signal(self, signal: dict) -> DeviceProfile: """ @@ -857,16 +857,16 @@ class CorrelationEngine: ) # 5. Meeting correlation - if self.is_during_meeting(): - profile.add_indicator( - IndicatorType.MEETING_CORRELATED, - 'Signal detected during sensitive period', - {'during_meeting': True} - ) - - self._apply_known_device_modifier(profile, freq_key, 'rf') - - return profile + if self.is_during_meeting(): + profile.add_indicator( + IndicatorType.MEETING_CORRELATED, + 'Signal detected during sensitive period', + {'during_meeting': True} + ) + + self._apply_known_device_modifier(profile, freq_key, 'rf') + + return profile def correlate_devices(self) -> list[dict]: """ @@ -953,26 +953,26 @@ class CorrelationEngine: {'correlated_device': ap.identifier} ) - # Correlation 3: Same vendor BLE + WiFi - for bt in bt_devices: - if bt.manufacturer: - for wifi in wifi_devices: - if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower(): - correlation = { + # Correlation 3: Same vendor BLE + WiFi + for bt in bt_devices: + if bt.manufacturer: + for wifi in wifi_devices: + if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower(): + correlation = { 'type': 'same_vendor_bt_wifi', 'description': f'Same vendor ({bt.manufacturer}) on BLE and WiFi', 'devices': [bt.identifier, wifi.identifier], 'protocols': ['bluetooth', 'wifi'], 'score_boost': 2, 'significance': 'medium', - } - correlations.append(correlation) - - # Re-apply known-good modifiers after correlation boosts - for profile in self.device_profiles.values(): - self._apply_known_device_modifier(profile, profile.identifier, profile.protocol) - - return correlations + } + correlations.append(correlation) + + # Re-apply known-good modifiers after correlation boosts + for profile in self.device_profiles.values(): + self._apply_known_device_modifier(profile, profile.identifier, profile.protocol) + + return correlations def get_high_interest_devices(self) -> list[DeviceProfile]: """Get all devices classified as high interest."""