Add signal strength classification with confidence-safe language

Introduces standardized RSSI-to-label mapping (minimal/weak/moderate/strong/very_strong)
and duration-based confidence modifiers for client-facing reports and dashboards.

- New signal_classification.py module with hedged language generation
- Updated detector.py to use standardized signal descriptions
- Enhanced reports.py with signal classification in findings
- Added JS SignalClassification and signal indicator components
- CSS styles for signal strength bars and assessment panels

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-20 21:37:07 +00:00
parent 8a744eb55a
commit 9461cc2121
5 changed files with 1323 additions and 44 deletions

View File

@@ -19,6 +19,17 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from utils.tscm.signal_classification import (
SignalStrength,
ConfidenceLevel,
assess_signal,
classify_signal_strength,
describe_signal_for_report,
format_signal_for_dashboard,
generate_hedged_statement,
SIGNAL_ANALYSIS_DISCLAIMER,
)
logger = logging.getLogger('intercept.tscm.reports')
# =============================================================================
@@ -37,6 +48,11 @@ class ReportFinding:
indicators: list[dict] = field(default_factory=list)
recommended_action: str = ''
playbook_reference: str = ''
# Signal classification data
signal_strength: Optional[str] = None # minimal, weak, moderate, strong, very_strong
signal_confidence: Optional[str] = None # low, medium, high
signal_interpretation: Optional[str] = None
signal_caveats: list[str] = field(default_factory=list)
@dataclass
@@ -225,7 +241,7 @@ def generate_executive_summary(report: TSCMReport) -> str:
def generate_findings_section(findings: list[ReportFinding], title: str) -> str:
"""Generate a findings section for the report."""
"""Generate a findings section for the report with confidence-safe language."""
if not findings:
return f"{title}\n\nNo findings in this category.\n"
@@ -236,14 +252,33 @@ def generate_findings_section(findings: list[ReportFinding], title: str) -> str:
lines.append(f" Protocol: {finding.protocol.upper()}")
lines.append(f" Identifier: {finding.identifier}")
lines.append(f" Risk Score: {finding.risk_score}")
lines.append(f" Description: {finding.description}")
# Signal classification with confidence
if finding.signal_strength:
confidence_label = (finding.signal_confidence or 'low').capitalize()
strength_label = finding.signal_strength.replace('_', ' ').title()
lines.append(f" Signal: {strength_label} (Confidence: {confidence_label})")
lines.append(f" Assessment: {finding.description}")
# Interpretation with hedged language
if finding.signal_interpretation:
lines.append(f" Interpretation: {finding.signal_interpretation}")
if finding.indicators:
lines.append(" Indicators:")
for ind in finding.indicators[:5]: # Limit to 5 indicators
lines.append(f" - {ind.get('type', 'unknown')}: {ind.get('description', '')}")
lines.append(f" Recommended Action: {finding.recommended_action}")
if finding.playbook_reference:
lines.append(f" Reference: {finding.playbook_reference}")
# Include relevant caveats for high-interest findings
if finding.signal_caveats and finding.risk_level == 'high_interest':
lines.append(" Note: " + finding.signal_caveats[0])
lines.append("")
return "\n".join(lines)
@@ -345,6 +380,13 @@ def generate_pdf_content(report: TSCMReport) -> str:
sections.append(f" - {limit}")
sections.append("")
# Signal Analysis Note
sections.append("-" * 70)
sections.append("SIGNAL ANALYSIS METHODOLOGY")
sections.append("=" * 27)
sections.append(SIGNAL_ANALYSIS_DISCLAIMER.strip())
sections.append("")
# Disclaimer
sections.append("-" * 70)
sections.append(REPORT_DISCLAIMER)
@@ -407,6 +449,12 @@ def generate_technical_annex_json(report: TSCMReport) -> dict:
'description': f.description,
'indicators': f.indicators,
'recommended_action': f.recommended_action,
'signal_classification': {
'strength': f.signal_strength,
'confidence': f.signal_confidence,
'interpretation': f.signal_interpretation,
'caveats': f.signal_caveats,
},
}
for f in report.high_interest_findings
],
@@ -418,6 +466,12 @@ def generate_technical_annex_json(report: TSCMReport) -> dict:
'risk_score': f.risk_score,
'description': f.description,
'indicators': f.indicators,
'signal_classification': {
'strength': f.signal_strength,
'confidence': f.signal_confidence,
'interpretation': f.signal_interpretation,
'caveats': f.signal_caveats,
},
}
for f in report.needs_review_findings
],
@@ -504,7 +558,11 @@ def generate_technical_annex_csv(report: TSCMReport) -> str:
# Also add findings summary
writer.writerow([])
writer.writerow(['--- FINDINGS SUMMARY ---'])
writer.writerow(['identifier', 'protocol', 'risk_level', 'risk_score', 'description', 'recommended_action'])
writer.writerow([
'identifier', 'protocol', 'risk_level', 'risk_score',
'signal_strength', 'signal_confidence',
'description', 'interpretation', 'recommended_action'
])
all_findings = (
report.high_interest_findings +
@@ -517,7 +575,10 @@ def generate_technical_annex_csv(report: TSCMReport) -> str:
finding.protocol,
finding.risk_level,
finding.risk_score,
finding.signal_strength or '',
finding.signal_confidence or '',
finding.description,
finding.signal_interpretation or '',
finding.recommended_action,
])
@@ -591,6 +652,9 @@ class TSCMReportBuilder:
def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder':
"""Add findings from correlation engine device profiles."""
for profile in profiles:
# Get signal classification data
signal_data = self._classify_finding_signal(profile)
finding = ReportFinding(
identifier=profile.get('identifier', ''),
protocol=profile.get('protocol', ''),
@@ -601,26 +665,90 @@ class TSCMReportBuilder:
indicators=profile.get('indicators', []),
recommended_action=profile.get('recommended_action', 'monitor'),
playbook_reference=self._get_playbook_reference(profile),
signal_strength=signal_data['signal_strength'],
signal_confidence=signal_data['signal_confidence'],
signal_interpretation=signal_data['signal_interpretation'],
signal_caveats=signal_data['signal_caveats'],
)
self.add_finding(finding)
return self
def _generate_finding_description(self, profile: dict) -> str:
"""Generate description from profile indicators."""
"""Generate description from profile indicators using hedged language."""
indicators = profile.get('indicators', [])
if not indicators:
return f"{profile.get('protocol', 'Unknown').upper()} device detected"
protocol = profile.get('protocol', 'Unknown').upper()
# Use first indicator as primary description
# Get signal data for context
rssi = profile.get('rssi_mean') or profile.get('rssi')
duration = profile.get('observation_duration_seconds')
observation_count = profile.get('observation_count', 1)
# Assess signal to determine confidence
assessment = assess_signal(rssi, duration, observation_count)
confidence = assessment.confidence
if not indicators:
# Use hedged language based on confidence
return generate_hedged_statement(
f"Observed {protocol} signal",
'device_presence',
confidence
)
# Build description with hedged language
primary = indicators[0]
desc = primary.get('description', 'Pattern detected')
indicator_type = primary.get('type', 'pattern')
# Map indicator types to hedged descriptions
if indicator_type in ('airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker'):
desc = generate_hedged_statement(
f"{protocol} signal characteristics",
'device_presence',
confidence
)
desc += f" - pattern consistent with {indicator_type.replace('_', ' ')}"
elif indicator_type == 'audio_capable':
desc = generate_hedged_statement(
"Device characteristics",
'surveillance_indicator',
confidence
)
desc += " - audio-capable device type identified"
elif indicator_type in ('hidden_identity', 'hidden_ssid'):
desc = generate_hedged_statement(
"Network configuration",
'surveillance_indicator',
confidence
)
desc += " - concealed identity pattern observed"
else:
desc = generate_hedged_statement(
f"{protocol} signal pattern",
'device_presence',
confidence
)
if len(indicators) > 1:
desc += f" (+{len(indicators) - 1} additional indicators)"
return desc
def _classify_finding_signal(self, profile: dict) -> dict:
"""Extract signal classification data for a finding."""
rssi = profile.get('rssi_mean') or profile.get('rssi')
duration = profile.get('observation_duration_seconds')
observation_count = profile.get('observation_count', 1)
assessment = assess_signal(rssi, duration, observation_count)
return {
'signal_strength': assessment.signal_strength.value,
'signal_confidence': assessment.confidence.value,
'signal_interpretation': assessment.interpretation,
'signal_caveats': assessment.caveats,
}
def _get_playbook_reference(self, profile: dict) -> str:
"""Get playbook reference based on profile."""
risk_level = profile.get('risk_level', 'informational')
@@ -761,27 +889,27 @@ def generate_report(
# Add findings from profiles
builder.add_findings_from_profiles(device_profiles)
# Statistics
results = sweep_data.get('results', {})
wifi_count = results.get('wifi_count')
if wifi_count is None:
wifi_count = len(results.get('wifi_devices', results.get('wifi', [])))
bt_count = results.get('bt_count')
if bt_count is None:
bt_count = len(results.get('bt_devices', results.get('bluetooth', [])))
rf_count = results.get('rf_count')
if rf_count is None:
rf_count = len(results.get('rf_signals', results.get('rf', [])))
builder.add_statistics(
wifi=wifi_count,
bluetooth=bt_count,
rf=rf_count,
new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
)
# Statistics
results = sweep_data.get('results', {})
wifi_count = results.get('wifi_count')
if wifi_count is None:
wifi_count = len(results.get('wifi_devices', results.get('wifi', [])))
bt_count = results.get('bt_count')
if bt_count is None:
bt_count = len(results.get('bt_devices', results.get('bluetooth', [])))
rf_count = results.get('rf_count')
if rf_count is None:
rf_count = len(results.get('rf_signals', results.get('rf', [])))
builder.add_statistics(
wifi=wifi_count,
bluetooth=bt_count,
rf=rf_count,
new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
)
# Technical data
builder.add_device_timelines(timelines)