v2.26.0: fix SSE fanout crash and branded logo FOUC

- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-03-13 11:51:27 +00:00
parent 00362bcd57
commit e00fbfddc1
183 changed files with 2006 additions and 4243 deletions
+36 -37
View File
@@ -24,7 +24,7 @@ import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
from typing import Any
logger = logging.getLogger('intercept.tscm.advanced')
@@ -893,10 +893,10 @@ def _calculate_baseline_health(diff: BaselineDiff, baseline: dict) -> None:
class DeviceObservation:
"""A single observation of a device."""
timestamp: datetime
rssi: Optional[int] = None
rssi: int | None = None
present: bool = True
channel: Optional[int] = None
frequency: Optional[float] = None
channel: int | None = None
frequency: float | None = None
attributes: dict = field(default_factory=dict)
@@ -910,21 +910,21 @@ class DeviceTimeline:
"""
identifier: str
protocol: str
name: Optional[str] = None
name: str | None = None
# Observation history (time-bucketed)
observations: list[DeviceObservation] = field(default_factory=list)
# Computed metrics
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
first_seen: datetime | None = None
last_seen: datetime | None = None
total_observations: int = 0
presence_ratio: float = 0.0 # % of time device was present
# Signal metrics
rssi_min: Optional[int] = None
rssi_max: Optional[int] = None
rssi_mean: Optional[float] = None
rssi_min: int | None = None
rssi_max: int | None = None
rssi_mean: float | None = None
rssi_stability: float = 0.0 # 0-1, higher = more stable
# Movement assessment
@@ -991,17 +991,17 @@ class TimelineManager:
self.bucket_seconds = bucket_seconds
self.max_observations = max_observations
self.timelines: dict[str, DeviceTimeline] = {}
self._meeting_windows: list[tuple[datetime, Optional[datetime]]] = []
self._meeting_windows: list[tuple[datetime, datetime | None]] = []
def add_observation(
self,
identifier: str,
protocol: str,
rssi: Optional[int] = None,
channel: Optional[int] = None,
frequency: Optional[float] = None,
name: Optional[str] = None,
attributes: Optional[dict] = None
rssi: int | None = None,
channel: int | None = None,
frequency: float | None = None,
name: str | None = None,
attributes: dict | None = None
) -> None:
"""Add an observation for a device."""
key = f"{protocol}:{identifier.upper()}"
@@ -1080,7 +1080,7 @@ class TimelineManager:
return True
return False
def compute_metrics(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
def compute_metrics(self, identifier: str, protocol: str) -> DeviceTimeline | None:
"""Compute all metrics for a device timeline."""
key = f"{protocol}:{identifier.upper()}"
if key not in self.timelines:
@@ -1125,7 +1125,7 @@ class TimelineManager:
return timeline
def get_timeline(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
def get_timeline(self, identifier: str, protocol: str) -> DeviceTimeline | None:
"""Get computed timeline for a device."""
return self.compute_metrics(identifier, protocol)
@@ -1150,9 +1150,9 @@ class MeetingWindowSummary:
and applies meeting-window scoring modifiers.
"""
meeting_id: int
name: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
name: str | None = None
start_time: datetime | None = None
end_time: datetime | None = None
duration_minutes: float = 0.0
# Devices first seen during meeting (high interest)
@@ -1431,7 +1431,7 @@ class WiFiAdvancedDetector:
self.indicators.extend(indicators)
return indicators
def add_probe_request(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
def add_probe_request(self, frame: dict) -> WiFiAdvancedIndicator | None:
"""
Record a probe request frame (requires monitor mode).
@@ -1475,7 +1475,7 @@ class WiFiAdvancedDetector:
details={
'ssid': ssid,
'probe_count': len(recent_probes),
'source_macs': list(set(p['src_mac'] for p in recent_probes)),
'source_macs': list({p['src_mac'] for p in recent_probes}),
'pattern': 'Multiple probe requests for potentially sensitive network',
},
requires_monitor_mode=True,
@@ -1485,7 +1485,7 @@ class WiFiAdvancedDetector:
return None
def add_deauth_frame(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
def add_deauth_frame(self, frame: dict) -> WiFiAdvancedIndicator | None:
"""
Record a deauthentication frame (requires monitor mode).
@@ -1523,7 +1523,7 @@ class WiFiAdvancedDetector:
'deauth_count': len(recent_deauths),
'time_window_seconds': 10,
'targeted_bssid': bssid if targeting_bssid else None,
'unique_sources': len(set(d['src_mac'] for d in recent_deauths)),
'unique_sources': len({d['src_mac'] for d in recent_deauths}),
'pattern': 'Abnormal deauthentication frame volume',
},
requires_monitor_mode=True,
@@ -1574,7 +1574,7 @@ class BLERiskExplanation:
and recommended actions.
"""
identifier: str
name: Optional[str] = None
name: str | None = None
# Risk assessment
risk_level: str = 'informational'
@@ -1588,7 +1588,7 @@ class BLERiskExplanation:
# Tracker detection
is_tracker: bool = False
tracker_type: Optional[str] = None
tracker_type: str | None = None
tracker_explanation: str = ''
# Meeting correlation
@@ -1686,7 +1686,7 @@ def estimate_ble_proximity(rssi: int) -> tuple[BLEProximity, str, str]:
def generate_ble_risk_explanation(
device: dict,
profile: Optional[dict] = None,
profile: dict | None = None,
is_during_meeting: bool = False
) -> BLERiskExplanation:
"""
@@ -1722,7 +1722,7 @@ def generate_ble_risk_explanation(
explanation.proximity_explanation = "Could not parse RSSI value"
# Tracker detection with explanation
tracker_info = device.get('tracker_type') or device.get('is_tracker')
device.get('tracker_type') or device.get('is_tracker')
if device.get('is_airtag'):
explanation.is_tracker = True
explanation.tracker_type = 'Apple AirTag'
@@ -1902,7 +1902,7 @@ class PlaybookStep:
step_number: int
action: str
details: str
safety_note: Optional[str] = None
safety_note: str | None = None
@dataclass
@@ -2145,8 +2145,8 @@ PLAYBOOKS = {
def get_playbook_for_finding(
risk_level: str,
finding_type: Optional[str] = None,
indicators: Optional[list[dict]] = None
finding_type: str | None = None,
indicators: list[dict] | None = None
) -> OperatorPlaybook:
"""
Get appropriate playbook for a finding.
@@ -2166,9 +2166,8 @@ def get_playbook_for_finding(
# Check indicators for tracker
if indicators:
tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker']
if any(i.get('type') in tracker_types for i in indicators):
if risk_level == 'high_interest':
return PLAYBOOKS['high_interest_tracker']
if any(i.get('type') in tracker_types for i in indicators) and risk_level == 'high_interest':
return PLAYBOOKS['high_interest_tracker']
# Return based on risk level
if risk_level == 'high_interest':
@@ -2207,8 +2206,8 @@ def attach_playbook_to_finding(finding: dict) -> dict:
# Global Instance Management
# =============================================================================
_timeline_manager: Optional[TimelineManager] = None
_wifi_detector: Optional[WiFiAdvancedDetector] = None
_timeline_manager: TimelineManager | None = None
_wifi_detector: WiFiAdvancedDetector | None = None
def get_timeline_manager() -> TimelineManager:
-3
View File
@@ -9,12 +9,10 @@ from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from utils.database import (
create_tscm_baseline,
get_active_tscm_baseline,
get_tscm_baseline,
update_tscm_baseline,
)
@@ -107,7 +105,6 @@ class BaselineRecorder:
f"{summary['bt_count']} BT, {summary['rf_count']} RF"
)
baseline_id = self.current_baseline_id
self.current_baseline_id = None
return summary
+2 -4
View File
@@ -180,9 +180,7 @@ class BLEScanner:
ble_device.manufacturer_name = COMPANY_IDS.get(company_id, f'Unknown ({hex(company_id)})')
# Handle various data types safely
try:
if isinstance(data, (bytes, bytearray)):
ble_device.manufacturer_data = bytes(data)
elif isinstance(data, (list, tuple)):
if isinstance(data, (bytes, bytearray, list, tuple)):
ble_device.manufacturer_data = bytes(data)
elif isinstance(data, str):
ble_device.manufacturer_data = bytes.fromhex(data)
@@ -237,7 +235,7 @@ class BLEScanner:
try:
# Try to get existing event loop
try:
loop = asyncio.get_running_loop()
asyncio.get_running_loop()
# We're in an async context, can't use run()
future = asyncio.ensure_future(self.scan_async(duration))
return asyncio.get_event_loop().run_until_complete(future)
+23 -29
View File
@@ -10,11 +10,11 @@ Findings indicate anomalies and indicators, not confirmed surveillance devices.
from __future__ import annotations
import contextlib
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
logger = logging.getLogger('intercept.tscm.correlation')
@@ -119,36 +119,36 @@ class DeviceProfile:
protocol: str # 'bluetooth', 'wifi', 'rf'
# Device info
name: Optional[str] = None
manufacturer: Optional[str] = None
device_type: Optional[str] = None
tracker_type: Optional[str] = None
tracker_name: Optional[str] = None
tracker_confidence: Optional[str] = None
tracker_confidence_score: Optional[float] = None
name: str | None = None
manufacturer: str | None = None
device_type: str | None = None
tracker_type: str | None = None
tracker_name: str | None = None
tracker_confidence: str | None = None
tracker_confidence_score: float | None = None
tracker_evidence: list[str] = field(default_factory=list)
# Bluetooth-specific
services: list[str] = field(default_factory=list)
company_id: Optional[int] = None
advertising_interval: Optional[int] = None
company_id: int | None = None
advertising_interval: int | None = None
# Wi-Fi-specific
ssid: Optional[str] = None
channel: Optional[int] = None
encryption: Optional[str] = None
beacon_interval: Optional[int] = None
ssid: str | None = None
channel: int | None = None
encryption: str | None = None
beacon_interval: int | None = None
is_hidden: bool = False
# RF-specific
frequency: Optional[float] = None
bandwidth: Optional[float] = None
modulation: Optional[str] = None
frequency: float | None = None
bandwidth: float | None = None
modulation: str | None = None
# Common measurements
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
first_seen: datetime | None = None
last_seen: datetime | None = None
detection_count: int = 0
# Behavioral analysis
@@ -163,7 +163,7 @@ class DeviceProfile:
confidence: float = 0.0
recommended_action: str = 'monitor'
known_device: bool = False
known_device_name: Optional[str] = None
known_device_name: str | None = None
score_modifier: int = 0
def add_rssi_sample(self, rssi: int) -> None:
@@ -466,10 +466,8 @@ class CorrelationEngine:
# Add RSSI sample
rssi = device.get('rssi', device.get('signal'))
if rssi:
try:
with contextlib.suppress(ValueError, TypeError):
profile.add_rssi_sample(int(rssi))
except (ValueError, TypeError):
pass
# Clear previous indicators for fresh analysis
profile.indicators = []
@@ -789,10 +787,8 @@ class CorrelationEngine:
# Add RSSI sample
rssi = device.get('rssi', device.get('power', device.get('signal')))
if rssi:
try:
with contextlib.suppress(ValueError, TypeError):
profile.add_rssi_sample(int(rssi))
except (ValueError, TypeError):
pass
# Clear previous indicators
profile.indicators = []
@@ -937,10 +933,8 @@ class CorrelationEngine:
# Add power sample
power = signal.get('power', signal.get('level'))
if power:
try:
with contextlib.suppress(ValueError, TypeError):
profile.add_rssi_sample(int(float(power)))
except (ValueError, TypeError):
pass
# Clear previous indicators
profile.indicators = []
+1 -7
View File
@@ -9,21 +9,15 @@ from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from data.tscm_frequencies import (
BLE_TRACKER_SIGNATURES,
THREAT_TYPES,
WIFI_CAMERA_PATTERNS,
get_frequency_risk,
get_threat_severity,
is_known_tracker,
is_potential_camera,
)
from utils.tscm.signal_classification import (
classify_signal_strength,
get_signal_strength_info,
SignalStrength,
)
logger = logging.getLogger('intercept.tscm.detector')
@@ -337,7 +331,7 @@ class ThreatDetector:
"""
frequency = signal.get('frequency', 0)
power = signal.get('power', signal.get('level', -100))
band = signal.get('band', '')
signal.get('band', '')
reasons = []
classification = 'informational'
+48 -52
View File
@@ -26,13 +26,11 @@ from __future__ import annotations
import hashlib
import logging
import math
import statistics
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
logger = logging.getLogger('intercept.tscm.device_identity')
@@ -119,18 +117,18 @@ class BLEObservation:
timestamp: datetime
addr: str # MAC-like address
addr_type: AddressType = AddressType.UNKNOWN
rssi: Optional[int] = None
tx_power: Optional[int] = None
rssi: int | None = None
tx_power: int | None = None
adv_type: AdvType = AdvType.UNKNOWN
adv_flags: Optional[int] = None
manufacturer_id: Optional[int] = None
manufacturer_data: Optional[bytes] = None
adv_flags: int | None = None
manufacturer_id: int | None = None
manufacturer_data: bytes | None = None
service_uuids: list[str] = field(default_factory=list)
service_data: Optional[bytes] = None
local_name: Optional[str] = None
appearance: Optional[int] = None
packet_length: Optional[int] = None
phy: Optional[str] = None
service_data: bytes | None = None
local_name: str | None = None
appearance: int | None = None
packet_length: int | None = None
phy: str | None = None
def __post_init__(self):
if isinstance(self.addr_type, str):
@@ -202,26 +200,26 @@ class WifiObservation:
"""Single WiFi frame observation."""
timestamp: datetime
src_mac: str
dst_mac: Optional[str] = None
bssid: Optional[str] = None
ssid: Optional[str] = None
dst_mac: str | None = None
bssid: str | None = None
ssid: str | None = None
frame_type: WifiFrameType = WifiFrameType.UNKNOWN
rssi: Optional[int] = None
channel: Optional[int] = None
bandwidth: Optional[int] = None # 20/40/80/160
encryption: Optional[str] = None
beacon_interval: Optional[int] = None
capabilities: Optional[int] = None
rssi: int | None = None
channel: int | None = None
bandwidth: int | None = None # 20/40/80/160
encryption: str | None = None
beacon_interval: int | None = None
capabilities: int | None = None
supported_rates: list[float] = field(default_factory=list)
extended_rates: list[float] = field(default_factory=list)
ht_capable: bool = False
vht_capable: bool = False
he_capable: bool = False
ht_capabilities: Optional[int] = None
vht_capabilities: Optional[int] = None
ht_capabilities: int | None = None
vht_capabilities: int | None = None
vendor_ies: list[tuple[str, int]] = field(default_factory=list) # (OUI, length)
wps_present: bool = False
sequence_number: Optional[int] = None
sequence_number: int | None = None
probed_ssids: list[str] = field(default_factory=list)
def __post_init__(self):
@@ -263,7 +261,7 @@ class WifiObservation:
# Vendor IE fingerprint (OUIs only, not content)
if self.vendor_ies:
ouis = sorted(set(oui for oui, _ in self.vendor_ies))
ouis = sorted({oui for oui, _ in self.vendor_ies})
components.append(f"vie:{','.join(ouis)}")
if self.capabilities is not None:
@@ -301,7 +299,7 @@ class DeviceSession:
first_seen: datetime
last_seen: datetime
observations: list = field(default_factory=list)
primary_mac: Optional[str] = None
primary_mac: str | None = None
observed_macs: set[str] = field(default_factory=set)
fingerprint_hashes: set[str] = field(default_factory=set)
@@ -341,7 +339,7 @@ class DeviceSession:
"""Get session duration."""
return self.last_seen - self.first_seen
def get_mean_rssi(self) -> Optional[float]:
def get_mean_rssi(self) -> float | None:
"""Get mean RSSI across session."""
if not self.rssi_samples:
return None
@@ -362,7 +360,7 @@ class DeviceSession:
except statistics.StatisticsError:
return 0.0
def get_mean_interval(self) -> Optional[float]:
def get_mean_interval(self) -> float | None:
"""Get mean advertising/probing interval."""
if not self.observation_intervals:
return None
@@ -427,10 +425,10 @@ class DeviceCluster:
link_evidence: list[dict] = field(default_factory=list)
# Best available identifiers
best_name: Optional[str] = None
manufacturer_id: Optional[int] = None
manufacturer_name: Optional[str] = None
device_type: Optional[str] = None
best_name: str | None = None
manufacturer_id: int | None = None
manufacturer_name: str | None = None
device_type: str | None = None
# TSCM risk assessment
risk_level: RiskLevel = RiskLevel.INFORMATIONAL
@@ -439,8 +437,8 @@ class DeviceCluster:
# Behavioral profile
total_observations: int = 0
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
first_seen: datetime | None = None
last_seen: datetime | None = None
presence_ratio: float = 0.0 # % of monitoring period device was present
def add_session(self, session: DeviceSession, link_reason: str,
@@ -532,8 +530,8 @@ def jaccard_similarity(set1: set, set2: set) -> float:
return intersection / union if union > 0 else 0.0
def manufacturer_data_similarity(data1: Optional[bytes],
data2: Optional[bytes]) -> float:
def manufacturer_data_similarity(data1: bytes | None,
data2: bytes | None) -> float:
"""
Calculate similarity between manufacturer data blobs.
@@ -626,7 +624,7 @@ def timing_pattern_similarity(intervals1: list[float],
return 0.7 * ratio + 0.3 * max(0, cv_sim)
def name_similarity(name1: Optional[str], name2: Optional[str]) -> float:
def name_similarity(name1: str | None, name2: str | None) -> float:
"""Calculate similarity between device names."""
if not name1 or not name2:
return 0.0
@@ -673,8 +671,8 @@ class DeviceIdentityEngine:
self._cluster_counter = 0
# Monitoring period for presence calculation
self.monitoring_start: Optional[datetime] = None
self.monitoring_end: Optional[datetime] = None
self.monitoring_start: datetime | None = None
self.monitoring_end: datetime | None = None
def _generate_session_id(self, protocol: str) -> str:
"""Generate unique session ID."""
@@ -714,9 +712,8 @@ class DeviceIdentityEngine:
# Update fingerprint index
fp = obs.compute_fingerprint_hash()
if fp:
if session.session_id not in self._fingerprint_to_sessions[fp]:
self._fingerprint_to_sessions[fp].append(session.session_id)
if fp and session.session_id not in self._fingerprint_to_sessions[fp]:
self._fingerprint_to_sessions[fp].append(session.session_id)
return session
@@ -757,9 +754,8 @@ class DeviceIdentityEngine:
# Update fingerprint index
fp = obs.compute_fingerprint_hash()
if fp:
if session.session_id not in self._fingerprint_to_sessions[fp]:
self._fingerprint_to_sessions[fp].append(session.session_id)
if fp and session.session_id not in self._fingerprint_to_sessions[fp]:
self._fingerprint_to_sessions[fp].append(session.session_id)
return session
@@ -784,7 +780,7 @@ class DeviceIdentityEngine:
similarity = self._calculate_cluster_similarity(cluster, session)
cluster.add_session(
session,
link_reason=f"Fingerprint/behavioral match",
link_reason="Fingerprint/behavioral match",
link_confidence=similarity
)
else:
@@ -795,7 +791,7 @@ class DeviceIdentityEngine:
# Run risk assessment on the cluster
self._assess_cluster_risk(cluster)
def _find_matching_cluster(self, session: DeviceSession) -> Optional[DeviceCluster]:
def _find_matching_cluster(self, session: DeviceSession) -> DeviceCluster | None:
"""
Find an existing cluster that matches this session.
@@ -884,7 +880,7 @@ class DeviceIdentityEngine:
return weighted_sum / total_weight if total_weight > 0 else 0.0
def _get_cluster_manufacturer_data(self, cluster: DeviceCluster) -> Optional[bytes]:
def _get_cluster_manufacturer_data(self, cluster: DeviceCluster) -> bytes | None:
"""Get representative manufacturer data from cluster."""
for session in cluster.sessions:
for obs in session.observations:
@@ -892,7 +888,7 @@ class DeviceIdentityEngine:
return obs.manufacturer_data
return None
def _get_session_manufacturer_data(self, session: DeviceSession) -> Optional[bytes]:
def _get_session_manufacturer_data(self, session: DeviceSession) -> bytes | None:
"""Get manufacturer data from session."""
for obs in session.observations:
if hasattr(obs, 'manufacturer_data') and obs.manufacturer_data:
@@ -923,7 +919,7 @@ class DeviceIdentityEngine:
intervals.extend(session.observation_intervals)
return intervals
def _get_session_name(self, session: DeviceSession) -> Optional[str]:
def _get_session_name(self, session: DeviceSession) -> str | None:
"""Get device name from session."""
for obs in session.observations:
if hasattr(obs, 'local_name') and obs.local_name:
@@ -1140,7 +1136,7 @@ class DeviceIdentityEngine:
# =============================================================================
# Global engine instance
_identity_engine: Optional[DeviceIdentityEngine] = None
_identity_engine: DeviceIdentityEngine | None = None
def get_identity_engine() -> DeviceIdentityEngine:
@@ -1157,7 +1153,7 @@ def reset_identity_engine() -> None:
_identity_engine = DeviceIdentityEngine()
def _convert_to_bytes(value) -> Optional[bytes]:
def _convert_to_bytes(value) -> bytes | None:
"""Convert various data types to bytes safely."""
if value is None:
return None
+35 -44
View File
@@ -13,21 +13,14 @@ from __future__ import annotations
import csv
import io
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from utils.tscm.signal_classification import (
SignalStrength,
ConfidenceLevel,
assess_signal,
classify_signal_strength,
describe_signal_for_report,
format_signal_for_dashboard,
generate_hedged_statement,
SIGNAL_ANALYSIS_DISCLAIMER,
assess_signal,
generate_hedged_statement,
)
logger = logging.getLogger('intercept.tscm.reports')
@@ -41,7 +34,7 @@ class ReportFinding:
"""A single finding for the report."""
identifier: str
protocol: str
name: Optional[str]
name: str | None
risk_level: str
risk_score: int
description: str
@@ -49,18 +42,18 @@ class ReportFinding:
recommended_action: str = ''
playbook_reference: str = ''
# Signal classification data
signal_strength: Optional[str] = None # minimal, weak, moderate, strong, very_strong
signal_confidence: Optional[str] = None # low, medium, high
signal_interpretation: Optional[str] = None
signal_strength: str | None = None # minimal, weak, moderate, strong, very_strong
signal_confidence: str | None = None # low, medium, high
signal_interpretation: str | None = None
signal_caveats: list[str] = field(default_factory=list)
@dataclass
class ReportMeetingSummary:
"""Meeting window summary for report."""
name: Optional[str]
name: str | None
start_time: str
end_time: Optional[str]
end_time: str | None
duration_minutes: float
devices_first_seen: int
behavior_changes: int
@@ -81,9 +74,9 @@ class TSCMReport:
sweep_type: str
# Location and context
location: Optional[str] = None
baseline_id: Optional[int] = None
baseline_name: Optional[str] = None
location: str | None = None
baseline_id: int | None = None
baseline_name: str | None = None
# Executive summary
executive_summary: str = ''
@@ -112,14 +105,14 @@ class TSCMReport:
missing_devices: int = 0
# Sweep duration
sweep_start: Optional[datetime] = None
sweep_end: Optional[datetime] = None
sweep_start: datetime | None = None
sweep_end: datetime | None = None
duration_minutes: float = 0.0
# Technical data (for annex only)
device_timelines: list[dict] = field(default_factory=list)
all_indicators: list[dict] = field(default_factory=list)
baseline_diff: Optional[dict] = None
baseline_diff: dict | None = None
correlation_data: list[dict] = field(default_factory=list)
@@ -613,15 +606,15 @@ class TSCMReportBuilder:
sweep_type='standard',
)
def set_sweep_type(self, sweep_type: str) -> 'TSCMReportBuilder':
def set_sweep_type(self, sweep_type: str) -> TSCMReportBuilder:
self.report.sweep_type = sweep_type
return self
def set_location(self, location: str) -> 'TSCMReportBuilder':
def set_location(self, location: str) -> TSCMReportBuilder:
self.report.location = location
return self
def set_baseline(self, baseline_id: int, baseline_name: str) -> 'TSCMReportBuilder':
def set_baseline(self, baseline_id: int, baseline_name: str) -> TSCMReportBuilder:
self.report.baseline_id = baseline_id
self.report.baseline_name = baseline_name
return self
@@ -629,8 +622,8 @@ class TSCMReportBuilder:
def set_sweep_times(
self,
start: datetime,
end: Optional[datetime] = None
) -> 'TSCMReportBuilder':
end: datetime | None = None
) -> TSCMReportBuilder:
self.report.sweep_start = start
self.report.sweep_end = end or datetime.now()
self.report.duration_minutes = (
@@ -638,12 +631,12 @@ class TSCMReportBuilder:
)
return self
def add_capabilities(self, capabilities: dict) -> 'TSCMReportBuilder':
def add_capabilities(self, capabilities: dict) -> TSCMReportBuilder:
self.report.capabilities = capabilities
self.report.limitations = capabilities.get('all_limitations', [])
return self
def add_finding(self, finding: ReportFinding) -> 'TSCMReportBuilder':
def add_finding(self, finding: ReportFinding) -> TSCMReportBuilder:
if finding.risk_level == 'high_interest':
self.report.high_interest_findings.append(finding)
elif finding.risk_level in ['review', 'needs_review']:
@@ -652,7 +645,7 @@ class TSCMReportBuilder:
self.report.informational_findings.append(finding)
return self
def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder':
def add_findings_from_profiles(self, profiles: list[dict]) -> TSCMReportBuilder:
"""Add findings from correlation engine device profiles."""
for profile in profiles:
# Get signal classification data
@@ -759,9 +752,8 @@ class TSCMReportBuilder:
# Check for tracker
tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker']
if any(i.get('type') in tracker_types for i in indicators):
if risk_level == 'high_interest':
return 'PB-001 (Tracker Detection)'
if any(i.get('type') in tracker_types for i in indicators) and risk_level == 'high_interest':
return 'PB-001 (Tracker Detection)'
if risk_level == 'high_interest':
return 'PB-002 (Suspicious Device)'
@@ -770,7 +762,7 @@ class TSCMReportBuilder:
return ''
def add_meeting_summary(self, summary: dict) -> 'TSCMReportBuilder':
def add_meeting_summary(self, summary: dict) -> TSCMReportBuilder:
"""Add meeting window summary."""
meeting = ReportMeetingSummary(
name=summary.get('name'),
@@ -792,7 +784,7 @@ class TSCMReportBuilder:
rf: int = 0,
new: int = 0,
missing: int = 0
) -> 'TSCMReportBuilder':
) -> TSCMReportBuilder:
self.report.wifi_devices = wifi
self.report.wifi_clients = wifi_clients
self.report.bluetooth_devices = bluetooth
@@ -802,19 +794,19 @@ class TSCMReportBuilder:
self.report.missing_devices = missing
return self
def add_device_timelines(self, timelines: list[dict]) -> 'TSCMReportBuilder':
def add_device_timelines(self, timelines: list[dict]) -> TSCMReportBuilder:
self.report.device_timelines = timelines
return self
def add_all_indicators(self, indicators: list[dict]) -> 'TSCMReportBuilder':
def add_all_indicators(self, indicators: list[dict]) -> TSCMReportBuilder:
self.report.all_indicators = indicators
return self
def add_baseline_diff(self, diff: dict) -> 'TSCMReportBuilder':
def add_baseline_diff(self, diff: dict) -> TSCMReportBuilder:
self.report.baseline_diff = diff
return self
def add_correlations(self, correlations: list[dict]) -> 'TSCMReportBuilder':
def add_correlations(self, correlations: list[dict]) -> TSCMReportBuilder:
self.report.correlation_data = correlations
return self
@@ -852,9 +844,9 @@ def generate_report(
device_profiles: list[dict],
capabilities: dict,
timelines: list[dict],
baseline_diff: Optional[dict] = None,
meeting_summaries: Optional[list[dict]] = None,
correlations: Optional[list[dict]] = None,
baseline_diff: dict | None = None,
meeting_summaries: list[dict] | None = None,
correlations: list[dict] | None = None,
) -> TSCMReport:
"""
Generate a complete TSCM report from sweep data.
@@ -883,9 +875,8 @@ def generate_report(
if started_at:
if isinstance(started_at, str):
started_at = datetime.fromisoformat(started_at.replace('Z', '+00:00')).replace(tzinfo=None)
if completed_at:
if isinstance(completed_at, str):
completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')).replace(tzinfo=None)
if completed_at and isinstance(completed_at, str):
completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')).replace(tzinfo=None)
builder.set_sweep_times(started_at, completed_at)
# Capabilities
+2 -4
View File
@@ -11,8 +11,6 @@ from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Optional
# =============================================================================
# Signal Strength Classification
@@ -208,8 +206,8 @@ class ConfidenceLevel(Enum):
@dataclass
class SignalAssessment:
"""Complete signal assessment with confidence-safe language."""
rssi: Optional[float]
duration_seconds: Optional[float]
rssi: float | None
duration_seconds: float | None
observation_count: int
signal_strength: SignalStrength