diff --git a/static/css/components/signal-cards.css b/static/css/components/signal-cards.css index c411529..4fbe665 100644 --- a/static/css/components/signal-cards.css +++ b/static/css/components/signal-cards.css @@ -1136,3 +1136,129 @@ flex: 1; } } + +/* ============================================ + SIGNAL STRENGTH INDICATOR + Classification-based signal display + ============================================ */ +.signal-strength-indicator { + display: inline-flex; + align-items: center; + gap: 6px; + padding: 3px 8px; + background: var(--bg-secondary); + border-radius: 4px; + border: 1px solid var(--border-color); +} + +.signal-strength-indicator.compact { + padding: 2px 4px; + gap: 0; + background: transparent; + border: none; +} + +.signal-strength-bars { + display: inline-block; + vertical-align: middle; +} + +.signal-strength-bars rect { + transition: fill 0.2s ease; +} + +.signal-strength-label { + font-family: 'JetBrains Mono', monospace; + font-size: 10px; + font-weight: 500; + text-transform: uppercase; + letter-spacing: 0.03em; +} + +/* Confidence-based styling */ +.signal-confidence-low { + opacity: 0.7; +} + +.signal-confidence-medium { + opacity: 0.85; +} + +.signal-confidence-high { + opacity: 1; +} + +.signal-advanced-value.signal-confidence-low { + color: var(--text-dim); +} + +.signal-advanced-value.signal-confidence-medium { + color: var(--accent-amber); +} + +.signal-advanced-value.signal-confidence-high { + color: var(--accent-green); +} + +/* ============================================ + SIGNAL ASSESSMENT PANEL + Detailed signal analysis in advanced panel + ============================================ */ +.signal-assessment { + background: var(--bg-primary); + border: 1px solid var(--border-color); + border-radius: 4px; + padding: 10px; + margin-bottom: 12px; +} + +.signal-assessment-summary { + display: flex; + align-items: flex-start; + gap: 12px; + margin-bottom: 10px; + padding-bottom: 10px; + border-bottom: 1px solid var(--border-color); +} + +.signal-assessment-text { + font-size: 12px; + color: var(--text-secondary); + line-height: 1.5; + flex: 1; +} + +.signal-assessment-caveat { + font-size: 10px; + color: var(--text-dim); + font-style: italic; + margin-top: 8px; + padding-top: 8px; + border-top: 1px dashed var(--border-color); + line-height: 1.4; +} + +/* Signal assessment confidence badges */ +.signal-assessment .signal-advanced-grid { + margin-top: 8px; +} + +/* Range estimate styling */ +.signal-range-estimate { + display: flex; + align-items: center; + gap: 8px; + font-family: 'JetBrains Mono', monospace; + font-size: 11px; +} + +.signal-range-estimate .range-value { + color: var(--accent-cyan); + font-weight: 500; +} + +.signal-range-estimate .range-disclaimer { + font-size: 9px; + color: var(--text-dim); + font-style: italic; +} diff --git a/static/js/components/signal-cards.js b/static/js/components/signal-cards.js index 8bdddbe..04ff5e9 100644 --- a/static/js/components/signal-cards.js +++ b/static/js/components/signal-cards.js @@ -7,6 +7,250 @@ const SignalCards = (function() { 'use strict'; + // ========================================================================== + // Signal Strength Classification + // Translates RSSI values to confidence-safe, client-facing language + // ========================================================================== + + const SignalClassification = { + // RSSI thresholds (dBm) - upper bounds + THRESHOLDS: { + MINIMAL: -85, + WEAK: -70, + MODERATE: -55, + STRONG: -40 + // VERY_STRONG: > -40 + }, + + // Signal strength metadata + STRENGTH_INFO: { + minimal: { + label: 'Minimal', + description: 'At detection threshold', + interpretation: 'may be ambient noise or distant source', + confidence: 'low', + color: '#888888', + icon: 'signal-0', + bars: 1 + }, + weak: { + label: 'Weak', + description: 'Detectable signal', + interpretation: 'potentially distant or obstructed', + confidence: 'low', + color: '#6baed6', + icon: 'signal-1', + bars: 2 + }, + moderate: { + label: 'Moderate', + description: 'Consistent presence', + interpretation: 'likely in proximity', + confidence: 'medium', + color: '#3182bd', + icon: 'signal-2', + bars: 3 + }, + strong: { + label: 'Strong', + description: 'Clear signal', + interpretation: 'probable close proximity', + confidence: 'medium', + color: '#fd8d3c', + icon: 'signal-3', + bars: 4 + }, + very_strong: { + label: 'Very Strong', + description: 'High signal level', + interpretation: 'indicates likely nearby source', + confidence: 'high', + color: '#e6550d', + icon: 'signal-4', + bars: 5 + } + }, + + // Duration thresholds (seconds) + DURATION_THRESHOLDS: { + TRANSIENT: 5, + SHORT: 30, + SUSTAINED: 120 + // PERSISTENT: > 120 + }, + + DURATION_INFO: { + transient: { + label: 'Transient', + modifier: 'briefly observed', + confidence_impact: 'reduces confidence' + }, + short: { + label: 'Short-duration', + modifier: 'observed for a short period', + confidence_impact: 'limited confidence' + }, + sustained: { + label: 'Sustained', + modifier: 'observed over sustained period', + confidence_impact: 'supports confidence' + }, + persistent: { + label: 'Persistent', + modifier: 'continuously observed', + confidence_impact: 'increases confidence' + } + }, + + /** + * Classify RSSI value into qualitative signal strength + */ + classifyStrength(rssi) { + if (rssi === null || rssi === undefined || isNaN(rssi)) { + return 'minimal'; + } + const val = parseFloat(rssi); + if (val <= -85) return 'minimal'; + if (val <= -70) return 'weak'; + if (val <= -55) return 'moderate'; + if (val <= -40) return 'strong'; + return 'very_strong'; + }, + + /** + * Classify detection duration + */ + classifyDuration(seconds) { + if (seconds === null || seconds === undefined || seconds < 0) { + return 'transient'; + } + const val = parseFloat(seconds); + if (val < 5) return 'transient'; + if (val < 30) return 'short'; + if (val < 120) return 'sustained'; + return 'persistent'; + }, + + /** + * Get full signal strength info + */ + getStrengthInfo(rssi) { + const strength = this.classifyStrength(rssi); + return { + strength, + rssi, + ...this.STRENGTH_INFO[strength] + }; + }, + + /** + * Get full duration info + */ + getDurationInfo(seconds) { + const duration = this.classifyDuration(seconds); + return { + duration, + seconds, + ...this.DURATION_INFO[duration] + }; + }, + + /** + * Calculate overall confidence from signal + duration + observations + */ + calculateConfidence(rssi, durationSeconds, observationCount = 1) { + let score = 0; + const strength = this.classifyStrength(rssi); + const duration = this.classifyDuration(durationSeconds); + + // Signal strength contribution + if (strength === 'strong' || strength === 'very_strong') score += 2; + else if (strength === 'moderate') score += 1; + + // Duration contribution + if (duration === 'persistent') score += 2; + else if (duration === 'sustained') score += 1; + + // Observation count contribution + if (observationCount >= 5) score += 2; + else if (observationCount >= 3) score += 1; + + // Map to confidence level + if (score >= 5) return 'high'; + if (score >= 3) return 'medium'; + return 'low'; + }, + + /** + * Generate hedged summary statement + */ + generateSummary(rssi, durationSeconds, observationCount = 1) { + const strengthInfo = this.getStrengthInfo(rssi); + const durationInfo = this.getDurationInfo(durationSeconds); + const confidence = this.calculateConfidence(rssi, durationSeconds, observationCount); + + if (confidence === 'high') { + return `${strengthInfo.label}, ${durationInfo.label.toLowerCase()} signal with characteristics that suggest device presence in proximity`; + } else if (confidence === 'medium') { + return `${strengthInfo.label}, ${durationInfo.label.toLowerCase()} signal that may indicate device activity`; + } else { + return `${durationInfo.modifier.charAt(0).toUpperCase() + durationInfo.modifier.slice(1)} ${strengthInfo.label.toLowerCase()} signal consistent with possible device presence`; + } + }, + + /** + * Generate interpretation with hedging + */ + generateInterpretation(rssi, durationSeconds, observationCount = 1) { + const strengthInfo = this.getStrengthInfo(rssi); + const confidence = this.calculateConfidence(rssi, durationSeconds, observationCount); + const base = strengthInfo.interpretation; + + if (confidence === 'high') { + return `Observed signal characteristics suggest ${base}`; + } else if (confidence === 'medium') { + return `Signal pattern may indicate ${base}`; + } else { + return `Limited data; signal could represent ${base} or environmental factors`; + } + }, + + /** + * Estimate range from RSSI (with heavy caveats) + */ + estimateRange(rssi) { + if (rssi === null || rssi === undefined) { + return { estimate: 'Unknown', disclaimer: 'Insufficient signal data' }; + } + const val = parseFloat(rssi); + let estimate, rangeMin, rangeMax; + + if (val > -40) { + estimate = '< 3 meters'; + rangeMin = 0; rangeMax = 3; + } else if (val > -55) { + estimate = '3-10 meters'; + rangeMin = 3; rangeMax = 10; + } else if (val > -70) { + estimate = '5-20 meters'; + rangeMin = 5; rangeMax = 20; + } else if (val > -85) { + estimate = '10-50 meters'; + rangeMin = 10; rangeMax = 50; + } else { + estimate = '> 30 meters or heavily obstructed'; + rangeMin = 30; rangeMax = null; + } + + return { + estimate, + rangeMin, + rangeMax, + disclaimer: 'Range estimates are approximate and affected by walls, interference, and transmit power' + }; + } + }; + // Address tracking for new/repeated detection const addressHistory = { pager: new Map(), // address -> { count, firstSeen, lastSeen } @@ -211,6 +455,107 @@ const SignalCards = (function() { return /^[0-9\s\-\*\#U]+$/.test(message); } + /** + * Create signal strength indicator HTML + * Shows bars + label + optional tooltip with interpretation + */ + function createSignalIndicator(rssi, options = {}) { + if (rssi === null || rssi === undefined) return ''; + + const info = SignalClassification.getStrengthInfo(rssi); + const showLabel = options.showLabel !== false; + const showTooltip = options.showTooltip !== false; + const compact = options.compact === true; + + // Create signal bars SVG + const bars = info.bars; + const barsSvg = ` + + + + + + + + `; + + // Build tooltip content + let tooltipContent = ''; + if (showTooltip) { + const rangeEst = SignalClassification.estimateRange(rssi); + tooltipContent = ` + ${info.label} signal (${rssi} dBm) + ${info.description} + Est. range: ${rangeEst.estimate} + Confidence: ${info.confidence} + `.trim(); + } + + // Determine CSS class based on confidence + const confidenceClass = `signal-confidence-${info.confidence}`; + + if (compact) { + return ` + + ${barsSvg} + + `; + } + + return ` + + ${barsSvg} + ${showLabel ? `${info.label}` : ''} + + `; + } + + /** + * Create detailed signal assessment panel for advanced details + */ + function createSignalAssessmentPanel(rssi, durationSeconds, observationCount = 1) { + if (rssi === null || rssi === undefined) return ''; + + const strengthInfo = SignalClassification.getStrengthInfo(rssi); + const durationInfo = SignalClassification.getDurationInfo(durationSeconds); + const confidence = SignalClassification.calculateConfidence(rssi, durationSeconds, observationCount); + const rangeEst = SignalClassification.estimateRange(rssi); + const interpretation = SignalClassification.generateInterpretation(rssi, durationSeconds, observationCount); + + return ` +
+
Signal Assessment
+
+ ${createSignalIndicator(rssi, { compact: false, showTooltip: false })} + ${escapeHtml(interpretation)} +
+
+
+ Signal Strength + ${strengthInfo.label} (${rssi} dBm) +
+
+ Detection + ${durationInfo.label} +
+
+ Est. Range + ${rangeEst.estimate} +
+
+ Confidence + ${confidence.charAt(0).toUpperCase() + confidence.slice(1)} +
+
+
+ Note: ${rangeEst.disclaimer} +
+
+ `; + } + /** * Get message type label */ @@ -482,6 +827,10 @@ const SignalCards = (function() { const stats = getAddressStats('sensor', msg.id); const seenCount = stats ? stats.count : 1; + // Get signal strength if available + const rssi = msg.rssi || msg.signal_strength || msg.snr || null; + const signalIndicator = createSignalIndicator(rssi, { compact: true }); + // Determine sensor type icon let sensorIcon = '📡'; const model = (msg.model || '').toLowerCase(); @@ -497,6 +846,7 @@ const SignalCards = (function() {
${sensorIcon} ${escapeHtml(msg.model || 'Unknown')} ID: ${escapeHtml(msg.id || 'N/A')} + ${signalIndicator}
${status !== 'baseline' ? ` @@ -570,6 +920,7 @@ const SignalCards = (function() {
+ ${rssi !== null ? createSignalAssessmentPanel(rssi, stats?.lastSeen ? (Date.now() - stats.firstSeen) / 1000 : null, seenCount) : ''}
Sensor Details
@@ -1336,6 +1687,11 @@ const SignalCards = (function() { createAcarsCard, createMeterCard, + // Signal classification + SignalClassification, + createSignalIndicator, + createSignalAssessmentPanel, + // UI interactions toggleAdvanced, copyMessage, diff --git a/utils/tscm/detector.py b/utils/tscm/detector.py index 9d9fdb7..2957f9d 100644 --- a/utils/tscm/detector.py +++ b/utils/tscm/detector.py @@ -20,6 +20,11 @@ from data.tscm_frequencies import ( is_known_tracker, is_potential_camera, ) +from utils.tscm.signal_classification import ( + classify_signal_strength, + get_signal_strength_info, + SignalStrength, +) logger = logging.getLogger('intercept.tscm.detector') @@ -171,8 +176,11 @@ class ThreatDetector: signal_val = int(signal) if signal else -100 except (ValueError, TypeError): signal_val = -100 - if not ssid and signal and signal_val > -60: - reasons.append('Hidden SSID with strong signal') + + # Use standardized signal classification + signal_info = get_signal_strength_info(signal_val) + if not ssid and signal_info['strength'] in ('strong', 'very_strong'): + reasons.append(f"Hidden SSID with {signal_info['label'].lower()} signal") classification = 'high_interest' # Repeat detections across scans @@ -181,11 +189,17 @@ class ThreatDetector: if classification != 'high_interest': classification = 'high_interest' + # Include standardized signal classification + signal_info = get_signal_strength_info(signal_val) + return { 'classification': classification, 'reasons': reasons, 'in_baseline': in_baseline, 'times_seen': times_seen, + 'signal_strength': signal_info['strength'], + 'signal_label': signal_info['label'], + 'signal_confidence': signal_info['confidence'], } def classify_bt_device(self, device: dict) -> dict: @@ -236,13 +250,15 @@ class ThreatDetector: reasons.append('Audio-capable BLE device') classification = 'high_interest' - # Strong signal from unknown device + # Strong signal from unknown device - use standardized classification try: rssi_val = int(rssi) if rssi else -100 except (ValueError, TypeError): rssi_val = -100 - if rssi and rssi_val > -50 and not name: - reasons.append('Strong signal from unnamed device') + + signal_info = get_signal_strength_info(rssi_val) + if signal_info['strength'] in ('strong', 'very_strong') and not name: + reasons.append(f"{signal_info['label']} signal from unnamed device") classification = 'high_interest' # Repeat detections across scans @@ -251,6 +267,9 @@ class ThreatDetector: if classification != 'high_interest': classification = 'high_interest' + # Include standardized signal classification + signal_info = get_signal_strength_info(rssi_val) + return { 'classification': classification, 'reasons': reasons, @@ -258,6 +277,9 @@ class ThreatDetector: 'times_seen': times_seen, 'is_tracker': tracker_info is not None, 'is_audio_capable': _is_audio_capable_ble(name, device_type), + 'signal_strength': signal_info['strength'], + 'signal_label': signal_info['label'], + 'signal_confidence': signal_info['confidence'], } def classify_rf_signal(self, signal: dict) -> dict: @@ -301,16 +323,25 @@ class ThreatDetector: reasons.append(f'High-risk surveillance band: {band_name}') classification = 'high_interest' - # Strong persistent signal - if power and float(power) > -40: - reasons.append('Strong persistent transmitter') - classification = 'high_interest' + # Strong persistent signal - use standardized classification + if power: + power_info = get_signal_strength_info(float(power)) + if power_info['strength'] in ('strong', 'very_strong'): + reasons.append(f"{power_info['label']} persistent transmitter") + classification = 'high_interest' # Repeat detections (persistent transmitter) if times_seen >= 2: reasons.append(f'Persistent transmitter ({times_seen} detections)') classification = 'high_interest' + # Include standardized signal classification + try: + power_val = float(power) if power else -100 + except (ValueError, TypeError): + power_val = -100 + signal_info = get_signal_strength_info(power_val) + return { 'classification': classification, 'reasons': reasons, @@ -318,6 +349,9 @@ class ThreatDetector: 'times_seen': times_seen, 'risk_level': risk, 'band_name': band_name, + 'signal_strength': signal_info['strength'], + 'signal_label': signal_info['label'], + 'signal_confidence': signal_info['confidence'], } def analyze_wifi_device(self, device: dict) -> dict | None: @@ -353,16 +387,18 @@ class ThreatDetector: 'reason': 'Device matches WiFi camera patterns', }) - # Check for hidden SSID with strong signal + # Check for hidden SSID with strong signal - use standardized classification try: signal_int = int(signal) if signal else -100 except (ValueError, TypeError): signal_int = -100 - if not ssid and signal and signal_int > -60: + + signal_info = get_signal_strength_info(signal_int) + if not ssid and signal_info['strength'] in ('strong', 'very_strong'): threats.append({ 'type': 'anomaly', 'severity': 'medium', - 'reason': 'Hidden SSID with strong signal', + 'reason': f"Hidden SSID with {signal_info['label'].lower()} signal", }) if not threats: @@ -422,16 +458,18 @@ class ThreatDetector: 'tracker_type': tracker_info.get('name'), }) - # Check for suspicious BLE beacons (unnamed, persistent) + # Check for suspicious BLE beacons (unnamed, persistent) - use standardized classification try: rssi_int = int(rssi) if rssi else -100 except (ValueError, TypeError): rssi_int = -100 - if not name and rssi and rssi_int > -70: + + signal_info = get_signal_strength_info(rssi_int) + if not name and signal_info['strength'] in ('moderate', 'strong', 'very_strong'): threats.append({ 'type': 'anomaly', 'severity': 'medium', - 'reason': 'Unnamed BLE device with strong signal', + 'reason': f"Unnamed BLE device with {signal_info['label'].lower()} signal", }) if not threats: diff --git a/utils/tscm/reports.py b/utils/tscm/reports.py index 27ba033..627bb03 100644 --- a/utils/tscm/reports.py +++ b/utils/tscm/reports.py @@ -19,6 +19,17 @@ from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional +from utils.tscm.signal_classification import ( + SignalStrength, + ConfidenceLevel, + assess_signal, + classify_signal_strength, + describe_signal_for_report, + format_signal_for_dashboard, + generate_hedged_statement, + SIGNAL_ANALYSIS_DISCLAIMER, +) + logger = logging.getLogger('intercept.tscm.reports') # ============================================================================= @@ -37,6 +48,11 @@ class ReportFinding: indicators: list[dict] = field(default_factory=list) recommended_action: str = '' playbook_reference: str = '' + # Signal classification data + signal_strength: Optional[str] = None # minimal, weak, moderate, strong, very_strong + signal_confidence: Optional[str] = None # low, medium, high + signal_interpretation: Optional[str] = None + signal_caveats: list[str] = field(default_factory=list) @dataclass @@ -225,7 +241,7 @@ def generate_executive_summary(report: TSCMReport) -> str: def generate_findings_section(findings: list[ReportFinding], title: str) -> str: - """Generate a findings section for the report.""" + """Generate a findings section for the report with confidence-safe language.""" if not findings: return f"{title}\n\nNo findings in this category.\n" @@ -236,14 +252,33 @@ def generate_findings_section(findings: list[ReportFinding], title: str) -> str: lines.append(f" Protocol: {finding.protocol.upper()}") lines.append(f" Identifier: {finding.identifier}") lines.append(f" Risk Score: {finding.risk_score}") - lines.append(f" Description: {finding.description}") + + # Signal classification with confidence + if finding.signal_strength: + confidence_label = (finding.signal_confidence or 'low').capitalize() + strength_label = finding.signal_strength.replace('_', ' ').title() + lines.append(f" Signal: {strength_label} (Confidence: {confidence_label})") + + lines.append(f" Assessment: {finding.description}") + + # Interpretation with hedged language + if finding.signal_interpretation: + lines.append(f" Interpretation: {finding.signal_interpretation}") + if finding.indicators: lines.append(" Indicators:") for ind in finding.indicators[:5]: # Limit to 5 indicators lines.append(f" - {ind.get('type', 'unknown')}: {ind.get('description', '')}") + lines.append(f" Recommended Action: {finding.recommended_action}") + if finding.playbook_reference: lines.append(f" Reference: {finding.playbook_reference}") + + # Include relevant caveats for high-interest findings + if finding.signal_caveats and finding.risk_level == 'high_interest': + lines.append(" Note: " + finding.signal_caveats[0]) + lines.append("") return "\n".join(lines) @@ -345,6 +380,13 @@ def generate_pdf_content(report: TSCMReport) -> str: sections.append(f" - {limit}") sections.append("") + # Signal Analysis Note + sections.append("-" * 70) + sections.append("SIGNAL ANALYSIS METHODOLOGY") + sections.append("=" * 27) + sections.append(SIGNAL_ANALYSIS_DISCLAIMER.strip()) + sections.append("") + # Disclaimer sections.append("-" * 70) sections.append(REPORT_DISCLAIMER) @@ -407,6 +449,12 @@ def generate_technical_annex_json(report: TSCMReport) -> dict: 'description': f.description, 'indicators': f.indicators, 'recommended_action': f.recommended_action, + 'signal_classification': { + 'strength': f.signal_strength, + 'confidence': f.signal_confidence, + 'interpretation': f.signal_interpretation, + 'caveats': f.signal_caveats, + }, } for f in report.high_interest_findings ], @@ -418,6 +466,12 @@ def generate_technical_annex_json(report: TSCMReport) -> dict: 'risk_score': f.risk_score, 'description': f.description, 'indicators': f.indicators, + 'signal_classification': { + 'strength': f.signal_strength, + 'confidence': f.signal_confidence, + 'interpretation': f.signal_interpretation, + 'caveats': f.signal_caveats, + }, } for f in report.needs_review_findings ], @@ -504,7 +558,11 @@ def generate_technical_annex_csv(report: TSCMReport) -> str: # Also add findings summary writer.writerow([]) writer.writerow(['--- FINDINGS SUMMARY ---']) - writer.writerow(['identifier', 'protocol', 'risk_level', 'risk_score', 'description', 'recommended_action']) + writer.writerow([ + 'identifier', 'protocol', 'risk_level', 'risk_score', + 'signal_strength', 'signal_confidence', + 'description', 'interpretation', 'recommended_action' + ]) all_findings = ( report.high_interest_findings + @@ -517,7 +575,10 @@ def generate_technical_annex_csv(report: TSCMReport) -> str: finding.protocol, finding.risk_level, finding.risk_score, + finding.signal_strength or '', + finding.signal_confidence or '', finding.description, + finding.signal_interpretation or '', finding.recommended_action, ]) @@ -591,6 +652,9 @@ class TSCMReportBuilder: def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder': """Add findings from correlation engine device profiles.""" for profile in profiles: + # Get signal classification data + signal_data = self._classify_finding_signal(profile) + finding = ReportFinding( identifier=profile.get('identifier', ''), protocol=profile.get('protocol', ''), @@ -601,26 +665,90 @@ class TSCMReportBuilder: indicators=profile.get('indicators', []), recommended_action=profile.get('recommended_action', 'monitor'), playbook_reference=self._get_playbook_reference(profile), + signal_strength=signal_data['signal_strength'], + signal_confidence=signal_data['signal_confidence'], + signal_interpretation=signal_data['signal_interpretation'], + signal_caveats=signal_data['signal_caveats'], ) self.add_finding(finding) return self def _generate_finding_description(self, profile: dict) -> str: - """Generate description from profile indicators.""" + """Generate description from profile indicators using hedged language.""" indicators = profile.get('indicators', []) - if not indicators: - return f"{profile.get('protocol', 'Unknown').upper()} device detected" + protocol = profile.get('protocol', 'Unknown').upper() - # Use first indicator as primary description + # Get signal data for context + rssi = profile.get('rssi_mean') or profile.get('rssi') + duration = profile.get('observation_duration_seconds') + observation_count = profile.get('observation_count', 1) + + # Assess signal to determine confidence + assessment = assess_signal(rssi, duration, observation_count) + confidence = assessment.confidence + + if not indicators: + # Use hedged language based on confidence + return generate_hedged_statement( + f"Observed {protocol} signal", + 'device_presence', + confidence + ) + + # Build description with hedged language primary = indicators[0] - desc = primary.get('description', 'Pattern detected') + indicator_type = primary.get('type', 'pattern') + + # Map indicator types to hedged descriptions + if indicator_type in ('airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker'): + desc = generate_hedged_statement( + f"{protocol} signal characteristics", + 'device_presence', + confidence + ) + desc += f" - pattern consistent with {indicator_type.replace('_', ' ')}" + elif indicator_type == 'audio_capable': + desc = generate_hedged_statement( + "Device characteristics", + 'surveillance_indicator', + confidence + ) + desc += " - audio-capable device type identified" + elif indicator_type in ('hidden_identity', 'hidden_ssid'): + desc = generate_hedged_statement( + "Network configuration", + 'surveillance_indicator', + confidence + ) + desc += " - concealed identity pattern observed" + else: + desc = generate_hedged_statement( + f"{protocol} signal pattern", + 'device_presence', + confidence + ) if len(indicators) > 1: desc += f" (+{len(indicators) - 1} additional indicators)" return desc + def _classify_finding_signal(self, profile: dict) -> dict: + """Extract signal classification data for a finding.""" + rssi = profile.get('rssi_mean') or profile.get('rssi') + duration = profile.get('observation_duration_seconds') + observation_count = profile.get('observation_count', 1) + + assessment = assess_signal(rssi, duration, observation_count) + + return { + 'signal_strength': assessment.signal_strength.value, + 'signal_confidence': assessment.confidence.value, + 'signal_interpretation': assessment.interpretation, + 'signal_caveats': assessment.caveats, + } + def _get_playbook_reference(self, profile: dict) -> str: """Get playbook reference based on profile.""" risk_level = profile.get('risk_level', 'informational') @@ -761,27 +889,27 @@ def generate_report( # Add findings from profiles builder.add_findings_from_profiles(device_profiles) - # Statistics - results = sweep_data.get('results', {}) - wifi_count = results.get('wifi_count') - if wifi_count is None: - wifi_count = len(results.get('wifi_devices', results.get('wifi', []))) - - bt_count = results.get('bt_count') - if bt_count is None: - bt_count = len(results.get('bt_devices', results.get('bluetooth', []))) - - rf_count = results.get('rf_count') - if rf_count is None: - rf_count = len(results.get('rf_signals', results.get('rf', []))) - - builder.add_statistics( - wifi=wifi_count, - bluetooth=bt_count, - rf=rf_count, - new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0, - missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0, - ) + # Statistics + results = sweep_data.get('results', {}) + wifi_count = results.get('wifi_count') + if wifi_count is None: + wifi_count = len(results.get('wifi_devices', results.get('wifi', []))) + + bt_count = results.get('bt_count') + if bt_count is None: + bt_count = len(results.get('bt_devices', results.get('bluetooth', []))) + + rf_count = results.get('rf_count') + if rf_count is None: + rf_count = len(results.get('rf_signals', results.get('rf', []))) + + builder.add_statistics( + wifi=wifi_count, + bluetooth=bt_count, + rf=rf_count, + new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0, + missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0, + ) # Technical data builder.add_device_timelines(timelines) diff --git a/utils/tscm/signal_classification.py b/utils/tscm/signal_classification.py new file mode 100644 index 0000000..68deb54 --- /dev/null +++ b/utils/tscm/signal_classification.py @@ -0,0 +1,631 @@ +""" +Signal Classification Module + +Translates technical RF measurements (RSSI, duration) into confidence-safe, +client-facing language suitable for reports and dashboards. + +All outputs use hedged language that avoids absolute claims. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +# ============================================================================= +# Signal Strength Classification +# ============================================================================= + +class SignalStrength(Enum): + """Qualitative signal strength labels.""" + MINIMAL = "minimal" + WEAK = "weak" + MODERATE = "moderate" + STRONG = "strong" + VERY_STRONG = "very_strong" + + +# RSSI thresholds (dBm) - upper bounds for each category +RSSI_THRESHOLDS = { + SignalStrength.MINIMAL: -85, # -100 to -85 + SignalStrength.WEAK: -70, # -84 to -70 + SignalStrength.MODERATE: -55, # -69 to -55 + SignalStrength.STRONG: -40, # -54 to -40 + SignalStrength.VERY_STRONG: 0, # > -40 +} + +SIGNAL_STRENGTH_DESCRIPTIONS = { + SignalStrength.MINIMAL: { + 'label': 'Minimal', + 'description': 'At detection threshold', + 'interpretation': 'may be ambient noise or distant source', + 'confidence': 'low', + }, + SignalStrength.WEAK: { + 'label': 'Weak', + 'description': 'Detectable signal', + 'interpretation': 'potentially distant or obstructed', + 'confidence': 'low', + }, + SignalStrength.MODERATE: { + 'label': 'Moderate', + 'description': 'Consistent presence', + 'interpretation': 'likely in proximity', + 'confidence': 'medium', + }, + SignalStrength.STRONG: { + 'label': 'Strong', + 'description': 'Clear signal', + 'interpretation': 'probable close proximity', + 'confidence': 'medium', + }, + SignalStrength.VERY_STRONG: { + 'label': 'Very Strong', + 'description': 'High signal level', + 'interpretation': 'indicates likely nearby source', + 'confidence': 'high', + }, +} + + +def classify_signal_strength(rssi: float | int | None) -> SignalStrength: + """ + Classify RSSI value into qualitative signal strength. + + Args: + rssi: Signal strength in dBm (typically -100 to 0) + + Returns: + SignalStrength enum value + """ + if rssi is None: + return SignalStrength.MINIMAL + + try: + rssi_val = float(rssi) + except (ValueError, TypeError): + return SignalStrength.MINIMAL + + if rssi_val <= -85: + return SignalStrength.MINIMAL + elif rssi_val <= -70: + return SignalStrength.WEAK + elif rssi_val <= -55: + return SignalStrength.MODERATE + elif rssi_val <= -40: + return SignalStrength.STRONG + else: + return SignalStrength.VERY_STRONG + + +def get_signal_strength_info(rssi: float | int | None) -> dict: + """ + Get full signal strength classification with metadata. + + Returns dict with: strength, label, description, interpretation, confidence + """ + strength = classify_signal_strength(rssi) + info = SIGNAL_STRENGTH_DESCRIPTIONS[strength].copy() + info['strength'] = strength.value + info['rssi'] = rssi + return info + + +# ============================================================================= +# Detection Duration / Confidence Modifiers +# ============================================================================= + +class DetectionDuration(Enum): + """Qualitative duration labels.""" + TRANSIENT = "transient" + SHORT = "short" + SUSTAINED = "sustained" + PERSISTENT = "persistent" + + +# Duration thresholds (seconds) +DURATION_THRESHOLDS = { + DetectionDuration.TRANSIENT: 5, # < 5 seconds + DetectionDuration.SHORT: 30, # 5-30 seconds + DetectionDuration.SUSTAINED: 120, # 30s - 2 min + DetectionDuration.PERSISTENT: float('inf'), # > 2 min +} + +DURATION_DESCRIPTIONS = { + DetectionDuration.TRANSIENT: { + 'label': 'Transient', + 'modifier': 'briefly observed', + 'confidence_impact': 'reduces confidence', + }, + DetectionDuration.SHORT: { + 'label': 'Short-duration', + 'modifier': 'observed for a short period', + 'confidence_impact': 'limited confidence', + }, + DetectionDuration.SUSTAINED: { + 'label': 'Sustained', + 'modifier': 'observed over sustained period', + 'confidence_impact': 'supports confidence', + }, + DetectionDuration.PERSISTENT: { + 'label': 'Persistent', + 'modifier': 'continuously observed', + 'confidence_impact': 'increases confidence', + }, +} + + +def classify_duration(seconds: float | int | None) -> DetectionDuration: + """ + Classify detection duration into qualitative category. + + Args: + seconds: Duration of detection in seconds + + Returns: + DetectionDuration enum value + """ + if seconds is None or seconds < 0: + return DetectionDuration.TRANSIENT + + try: + duration = float(seconds) + except (ValueError, TypeError): + return DetectionDuration.TRANSIENT + + if duration < 5: + return DetectionDuration.TRANSIENT + elif duration < 30: + return DetectionDuration.SHORT + elif duration < 120: + return DetectionDuration.SUSTAINED + else: + return DetectionDuration.PERSISTENT + + +def get_duration_info(seconds: float | int | None) -> dict: + """Get full duration classification with metadata.""" + duration = classify_duration(seconds) + info = DURATION_DESCRIPTIONS[duration].copy() + info['duration'] = duration.value + info['seconds'] = seconds + return info + + +# ============================================================================= +# Combined Confidence Assessment +# ============================================================================= + +class ConfidenceLevel(Enum): + """Overall detection confidence.""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +@dataclass +class SignalAssessment: + """Complete signal assessment with confidence-safe language.""" + rssi: Optional[float] + duration_seconds: Optional[float] + observation_count: int + + signal_strength: SignalStrength + detection_duration: DetectionDuration + confidence: ConfidenceLevel + + # Client-safe descriptions + strength_label: str + duration_label: str + summary: str + interpretation: str + caveats: list[str] + + +def assess_signal( + rssi: float | int | None = None, + duration_seconds: float | int | None = None, + observation_count: int = 1, + has_corroborating_data: bool = False, +) -> SignalAssessment: + """ + Produce a complete signal assessment with confidence-safe language. + + Args: + rssi: Signal strength in dBm + duration_seconds: How long signal was detected + observation_count: Number of separate observations + has_corroborating_data: Whether other data supports this detection + + Returns: + SignalAssessment with hedged, client-safe language + """ + strength = classify_signal_strength(rssi) + duration = classify_duration(duration_seconds) + + # Calculate confidence based on multiple factors + confidence = _calculate_confidence( + strength, duration, observation_count, has_corroborating_data + ) + + strength_info = SIGNAL_STRENGTH_DESCRIPTIONS[strength] + duration_info = DURATION_DESCRIPTIONS[duration] + + # Build client-safe summary + summary = _build_summary(strength, duration, confidence) + interpretation = _build_interpretation(strength, duration, confidence) + caveats = _build_caveats(strength, duration, confidence) + + return SignalAssessment( + rssi=rssi, + duration_seconds=duration_seconds, + observation_count=observation_count, + signal_strength=strength, + detection_duration=duration, + confidence=confidence, + strength_label=strength_info['label'], + duration_label=duration_info['label'], + summary=summary, + interpretation=interpretation, + caveats=caveats, + ) + + +def _calculate_confidence( + strength: SignalStrength, + duration: DetectionDuration, + observation_count: int, + has_corroborating_data: bool, +) -> ConfidenceLevel: + """Calculate overall confidence from contributing factors.""" + score = 0 + + # Signal strength contribution + if strength in (SignalStrength.STRONG, SignalStrength.VERY_STRONG): + score += 2 + elif strength == SignalStrength.MODERATE: + score += 1 + + # Duration contribution + if duration == DetectionDuration.PERSISTENT: + score += 2 + elif duration == DetectionDuration.SUSTAINED: + score += 1 + + # Observation count contribution + if observation_count >= 5: + score += 2 + elif observation_count >= 3: + score += 1 + + # Corroborating data bonus + if has_corroborating_data: + score += 1 + + # Map score to confidence level + if score >= 5: + return ConfidenceLevel.HIGH + elif score >= 3: + return ConfidenceLevel.MEDIUM + else: + return ConfidenceLevel.LOW + + +def _build_summary( + strength: SignalStrength, + duration: DetectionDuration, + confidence: ConfidenceLevel, +) -> str: + """Build a confidence-safe summary statement.""" + strength_info = SIGNAL_STRENGTH_DESCRIPTIONS[strength] + duration_info = DURATION_DESCRIPTIONS[duration] + + if confidence == ConfidenceLevel.HIGH: + return ( + f"{strength_info['label']}, {duration_info['label'].lower()} signal " + f"with characteristics that suggest device presence in proximity" + ) + elif confidence == ConfidenceLevel.MEDIUM: + return ( + f"{strength_info['label']}, {duration_info['label'].lower()} signal " + f"that may indicate device activity" + ) + else: + return ( + f"{duration_info['modifier'].capitalize()} {strength_info['label'].lower()} signal " + f"consistent with possible device presence" + ) + + +def _build_interpretation( + strength: SignalStrength, + duration: DetectionDuration, + confidence: ConfidenceLevel, +) -> str: + """Build interpretation text with appropriate hedging.""" + strength_info = SIGNAL_STRENGTH_DESCRIPTIONS[strength] + + base = strength_info['interpretation'] + + if confidence == ConfidenceLevel.HIGH: + return f"Observed signal characteristics suggest {base}" + elif confidence == ConfidenceLevel.MEDIUM: + return f"Signal pattern may indicate {base}" + else: + return f"Limited data; signal could represent {base} or environmental factors" + + +def _build_caveats( + strength: SignalStrength, + duration: DetectionDuration, + confidence: ConfidenceLevel, +) -> list[str]: + """Build list of relevant caveats for the assessment.""" + caveats = [] + + # Always include general caveat + caveats.append( + "Signal strength is affected by environmental factors including walls, " + "interference, and device orientation" + ) + + # Strength-specific caveats + if strength in (SignalStrength.MINIMAL, SignalStrength.WEAK): + caveats.append( + "Weak signals may represent background noise, distant devices, " + "or heavily obstructed sources" + ) + + # Duration-specific caveats + if duration == DetectionDuration.TRANSIENT: + caveats.append( + "Brief detection may indicate passing device, intermittent transmission, " + "or momentary interference" + ) + + # Confidence-specific caveats + if confidence == ConfidenceLevel.LOW: + caveats.append( + "Insufficient data for reliable assessment; additional observation recommended" + ) + + return caveats + + +# ============================================================================= +# Client-Facing Language Generators +# ============================================================================= + +def describe_signal_for_report( + rssi: float | int | None, + duration_seconds: float | int | None = None, + observation_count: int = 1, + protocol: str = "RF", +) -> dict: + """ + Generate client-facing signal description for reports. + + Returns dict with: + - headline: Short summary for quick scanning + - description: Detailed description with hedged language + - technical: Technical details (RSSI value, duration) + - confidence: Confidence level + - caveats: List of applicable caveats + """ + assessment = assess_signal(rssi, duration_seconds, observation_count) + + # Estimate range (very approximate, with appropriate hedging) + range_estimate = _estimate_range(rssi) + + return { + 'headline': f"{assessment.strength_label} {protocol} signal, {assessment.duration_label.lower()}", + 'description': assessment.summary, + 'interpretation': assessment.interpretation, + 'technical': { + 'rssi_dbm': rssi, + 'strength_category': assessment.signal_strength.value, + 'duration_seconds': duration_seconds, + 'duration_category': assessment.detection_duration.value, + 'observations': observation_count, + }, + 'range_estimate': range_estimate, + 'confidence': assessment.confidence.value, + 'confidence_factors': { + 'signal_strength': assessment.strength_label, + 'detection_duration': assessment.duration_label, + 'observation_count': observation_count, + }, + 'caveats': assessment.caveats, + } + + +def _estimate_range(rssi: float | int | None) -> dict: + """ + Estimate approximate range from RSSI with heavy caveats. + + Returns range as min/max estimate with disclaimer. + """ + if rssi is None: + return { + 'estimate': 'Unknown', + 'disclaimer': 'Insufficient signal data for range estimation', + } + + try: + rssi_val = float(rssi) + except (ValueError, TypeError): + return { + 'estimate': 'Unknown', + 'disclaimer': 'Invalid signal data', + } + + # Very rough estimates based on free-space path loss + # These are intentionally wide ranges due to environmental variability + if rssi_val > -40: + estimate = "< 3 meters" + range_min, range_max = 0, 3 + elif rssi_val > -55: + estimate = "3-10 meters" + range_min, range_max = 3, 10 + elif rssi_val > -70: + estimate = "5-20 meters" + range_min, range_max = 5, 20 + elif rssi_val > -85: + estimate = "10-50 meters" + range_min, range_max = 10, 50 + else: + estimate = "> 30 meters or heavily obstructed" + range_min, range_max = 30, None + + return { + 'estimate': estimate, + 'range_min_meters': range_min, + 'range_max_meters': range_max, + 'disclaimer': ( + "Range estimates are approximate and significantly affected by " + "walls, interference, antenna characteristics, and transmit power" + ), + } + + +def format_signal_for_dashboard( + rssi: float | int | None, + duration_seconds: float | int | None = None, +) -> dict: + """ + Generate dashboard-friendly signal display data. + + Returns dict with: + - label: Short label for display + - color: Suggested color code + - icon: Suggested icon name + - tooltip: Hover text with details + """ + strength = classify_signal_strength(rssi) + duration = classify_duration(duration_seconds) + + colors = { + SignalStrength.MINIMAL: '#888888', # Gray + SignalStrength.WEAK: '#6baed6', # Light blue + SignalStrength.MODERATE: '#3182bd', # Blue + SignalStrength.STRONG: '#fd8d3c', # Orange + SignalStrength.VERY_STRONG: '#e6550d', # Red-orange + } + + icons = { + SignalStrength.MINIMAL: 'signal-0', + SignalStrength.WEAK: 'signal-1', + SignalStrength.MODERATE: 'signal-2', + SignalStrength.STRONG: 'signal-3', + SignalStrength.VERY_STRONG: 'signal-4', + } + + strength_info = SIGNAL_STRENGTH_DESCRIPTIONS[strength] + duration_info = DURATION_DESCRIPTIONS[duration] + + tooltip = f"{strength_info['label']} signal ({rssi} dBm)" + if duration_seconds is not None: + tooltip += f", {duration_info['modifier']}" + + return { + 'label': strength_info['label'], + 'color': colors[strength], + 'icon': icons[strength], + 'tooltip': tooltip, + 'strength': strength.value, + 'duration': duration.value, + } + + +# ============================================================================= +# Hedged Language Patterns +# ============================================================================= + +# Vocabulary for generating hedged statements +HEDGED_VERBS = { + 'high_confidence': [ + 'suggests', + 'indicates', + 'is consistent with', + ], + 'medium_confidence': [ + 'may indicate', + 'could suggest', + 'is potentially consistent with', + ], + 'low_confidence': [ + 'might represent', + 'could possibly indicate', + 'may or may not suggest', + ], +} + +HEDGED_CONCLUSIONS = { + 'device_presence': { + 'high': 'likely device presence in proximity', + 'medium': 'possible device activity', + 'low': 'potential device presence, though environmental factors cannot be ruled out', + }, + 'surveillance_indicator': { + 'high': 'characteristics warranting further investigation', + 'medium': 'pattern that may warrant review', + 'low': 'inconclusive pattern requiring additional data', + }, + 'location': { + 'high': 'probable location within estimated range', + 'medium': 'possible location in general vicinity', + 'low': 'uncertain location; signal could originate from various distances', + }, +} + + +def generate_hedged_statement( + subject: str, + conclusion_type: str, + confidence: ConfidenceLevel | str, +) -> str: + """ + Generate a hedged statement for reports. + + Args: + subject: What we're describing (e.g., "The detected WiFi signal") + conclusion_type: Type of conclusion (device_presence, surveillance_indicator, location) + confidence: Confidence level + + Returns: + Hedged statement string + """ + if isinstance(confidence, ConfidenceLevel): + conf_key = confidence.value + else: + conf_key = str(confidence).lower() + + verbs = HEDGED_VERBS.get(f'{conf_key}_confidence', HEDGED_VERBS['low_confidence']) + conclusions = HEDGED_CONCLUSIONS.get(conclusion_type, {}) + conclusion = conclusions.get(conf_key, conclusions.get('low', 'an inconclusive pattern')) + + verb = verbs[0] # Use first verb for consistency + + return f"{subject} {verb} {conclusion}" + + +# ============================================================================= +# Standard Disclaimer Text +# ============================================================================= + +SIGNAL_ANALYSIS_DISCLAIMER = """ +Signal analysis provides indicators for further investigation and should not be +interpreted as definitive identification of devices or their purposes. Environmental +factors such as building materials, electromagnetic interference, multipath +propagation, and device orientation significantly affect signal measurements. +All findings represent patterns observed at a specific point in time and location. +""" + +RANGE_ESTIMATION_DISCLAIMER = """ +Distance estimates are based on signal strength measurements and standard radio +propagation models. Actual distances may vary significantly due to transmit power +variations, antenna characteristics, physical obstructions, and environmental +conditions. These estimates should be considered approximate guidelines only. +"""