From 234f254f4f7e165130b1e7bae9b69508b03ccd3f Mon Sep 17 00:00:00 2001 From: Smittix Date: Fri, 16 Jan 2026 16:06:18 +0000 Subject: [PATCH] Add comprehensive TSCM advanced features Implement 9 major TSCM feature enhancements: 1. Capability & Coverage Reality Panel - Exposes what sweeps can/cannot detect based on OS, privileges, adapters, and SDR limits 2. Baseline Diff & Health - Shows changes vs baseline with health scoring (healthy/noisy/stale) based on age and device churn 3. Per-Device Timelines - Time-bucketed observations with RSSI stability, movement patterns, and meeting correlation 4. Whitelist/Known-Good Registry + Case Grouping - Global and per-location device registry with case management for sweeps/threats/notes 5. Meeting-Window Summary Enhancements - Tracks devices first seen during meetings with scoring modifiers 6. Client-Ready PDF Report + Technical Annex - Executive summary, findings by risk tier, JSON/CSV annex export 7. WiFi Advanced Indicators - Evil twin detection, probe request tracking, deauth burst detection (auto-disables without monitor mode) 8. Bluetooth Risk Explainability - Proximity estimates, tracker brand explanations, human-readable risk descriptions 9. Operator Playbooks - Procedural guidance by risk level with steps, safety notes, and documentation requirements All features include mandatory disclaimers, preserve existing architecture, and follow TSCM best practices (no packet capture, no surveillance claims). Co-Authored-By: Claude Opus 4.5 --- routes/tscm.py | 959 ++++++++++++++++++ utils/database.py | 636 ++++++++++++ utils/tscm/advanced.py | 2152 ++++++++++++++++++++++++++++++++++++++++ utils/tscm/reports.py | 813 +++++++++++++++ 4 files changed, 4560 insertions(+) create mode 100644 utils/tscm/advanced.py create mode 100644 utils/tscm/reports.py diff --git a/routes/tscm.py b/routes/tscm.py index ec2a8e6..6493cad 100644 --- a/routes/tscm.py +++ b/routes/tscm.py @@ -2274,3 +2274,962 @@ def get_cluster_detail(cluster_id: str): except Exception as e: logger.error(f"Get cluster detail error: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# Capabilities & Coverage Endpoints +# ============================================================================= + +@tscm_bp.route('/capabilities') +def get_capabilities(): + """ + Get current system capabilities for TSCM sweeping. + + Returns what the system CAN and CANNOT detect based on OS, + privileges, adapters, and SDR hardware. + """ + try: + from utils.tscm.advanced import detect_sweep_capabilities + + wifi_interface = request.args.get('wifi_interface', '') + bt_adapter = request.args.get('bt_adapter', '') + + caps = detect_sweep_capabilities( + wifi_interface=wifi_interface, + bt_adapter=bt_adapter + ) + + return jsonify({ + 'status': 'success', + 'capabilities': caps.to_dict() + }) + + except Exception as e: + logger.error(f"Get capabilities error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/sweep//capabilities') +def get_sweep_stored_capabilities(sweep_id: int): + """Get stored capabilities for a specific sweep.""" + from utils.database import get_sweep_capabilities + + caps = get_sweep_capabilities(sweep_id) + if not caps: + return jsonify({'status': 'error', 'message': 'No capabilities stored for this sweep'}), 404 + + return jsonify({ + 'status': 'success', + 'capabilities': caps + }) + + +# ============================================================================= +# Baseline Diff & Health Endpoints +# ============================================================================= + +@tscm_bp.route('/baseline/diff//') +def get_baseline_diff(baseline_id: int, sweep_id: int): + """ + Get comprehensive diff between a baseline and a sweep. + + Shows new devices, missing devices, changed characteristics, + and baseline health assessment. + """ + try: + from utils.tscm.advanced import calculate_baseline_diff + + baseline = get_tscm_baseline(baseline_id) + if not baseline: + return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404 + + sweep = get_tscm_sweep(sweep_id) + if not sweep: + return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404 + + # Get current devices from sweep results + results = sweep.get('results', {}) + if isinstance(results, str): + import json + results = json.loads(results) + + current_wifi = results.get('wifi', []) + current_bt = results.get('bluetooth', []) + current_rf = results.get('rf', []) + + diff = calculate_baseline_diff( + baseline=baseline, + current_wifi=current_wifi, + current_bt=current_bt, + current_rf=current_rf, + sweep_id=sweep_id + ) + + return jsonify({ + 'status': 'success', + 'diff': diff.to_dict() + }) + + except Exception as e: + logger.error(f"Get baseline diff error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/baseline//health') +def get_baseline_health(baseline_id: int): + """Get health assessment for a baseline.""" + try: + from utils.tscm.advanced import BaselineHealth + from datetime import datetime + + baseline = get_tscm_baseline(baseline_id) + if not baseline: + return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404 + + # Calculate age + created_at = baseline.get('created_at') + age_hours = 0 + if created_at: + if isinstance(created_at, str): + created = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600 + elif isinstance(created_at, datetime): + age_hours = (datetime.now() - created_at).total_seconds() / 3600 + + # Count devices + total_devices = ( + len(baseline.get('wifi_networks', [])) + + len(baseline.get('bt_devices', [])) + + len(baseline.get('rf_frequencies', [])) + ) + + # Determine health + health = 'healthy' + score = 1.0 + reasons = [] + + if age_hours > 168: + health = 'stale' + score = 0.3 + reasons.append(f'Baseline is {age_hours:.0f} hours old (over 1 week)') + elif age_hours > 72: + health = 'noisy' + score = 0.6 + reasons.append(f'Baseline is {age_hours:.0f} hours old (over 3 days)') + + if total_devices < 3: + score -= 0.2 + reasons.append(f'Baseline has few devices ({total_devices})') + if health == 'healthy': + health = 'noisy' + + return jsonify({ + 'status': 'success', + 'health': { + 'status': health, + 'score': round(max(0, score), 2), + 'age_hours': round(age_hours, 1), + 'total_devices': total_devices, + 'reasons': reasons, + } + }) + + except Exception as e: + logger.error(f"Get baseline health error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# Device Timeline Endpoints +# ============================================================================= + +@tscm_bp.route('/device//timeline') +def get_device_timeline_endpoint(identifier: str): + """ + Get timeline of observations for a device. + + Shows behavior over time including RSSI stability, presence, + and meeting window correlation. + """ + try: + from utils.tscm.advanced import get_timeline_manager + from utils.database import get_device_timeline + + protocol = request.args.get('protocol', 'bluetooth') + since_hours = request.args.get('since_hours', 24, type=int) + + # Try in-memory timeline first + manager = get_timeline_manager() + timeline = manager.get_timeline(identifier, protocol) + + # Also get stored timeline from database + stored = get_device_timeline(identifier, since_hours=since_hours) + + result = { + 'identifier': identifier, + 'protocol': protocol, + 'observations': stored, + } + + if timeline: + result['metrics'] = { + 'first_seen': timeline.first_seen.isoformat() if timeline.first_seen else None, + 'last_seen': timeline.last_seen.isoformat() if timeline.last_seen else None, + 'total_observations': timeline.total_observations, + 'presence_ratio': round(timeline.presence_ratio, 2), + } + result['signal'] = { + 'rssi_min': timeline.rssi_min, + 'rssi_max': timeline.rssi_max, + 'rssi_mean': round(timeline.rssi_mean, 1) if timeline.rssi_mean else None, + 'stability': round(timeline.rssi_stability, 2), + } + result['movement'] = { + 'appears_stationary': timeline.appears_stationary, + 'pattern': timeline.movement_pattern, + } + result['meeting_correlation'] = { + 'correlated': timeline.meeting_correlated, + 'observations_during_meeting': timeline.meeting_observations, + } + + return jsonify({ + 'status': 'success', + 'timeline': result + }) + + except Exception as e: + logger.error(f"Get device timeline error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/timelines') +def get_all_device_timelines(): + """Get all device timelines.""" + try: + from utils.tscm.advanced import get_timeline_manager + + manager = get_timeline_manager() + timelines = manager.get_all_timelines() + + return jsonify({ + 'status': 'success', + 'count': len(timelines), + 'timelines': [t.to_dict() for t in timelines] + }) + + except Exception as e: + logger.error(f"Get all timelines error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# Known-Good Registry (Whitelist) Endpoints +# ============================================================================= + +@tscm_bp.route('/known-devices', methods=['GET']) +def list_known_devices(): + """List all known-good devices.""" + from utils.database import get_all_known_devices + + location = request.args.get('location') + scope = request.args.get('scope') + + devices = get_all_known_devices(location=location, scope=scope) + + return jsonify({ + 'status': 'success', + 'count': len(devices), + 'devices': devices + }) + + +@tscm_bp.route('/known-devices', methods=['POST']) +def add_known_device_endpoint(): + """ + Add a device to the known-good registry. + + Known devices remain visible but receive reduced risk scores. + They are NOT suppressed from reports (preserves audit trail). + """ + from utils.database import add_known_device + + data = request.get_json() or {} + + identifier = data.get('identifier') + protocol = data.get('protocol') + + if not identifier or not protocol: + return jsonify({ + 'status': 'error', + 'message': 'identifier and protocol are required' + }), 400 + + device_id = add_known_device( + identifier=identifier, + protocol=protocol, + name=data.get('name'), + description=data.get('description'), + location=data.get('location'), + scope=data.get('scope', 'global'), + added_by=data.get('added_by'), + score_modifier=data.get('score_modifier', -2), + metadata=data.get('metadata') + ) + + return jsonify({ + 'status': 'success', + 'message': 'Device added to known-good registry', + 'device_id': device_id + }) + + +@tscm_bp.route('/known-devices/', methods=['GET']) +def get_known_device_endpoint(identifier: str): + """Get a known device by identifier.""" + from utils.database import get_known_device + + device = get_known_device(identifier) + if not device: + return jsonify({'status': 'error', 'message': 'Device not found'}), 404 + + return jsonify({ + 'status': 'success', + 'device': device + }) + + +@tscm_bp.route('/known-devices/', methods=['DELETE']) +def delete_known_device_endpoint(identifier: str): + """Remove a device from the known-good registry.""" + from utils.database import delete_known_device + + success = delete_known_device(identifier) + if not success: + return jsonify({'status': 'error', 'message': 'Device not found'}), 404 + + return jsonify({ + 'status': 'success', + 'message': 'Device removed from known-good registry' + }) + + +@tscm_bp.route('/known-devices/check/') +def check_known_device(identifier: str): + """Check if a device is in the known-good registry.""" + from utils.database import is_known_good_device + + location = request.args.get('location') + result = is_known_good_device(identifier, location=location) + + return jsonify({ + 'status': 'success', + 'is_known': result is not None, + 'details': result + }) + + +# ============================================================================= +# Case Management Endpoints +# ============================================================================= + +@tscm_bp.route('/cases', methods=['GET']) +def list_cases(): + """List all TSCM cases.""" + from utils.database import get_all_tscm_cases + + status = request.args.get('status') + limit = request.args.get('limit', 50, type=int) + + cases = get_all_tscm_cases(status=status, limit=limit) + + return jsonify({ + 'status': 'success', + 'count': len(cases), + 'cases': cases + }) + + +@tscm_bp.route('/cases', methods=['POST']) +def create_case(): + """Create a new TSCM case.""" + from utils.database import create_tscm_case + + data = request.get_json() or {} + + name = data.get('name') + if not name: + return jsonify({'status': 'error', 'message': 'name is required'}), 400 + + case_id = create_tscm_case( + name=name, + description=data.get('description'), + location=data.get('location'), + priority=data.get('priority', 'normal'), + created_by=data.get('created_by'), + metadata=data.get('metadata') + ) + + return jsonify({ + 'status': 'success', + 'message': 'Case created', + 'case_id': case_id + }) + + +@tscm_bp.route('/cases/', methods=['GET']) +def get_case(case_id: int): + """Get a TSCM case with all linked sweeps, threats, and notes.""" + from utils.database import get_tscm_case + + case = get_tscm_case(case_id) + if not case: + return jsonify({'status': 'error', 'message': 'Case not found'}), 404 + + return jsonify({ + 'status': 'success', + 'case': case + }) + + +@tscm_bp.route('/cases/', methods=['PUT']) +def update_case(case_id: int): + """Update a TSCM case.""" + from utils.database import update_tscm_case + + data = request.get_json() or {} + + success = update_tscm_case( + case_id=case_id, + status=data.get('status'), + priority=data.get('priority'), + assigned_to=data.get('assigned_to'), + notes=data.get('notes') + ) + + if not success: + return jsonify({'status': 'error', 'message': 'Case not found'}), 404 + + return jsonify({ + 'status': 'success', + 'message': 'Case updated' + }) + + +@tscm_bp.route('/cases//sweeps/', methods=['POST']) +def link_sweep_to_case(case_id: int, sweep_id: int): + """Link a sweep to a case.""" + from utils.database import add_sweep_to_case + + success = add_sweep_to_case(case_id, sweep_id) + + return jsonify({ + 'status': 'success' if success else 'error', + 'message': 'Sweep linked to case' if success else 'Already linked or not found' + }) + + +@tscm_bp.route('/cases//threats/', methods=['POST']) +def link_threat_to_case(case_id: int, threat_id: int): + """Link a threat to a case.""" + from utils.database import add_threat_to_case + + success = add_threat_to_case(case_id, threat_id) + + return jsonify({ + 'status': 'success' if success else 'error', + 'message': 'Threat linked to case' if success else 'Already linked or not found' + }) + + +@tscm_bp.route('/cases//notes', methods=['POST']) +def add_note_to_case(case_id: int): + """Add a note to a case.""" + from utils.database import add_case_note + + data = request.get_json() or {} + + content = data.get('content') + if not content: + return jsonify({'status': 'error', 'message': 'content is required'}), 400 + + note_id = add_case_note( + case_id=case_id, + content=content, + note_type=data.get('note_type', 'general'), + created_by=data.get('created_by') + ) + + return jsonify({ + 'status': 'success', + 'message': 'Note added', + 'note_id': note_id + }) + + +# ============================================================================= +# Meeting Window Enhanced Endpoints +# ============================================================================= + +@tscm_bp.route('/meeting/start-tracked', methods=['POST']) +def start_tracked_meeting(): + """ + Start a tracked meeting window with database persistence. + + Tracks devices first seen during meeting and behavior changes. + """ + from utils.database import start_meeting_window + from utils.tscm.advanced import get_timeline_manager + + data = request.get_json() or {} + + meeting_id = start_meeting_window( + sweep_id=_current_sweep_id, + name=data.get('name'), + location=data.get('location'), + notes=data.get('notes') + ) + + # Start meeting in correlation engine + correlation = get_correlation_engine() + correlation.start_meeting_window() + + # Start in timeline manager + manager = get_timeline_manager() + manager.start_meeting_window() + + _emit_event('meeting_started', { + 'meeting_id': meeting_id, + 'timestamp': datetime.now().isoformat(), + 'name': data.get('name'), + }) + + return jsonify({ + 'status': 'success', + 'message': 'Tracked meeting window started', + 'meeting_id': meeting_id + }) + + +@tscm_bp.route('/meeting//end', methods=['POST']) +def end_tracked_meeting(meeting_id: int): + """End a tracked meeting window.""" + from utils.database import end_meeting_window + from utils.tscm.advanced import get_timeline_manager + + success = end_meeting_window(meeting_id) + if not success: + return jsonify({'status': 'error', 'message': 'Meeting not found or already ended'}), 404 + + # End in correlation engine + correlation = get_correlation_engine() + correlation.end_meeting_window() + + # End in timeline manager + manager = get_timeline_manager() + manager.end_meeting_window() + + _emit_event('meeting_ended', { + 'meeting_id': meeting_id, + 'timestamp': datetime.now().isoformat() + }) + + return jsonify({ + 'status': 'success', + 'message': 'Meeting window ended' + }) + + +@tscm_bp.route('/meeting//summary') +def get_meeting_summary_endpoint(meeting_id: int): + """Get detailed summary of device activity during a meeting.""" + try: + from utils.database import get_meeting_windows + from utils.tscm.advanced import generate_meeting_summary, get_timeline_manager + + # Get meeting window + windows = get_meeting_windows(_current_sweep_id or 0) + meeting = None + for w in windows: + if w.get('id') == meeting_id: + meeting = w + break + + if not meeting: + return jsonify({'status': 'error', 'message': 'Meeting not found'}), 404 + + # Get timelines and profiles + manager = get_timeline_manager() + timelines = manager.get_all_timelines() + + correlation = get_correlation_engine() + profiles = [p.to_dict() for p in correlation.device_profiles.values()] + + summary = generate_meeting_summary(meeting, timelines, profiles) + + return jsonify({ + 'status': 'success', + 'summary': summary.to_dict() + }) + + except Exception as e: + logger.error(f"Get meeting summary error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/meeting/active') +def get_active_meeting(): + """Get currently active meeting window.""" + from utils.database import get_active_meeting_window + + meeting = get_active_meeting_window(_current_sweep_id) + + return jsonify({ + 'status': 'success', + 'meeting': meeting, + 'is_active': meeting is not None + }) + + +# ============================================================================= +# PDF Report & Technical Annex Endpoints +# ============================================================================= + +@tscm_bp.route('/report/pdf') +def get_pdf_report(): + """ + Generate client-safe PDF report. + + Contains executive summary, findings by risk tier, meeting window + summary, and mandatory disclaimers. + """ + try: + from utils.tscm.reports import generate_report, get_pdf_report + from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager + + sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int) + if not sweep_id: + return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400 + + sweep = get_tscm_sweep(sweep_id) + if not sweep: + return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404 + + # Get data for report + correlation = get_correlation_engine() + profiles = [p.to_dict() for p in correlation.device_profiles.values()] + caps = detect_sweep_capabilities().to_dict() + + manager = get_timeline_manager() + timelines = [t.to_dict() for t in manager.get_all_timelines()] + + # Generate report + report = generate_report( + sweep_id=sweep_id, + sweep_data=sweep, + device_profiles=profiles, + capabilities=caps, + timelines=timelines + ) + + pdf_content = get_pdf_report(report) + + return Response( + pdf_content, + mimetype='text/plain', + headers={ + 'Content-Disposition': f'attachment; filename=tscm_report_{sweep_id}.txt' + } + ) + + except Exception as e: + logger.error(f"Generate PDF report error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/report/annex') +def get_technical_annex(): + """ + Generate technical annex (JSON + CSV). + + Contains device timelines, all indicators, and detailed data + for audit purposes. No packet data included. + """ + try: + from utils.tscm.reports import generate_report, get_json_annex, get_csv_annex + from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager + + sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int) + format_type = request.args.get('format', 'json') + + if not sweep_id: + return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400 + + sweep = get_tscm_sweep(sweep_id) + if not sweep: + return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404 + + # Get data for report + correlation = get_correlation_engine() + profiles = [p.to_dict() for p in correlation.device_profiles.values()] + caps = detect_sweep_capabilities().to_dict() + + manager = get_timeline_manager() + timelines = [t.to_dict() for t in manager.get_all_timelines()] + + # Generate report + report = generate_report( + sweep_id=sweep_id, + sweep_data=sweep, + device_profiles=profiles, + capabilities=caps, + timelines=timelines + ) + + if format_type == 'csv': + csv_content = get_csv_annex(report) + return Response( + csv_content, + mimetype='text/csv', + headers={ + 'Content-Disposition': f'attachment; filename=tscm_annex_{sweep_id}.csv' + } + ) + else: + annex = get_json_annex(report) + return jsonify({ + 'status': 'success', + 'annex': annex + }) + + except Exception as e: + logger.error(f"Generate technical annex error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# WiFi Advanced Indicators Endpoints +# ============================================================================= + +@tscm_bp.route('/wifi/advanced-indicators') +def get_wifi_advanced_indicators(): + """ + Get advanced WiFi indicators (Evil Twin, Probes, Deauth). + + These indicators require analysis of WiFi patterns. + Some features require monitor mode. + """ + try: + from utils.tscm.advanced import get_wifi_detector + + detector = get_wifi_detector() + + return jsonify({ + 'status': 'success', + 'indicators': detector.get_all_indicators(), + 'unavailable_features': detector.get_unavailable_features(), + 'disclaimer': ( + "All indicators represent pattern detections, NOT confirmed attacks. " + "Further investigation is required." + ) + }) + + except Exception as e: + logger.error(f"Get WiFi indicators error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/wifi/analyze-network', methods=['POST']) +def analyze_wifi_network(): + """ + Analyze a WiFi network for evil twin patterns. + + Compares against known networks to detect SSID spoofing. + """ + try: + from utils.tscm.advanced import get_wifi_detector + + data = request.get_json() or {} + detector = get_wifi_detector() + + # Set known networks from baseline if available + baseline = get_active_tscm_baseline() + if baseline: + detector.set_known_networks(baseline.get('wifi_networks', [])) + + indicators = detector.analyze_network(data) + + return jsonify({ + 'status': 'success', + 'indicators': [i.to_dict() for i in indicators] + }) + + except Exception as e: + logger.error(f"Analyze WiFi network error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# Bluetooth Risk Explainability Endpoints +# ============================================================================= + +@tscm_bp.route('/bluetooth//explain') +def explain_bluetooth_risk(identifier: str): + """ + Get human-readable risk explanation for a BLE device. + + Includes proximity estimate, tracker explanation, and + recommended actions. + """ + try: + from utils.tscm.advanced import generate_ble_risk_explanation + + # Get device from correlation engine + correlation = get_correlation_engine() + profile = None + key = f"bluetooth:{identifier.upper()}" + if key in correlation.device_profiles: + profile = correlation.device_profiles[key].to_dict() + + # Try to find device info + device = {'mac': identifier} + if profile: + device['name'] = profile.get('name') + device['rssi'] = profile.get('rssi_samples', [None])[-1] if profile.get('rssi_samples') else None + + # Check meeting status + is_meeting = correlation.is_during_meeting() + + explanation = generate_ble_risk_explanation(device, profile, is_meeting) + + return jsonify({ + 'status': 'success', + 'explanation': explanation.to_dict() + }) + + except Exception as e: + logger.error(f"Explain BLE risk error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/bluetooth//proximity') +def get_bluetooth_proximity(identifier: str): + """Get proximity estimate for a BLE device.""" + try: + from utils.tscm.advanced import estimate_ble_proximity + + rssi = request.args.get('rssi', type=int) + if rssi is None: + # Try to get from correlation engine + correlation = get_correlation_engine() + key = f"bluetooth:{identifier.upper()}" + if key in correlation.device_profiles: + profile = correlation.device_profiles[key] + if profile.rssi_samples: + rssi = profile.rssi_samples[-1] + + if rssi is None: + return jsonify({ + 'status': 'error', + 'message': 'RSSI value required' + }), 400 + + proximity, explanation, distance = estimate_ble_proximity(rssi) + + return jsonify({ + 'status': 'success', + 'proximity': { + 'estimate': proximity.value, + 'explanation': explanation, + 'estimated_distance': distance, + 'rssi_used': rssi, + }, + 'disclaimer': ( + "Proximity estimates are approximate and affected by " + "environment, obstacles, and device characteristics." + ) + }) + + except Exception as e: + logger.error(f"Get BLE proximity error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# Operator Playbook Endpoints +# ============================================================================= + +@tscm_bp.route('/playbooks') +def list_playbooks(): + """List all available operator playbooks.""" + try: + from utils.tscm.advanced import PLAYBOOKS + + return jsonify({ + 'status': 'success', + 'playbooks': { + pid: pb.to_dict() + for pid, pb in PLAYBOOKS.items() + } + }) + + except Exception as e: + logger.error(f"List playbooks error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/playbooks/') +def get_playbook(playbook_id: str): + """Get a specific playbook.""" + try: + from utils.tscm.advanced import PLAYBOOKS + + if playbook_id not in PLAYBOOKS: + return jsonify({'status': 'error', 'message': 'Playbook not found'}), 404 + + return jsonify({ + 'status': 'success', + 'playbook': PLAYBOOKS[playbook_id].to_dict() + }) + + except Exception as e: + logger.error(f"Get playbook error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/findings//playbook') +def get_finding_playbook(identifier: str): + """Get recommended playbook for a specific finding.""" + try: + from utils.tscm.advanced import get_playbook_for_finding + + # Get profile + correlation = get_correlation_engine() + profile = None + + for protocol in ['bluetooth', 'wifi', 'rf']: + key = f"{protocol}:{identifier.upper()}" + if key in correlation.device_profiles: + profile = correlation.device_profiles[key].to_dict() + break + + if not profile: + return jsonify({'status': 'error', 'message': 'Finding not found'}), 404 + + playbook = get_playbook_for_finding( + risk_level=profile.get('risk_level', 'informational'), + indicators=profile.get('indicators', []) + ) + + return jsonify({ + 'status': 'success', + 'playbook': playbook.to_dict(), + 'suggested_next_steps': [ + f"Step {s.step_number}: {s.action}" + for s in playbook.steps[:3] + ] + }) + + except Exception as e: + logger.error(f"Get finding playbook error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 diff --git a/utils/database.py b/utils/database.py index be79f53..fb9012c 100644 --- a/utils/database.py +++ b/utils/database.py @@ -178,6 +178,123 @@ def init_db() -> None: ) ''') + # TSCM Device Timelines - Periodic observations per device + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_device_timelines ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_identifier TEXT NOT NULL, + protocol TEXT NOT NULL, + sweep_id INTEGER, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + rssi INTEGER, + presence BOOLEAN DEFAULT 1, + channel INTEGER, + frequency REAL, + attributes TEXT, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Known-Good Registry - Whitelist of expected devices + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_known_devices ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + identifier TEXT NOT NULL UNIQUE, + protocol TEXT NOT NULL, + name TEXT, + description TEXT, + location TEXT, + scope TEXT DEFAULT 'global', + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + added_by TEXT, + last_verified TIMESTAMP, + score_modifier INTEGER DEFAULT -2, + metadata TEXT + ) + ''') + + # TSCM Cases - Grouping sweeps, threats, and notes + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_cases ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + description TEXT, + location TEXT, + status TEXT DEFAULT 'open', + priority TEXT DEFAULT 'normal', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + closed_at TIMESTAMP, + created_by TEXT, + assigned_to TEXT, + notes TEXT, + metadata TEXT + ) + ''') + + # TSCM Case Sweeps - Link sweeps to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_case_sweeps ( + case_id INTEGER, + sweep_id INTEGER, + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (case_id, sweep_id), + FOREIGN KEY (case_id) REFERENCES tscm_cases(id), + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Case Threats - Link threats to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_case_threats ( + case_id INTEGER, + threat_id INTEGER, + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (case_id, threat_id), + FOREIGN KEY (case_id) REFERENCES tscm_cases(id), + FOREIGN KEY (threat_id) REFERENCES tscm_threats(id) + ) + ''') + + # TSCM Case Notes - Notes attached to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_case_notes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + case_id INTEGER, + content TEXT NOT NULL, + note_type TEXT DEFAULT 'general', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + created_by TEXT, + FOREIGN KEY (case_id) REFERENCES tscm_cases(id) + ) + ''') + + # TSCM Meeting Windows - Track sensitive periods + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_meeting_windows ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sweep_id INTEGER, + name TEXT, + start_time TIMESTAMP NOT NULL, + end_time TIMESTAMP, + location TEXT, + notes TEXT, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Sweep Capabilities - Store sweep capability snapshot + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_sweep_capabilities ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sweep_id INTEGER UNIQUE, + capabilities TEXT NOT NULL, + limitations TEXT, + recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + # TSCM indexes for performance conn.execute(''' CREATE INDEX IF NOT EXISTS idx_tscm_threats_sweep @@ -194,6 +311,21 @@ def init_db() -> None: ON tscm_sweeps(baseline_id) ''') + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_timelines_device + ON tscm_device_timelines(device_identifier, timestamp) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_known_devices_identifier + ON tscm_known_devices(identifier) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_cases_status + ON tscm_cases(status, created_at) + ''') + logger.info("Database initialized successfully") @@ -793,3 +925,507 @@ def get_tscm_threat_summary() -> dict: summary['total'] += row['count'] return summary + + +# ============================================================================= +# TSCM Device Timeline Functions +# ============================================================================= + +def add_device_timeline_entry( + device_identifier: str, + protocol: str, + sweep_id: int | None = None, + rssi: int | None = None, + presence: bool = True, + channel: int | None = None, + frequency: float | None = None, + attributes: dict | None = None +) -> int: + """Add a device timeline observation entry.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_device_timelines + (device_identifier, protocol, sweep_id, rssi, presence, channel, frequency, attributes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + device_identifier, protocol, sweep_id, rssi, presence, + channel, frequency, json.dumps(attributes) if attributes else None + )) + return cursor.lastrowid + + +def get_device_timeline( + device_identifier: str, + limit: int = 100, + since_hours: int = 24 +) -> list[dict]: + """Get timeline entries for a device.""" + with get_db() as conn: + cursor = conn.execute(''' + SELECT * FROM tscm_device_timelines + WHERE device_identifier = ? + AND timestamp > datetime('now', ?) + ORDER BY timestamp DESC + LIMIT ? + ''', (device_identifier, f'-{since_hours} hours', limit)) + + results = [] + for row in cursor: + results.append({ + 'id': row['id'], + 'device_identifier': row['device_identifier'], + 'protocol': row['protocol'], + 'sweep_id': row['sweep_id'], + 'timestamp': row['timestamp'], + 'rssi': row['rssi'], + 'presence': bool(row['presence']), + 'channel': row['channel'], + 'frequency': row['frequency'], + 'attributes': json.loads(row['attributes']) if row['attributes'] else None + }) + return list(reversed(results)) + + +def cleanup_old_timeline_entries(max_age_hours: int = 72) -> int: + """Remove old timeline entries.""" + with get_db() as conn: + cursor = conn.execute(''' + DELETE FROM tscm_device_timelines + WHERE timestamp < datetime('now', ?) + ''', (f'-{max_age_hours} hours',)) + return cursor.rowcount + + +# ============================================================================= +# TSCM Known-Good Registry Functions +# ============================================================================= + +def add_known_device( + identifier: str, + protocol: str, + name: str | None = None, + description: str | None = None, + location: str | None = None, + scope: str = 'global', + added_by: str | None = None, + score_modifier: int = -2, + metadata: dict | None = None +) -> int: + """Add a device to the known-good registry.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_known_devices + (identifier, protocol, name, description, location, scope, added_by, score_modifier, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(identifier) DO UPDATE SET + name = excluded.name, + description = excluded.description, + location = excluded.location, + scope = excluded.scope, + score_modifier = excluded.score_modifier, + metadata = excluded.metadata, + last_verified = CURRENT_TIMESTAMP + ''', ( + identifier.upper(), protocol, name, description, location, + scope, added_by, score_modifier, json.dumps(metadata) if metadata else None + )) + return cursor.lastrowid + + +def get_known_device(identifier: str) -> dict | None: + """Get a known device by identifier.""" + with get_db() as conn: + cursor = conn.execute( + 'SELECT * FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'id': row['id'], + 'identifier': row['identifier'], + 'protocol': row['protocol'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'scope': row['scope'], + 'added_at': row['added_at'], + 'added_by': row['added_by'], + 'last_verified': row['last_verified'], + 'score_modifier': row['score_modifier'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None + } + + +def get_all_known_devices( + location: str | None = None, + scope: str | None = None +) -> list[dict]: + """Get all known devices, optionally filtered by location or scope.""" + conditions = [] + params = [] + + if location: + conditions.append('(location = ? OR scope = ?)') + params.extend([location, 'global']) + if scope: + conditions.append('scope = ?') + params.append(scope) + + where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else '' + + with get_db() as conn: + cursor = conn.execute(f''' + SELECT * FROM tscm_known_devices + {where_clause} + ORDER BY added_at DESC + ''', params) + + return [ + { + 'id': row['id'], + 'identifier': row['identifier'], + 'protocol': row['protocol'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'scope': row['scope'], + 'added_at': row['added_at'], + 'added_by': row['added_by'], + 'last_verified': row['last_verified'], + 'score_modifier': row['score_modifier'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None + } + for row in cursor + ] + + +def delete_known_device(identifier: str) -> bool: + """Remove a device from the known-good registry.""" + with get_db() as conn: + cursor = conn.execute( + 'DELETE FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + return cursor.rowcount > 0 + + +def is_known_good_device(identifier: str, location: str | None = None) -> dict | None: + """Check if a device is in the known-good registry for a location.""" + with get_db() as conn: + if location: + cursor = conn.execute(''' + SELECT * FROM tscm_known_devices + WHERE identifier = ? AND (location = ? OR scope = 'global') + ''', (identifier.upper(), location)) + else: + cursor = conn.execute( + 'SELECT * FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'identifier': row['identifier'], + 'name': row['name'], + 'score_modifier': row['score_modifier'], + 'scope': row['scope'] + } + + +# ============================================================================= +# TSCM Case Functions +# ============================================================================= + +def create_tscm_case( + name: str, + description: str | None = None, + location: str | None = None, + priority: str = 'normal', + created_by: str | None = None, + metadata: dict | None = None +) -> int: + """Create a new TSCM case.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_cases + (name, description, location, priority, created_by, metadata) + VALUES (?, ?, ?, ?, ?, ?) + ''', (name, description, location, priority, created_by, + json.dumps(metadata) if metadata else None)) + return cursor.lastrowid + + +def get_tscm_case(case_id: int) -> dict | None: + """Get a TSCM case by ID.""" + with get_db() as conn: + cursor = conn.execute('SELECT * FROM tscm_cases WHERE id = ?', (case_id,)) + row = cursor.fetchone() + if not row: + return None + + case = { + 'id': row['id'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'status': row['status'], + 'priority': row['priority'], + 'created_at': row['created_at'], + 'updated_at': row['updated_at'], + 'closed_at': row['closed_at'], + 'created_by': row['created_by'], + 'assigned_to': row['assigned_to'], + 'notes': row['notes'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None, + 'sweeps': [], + 'threats': [], + 'case_notes': [] + } + + # Get linked sweeps + cursor = conn.execute(''' + SELECT s.* FROM tscm_sweeps s + JOIN tscm_case_sweeps cs ON s.id = cs.sweep_id + WHERE cs.case_id = ? + ORDER BY s.started_at DESC + ''', (case_id,)) + case['sweeps'] = [dict(row) for row in cursor] + + # Get linked threats + cursor = conn.execute(''' + SELECT t.* FROM tscm_threats t + JOIN tscm_case_threats ct ON t.id = ct.threat_id + WHERE ct.case_id = ? + ORDER BY t.detected_at DESC + ''', (case_id,)) + case['threats'] = [dict(row) for row in cursor] + + # Get case notes + cursor = conn.execute(''' + SELECT * FROM tscm_case_notes + WHERE case_id = ? + ORDER BY created_at DESC + ''', (case_id,)) + case['case_notes'] = [dict(row) for row in cursor] + + return case + + +def get_all_tscm_cases( + status: str | None = None, + limit: int = 50 +) -> list[dict]: + """Get all TSCM cases.""" + conditions = [] + params = [] + + if status: + conditions.append('status = ?') + params.append(status) + + where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else '' + params.append(limit) + + with get_db() as conn: + cursor = conn.execute(f''' + SELECT * FROM tscm_cases + {where_clause} + ORDER BY updated_at DESC + LIMIT ? + ''', params) + return [dict(row) for row in cursor] + + +def update_tscm_case( + case_id: int, + status: str | None = None, + priority: str | None = None, + assigned_to: str | None = None, + notes: str | None = None +) -> bool: + """Update a TSCM case.""" + updates = ['updated_at = CURRENT_TIMESTAMP'] + params = [] + + if status: + updates.append('status = ?') + params.append(status) + if status == 'closed': + updates.append('closed_at = CURRENT_TIMESTAMP') + if priority: + updates.append('priority = ?') + params.append(priority) + if assigned_to is not None: + updates.append('assigned_to = ?') + params.append(assigned_to) + if notes is not None: + updates.append('notes = ?') + params.append(notes) + + params.append(case_id) + + with get_db() as conn: + cursor = conn.execute( + f'UPDATE tscm_cases SET {", ".join(updates)} WHERE id = ?', + params + ) + return cursor.rowcount > 0 + + +def add_sweep_to_case(case_id: int, sweep_id: int) -> bool: + """Link a sweep to a case.""" + with get_db() as conn: + try: + conn.execute(''' + INSERT INTO tscm_case_sweeps (case_id, sweep_id) + VALUES (?, ?) + ''', (case_id, sweep_id)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return True + except sqlite3.IntegrityError: + return False + + +def add_threat_to_case(case_id: int, threat_id: int) -> bool: + """Link a threat to a case.""" + with get_db() as conn: + try: + conn.execute(''' + INSERT INTO tscm_case_threats (case_id, threat_id) + VALUES (?, ?) + ''', (case_id, threat_id)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return True + except sqlite3.IntegrityError: + return False + + +def add_case_note( + case_id: int, + content: str, + note_type: str = 'general', + created_by: str | None = None +) -> int: + """Add a note to a case.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_case_notes (case_id, content, note_type, created_by) + VALUES (?, ?, ?, ?) + ''', (case_id, content, note_type, created_by)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return cursor.lastrowid + + +# ============================================================================= +# TSCM Meeting Window Functions +# ============================================================================= + +def start_meeting_window( + sweep_id: int | None = None, + name: str | None = None, + location: str | None = None, + notes: str | None = None +) -> int: + """Start a meeting window.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_meeting_windows (sweep_id, name, start_time, location, notes) + VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?) + ''', (sweep_id, name, location, notes)) + return cursor.lastrowid + + +def end_meeting_window(meeting_id: int) -> bool: + """End a meeting window.""" + with get_db() as conn: + cursor = conn.execute(''' + UPDATE tscm_meeting_windows + SET end_time = CURRENT_TIMESTAMP + WHERE id = ? AND end_time IS NULL + ''', (meeting_id,)) + return cursor.rowcount > 0 + + +def get_active_meeting_window(sweep_id: int | None = None) -> dict | None: + """Get currently active meeting window.""" + with get_db() as conn: + if sweep_id: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE sweep_id = ? AND end_time IS NULL + ORDER BY start_time DESC LIMIT 1 + ''', (sweep_id,)) + else: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE end_time IS NULL + ORDER BY start_time DESC LIMIT 1 + ''') + row = cursor.fetchone() + if row: + return dict(row) + return None + + +def get_meeting_windows(sweep_id: int) -> list[dict]: + """Get all meeting windows for a sweep.""" + with get_db() as conn: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE sweep_id = ? + ORDER BY start_time + ''', (sweep_id,)) + return [dict(row) for row in cursor] + + +# ============================================================================= +# TSCM Sweep Capabilities Functions +# ============================================================================= + +def save_sweep_capabilities( + sweep_id: int, + capabilities: dict, + limitations: list[str] | None = None +) -> int: + """Save sweep capabilities snapshot.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_sweep_capabilities (sweep_id, capabilities, limitations) + VALUES (?, ?, ?) + ON CONFLICT(sweep_id) DO UPDATE SET + capabilities = excluded.capabilities, + limitations = excluded.limitations, + recorded_at = CURRENT_TIMESTAMP + ''', (sweep_id, json.dumps(capabilities), + json.dumps(limitations) if limitations else None)) + return cursor.lastrowid + + +def get_sweep_capabilities(sweep_id: int) -> dict | None: + """Get capabilities for a sweep.""" + with get_db() as conn: + cursor = conn.execute( + 'SELECT * FROM tscm_sweep_capabilities WHERE sweep_id = ?', + (sweep_id,) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'sweep_id': row['sweep_id'], + 'capabilities': json.loads(row['capabilities']), + 'limitations': json.loads(row['limitations']) if row['limitations'] else [], + 'recorded_at': row['recorded_at'] + } diff --git a/utils/tscm/advanced.py b/utils/tscm/advanced.py new file mode 100644 index 0000000..3c7954e --- /dev/null +++ b/utils/tscm/advanced.py @@ -0,0 +1,2152 @@ +""" +TSCM Advanced Features Module + +Implements: +1. Capability & Coverage Reality Panel +2. Baseline Diff & Baseline Health +3. Per-Device Timelines +4. Meeting-Window Summary Enhancements +5. WiFi Advanced Indicators (Evil Twin, Probes, Deauth) +6. Bluetooth Risk Explainability & Proximity Heuristics +7. Operator Playbooks + +DISCLAIMER: This system performs wireless and RF surveillance screening. +Findings indicate anomalies and indicators, not confirmed surveillance devices. +All claims are probabilistic pattern matches requiring professional verification. +""" + +from __future__ import annotations + +import logging +import os +import platform +import subprocess +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import Enum +from typing import Any, Optional + +logger = logging.getLogger('intercept.tscm.advanced') + + +# ============================================================================= +# 1. Capability & Coverage Reality Panel +# ============================================================================= + +class WifiMode(Enum): + """WiFi adapter operating modes.""" + MONITOR = 'monitor' + MANAGED = 'managed' + UNAVAILABLE = 'unavailable' + + +class BluetoothMode(Enum): + """Bluetooth adapter capabilities.""" + BLE_CLASSIC = 'ble_classic' + BLE_ONLY = 'ble_only' + LIMITED = 'limited' + UNAVAILABLE = 'unavailable' + + +@dataclass +class RFCapability: + """RF/SDR device capabilities.""" + device_type: str = 'none' + driver: str = '' + min_frequency_mhz: float = 0.0 + max_frequency_mhz: float = 0.0 + sample_rate_max: int = 0 + available: bool = False + limitations: list[str] = field(default_factory=list) + + +@dataclass +class SweepCapabilities: + """ + Complete capabilities snapshot for a TSCM sweep. + + Exposes what the current sweep CAN and CANNOT detect based on + OS, privileges, adapters, and SDR hardware limits. + """ + # System info + os_name: str = '' + os_version: str = '' + is_root: bool = False + + # WiFi capabilities + wifi_mode: WifiMode = WifiMode.UNAVAILABLE + wifi_interface: str = '' + wifi_driver: str = '' + wifi_monitor_capable: bool = False + wifi_limitations: list[str] = field(default_factory=list) + + # Bluetooth capabilities + bt_mode: BluetoothMode = BluetoothMode.UNAVAILABLE + bt_adapter: str = '' + bt_version: str = '' + bt_limitations: list[str] = field(default_factory=list) + + # RF/SDR capabilities + rf_capability: RFCapability = field(default_factory=RFCapability) + + # Overall limitations + all_limitations: list[str] = field(default_factory=list) + + # Timestamp + captured_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'system': { + 'os': self.os_name, + 'os_version': self.os_version, + 'is_root': self.is_root, + }, + 'wifi': { + 'mode': self.wifi_mode.value, + 'interface': self.wifi_interface, + 'driver': self.wifi_driver, + 'monitor_capable': self.wifi_monitor_capable, + 'limitations': self.wifi_limitations, + }, + 'bluetooth': { + 'mode': self.bt_mode.value, + 'adapter': self.bt_adapter, + 'version': self.bt_version, + 'limitations': self.bt_limitations, + }, + 'rf': { + 'device_type': self.rf_capability.device_type, + 'driver': self.rf_capability.driver, + 'frequency_range_mhz': { + 'min': self.rf_capability.min_frequency_mhz, + 'max': self.rf_capability.max_frequency_mhz, + }, + 'sample_rate_max': self.rf_capability.sample_rate_max, + 'available': self.rf_capability.available, + 'limitations': self.rf_capability.limitations, + }, + 'all_limitations': self.all_limitations, + 'captured_at': self.captured_at.isoformat(), + 'disclaimer': ( + "Capabilities are detected at sweep start time and may change. " + "Limitations listed affect what this sweep can reliably detect." + ), + } + + +def detect_sweep_capabilities( + wifi_interface: str = '', + bt_adapter: str = '', + sdr_device: Any = None +) -> SweepCapabilities: + """ + Detect current system capabilities for TSCM sweeping. + + Args: + wifi_interface: Specific WiFi interface to check + bt_adapter: Specific BT adapter to check + sdr_device: SDR device object if available + + Returns: + SweepCapabilities object with complete capability assessment + """ + caps = SweepCapabilities() + + # System info + caps.os_name = platform.system() + caps.os_version = platform.release() + caps.is_root = os.geteuid() == 0 if hasattr(os, 'geteuid') else False + + # Detect WiFi capabilities + _detect_wifi_capabilities(caps, wifi_interface) + + # Detect Bluetooth capabilities + _detect_bluetooth_capabilities(caps, bt_adapter) + + # Detect RF/SDR capabilities + _detect_rf_capabilities(caps, sdr_device) + + # Compile all limitations + caps.all_limitations = ( + caps.wifi_limitations + + caps.bt_limitations + + caps.rf_capability.limitations + ) + + # Add privilege-based limitations + if not caps.is_root: + caps.all_limitations.append( + "Running without root privileges - some features may be limited" + ) + + return caps + + +def _detect_wifi_capabilities(caps: SweepCapabilities, interface: str) -> None: + """Detect WiFi adapter capabilities.""" + caps.wifi_interface = interface + + if platform.system() == 'Darwin': + # macOS: Check airport utility + airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport' + if os.path.exists(airport_path): + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_driver = 'apple80211' + caps.wifi_monitor_capable = False + caps.wifi_limitations = [ + "Passive WiFi frame analysis is not available in this sweep.", + "macOS WiFi operates in managed mode only.", + "Cannot capture probe requests, deauthentication frames, or raw 802.11 headers.", + "Evil twin detection limited to SSID/BSSID comparison only.", + ] + else: + caps.wifi_mode = WifiMode.UNAVAILABLE + caps.wifi_limitations = ["WiFi scanning unavailable - no interface found"] + + else: + # Linux: Check for monitor mode capability + try: + # Check if interface supports monitor mode + result = subprocess.run( + ['iw', 'list'], + capture_output=True, + text=True, + timeout=5 + ) + + if 'monitor' in result.stdout.lower(): + # Check current mode + if interface: + mode_result = subprocess.run( + ['iw', 'dev', interface, 'info'], + capture_output=True, + text=True, + timeout=5 + ) + if 'type monitor' in mode_result.stdout.lower(): + caps.wifi_mode = WifiMode.MONITOR + caps.wifi_monitor_capable = True + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = True + caps.wifi_limitations.append( + "WiFi interface in managed mode. " + "Probe requests and deauth detection require monitor mode." + ) + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = True + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = False + caps.wifi_limitations = [ + "Passive WiFi frame analysis is not available in this sweep.", + "WiFi adapter does not support monitor mode.", + "Probe request and deauthentication detection unavailable.", + ] + + # Get driver info + if interface: + try: + driver_path = f'/sys/class/net/{interface}/device/driver' + if os.path.exists(driver_path): + caps.wifi_driver = os.path.basename(os.readlink(driver_path)) + except Exception: + pass + + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.wifi_mode = WifiMode.UNAVAILABLE + caps.wifi_limitations = ["WiFi scanning tools not available"] + + +def _detect_bluetooth_capabilities(caps: SweepCapabilities, adapter: str) -> None: + """Detect Bluetooth adapter capabilities.""" + caps.bt_adapter = adapter + + if platform.system() == 'Darwin': + # macOS: Use system_profiler + try: + result = subprocess.run( + ['system_profiler', 'SPBluetoothDataType', '-json'], + capture_output=True, + text=True, + timeout=10 + ) + if 'Bluetooth' in result.stdout: + caps.bt_mode = BluetoothMode.BLE_CLASSIC + caps.bt_version = 'macOS CoreBluetooth' + caps.bt_limitations = [ + "BLE scanning limited to advertising devices only.", + "Classic Bluetooth discovery may be incomplete.", + "Manufacturer data parsing depends on device advertising.", + ] + else: + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth not available"] + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth detection failed"] + + else: + # Linux: Check bluetoothctl/hciconfig + try: + result = subprocess.run( + ['hciconfig', '-a'], + capture_output=True, + text=True, + timeout=5 + ) + + if 'hci' in result.stdout.lower(): + # Check for BLE support + if 'le' in result.stdout.lower(): + caps.bt_mode = BluetoothMode.BLE_CLASSIC + caps.bt_limitations = [ + "BLE scanning range depends on adapter sensitivity.", + "Some devices may not be detected if not advertising.", + ] + else: + caps.bt_mode = BluetoothMode.LIMITED + caps.bt_limitations = [ + "Adapter may not support BLE scanning.", + "Limited to classic Bluetooth discovery.", + ] + + # Extract version + for line in result.stdout.split('\n'): + if 'hci version' in line.lower(): + caps.bt_version = line.strip() + break + else: + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["No Bluetooth adapter found"] + + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth tools not available"] + + +def _detect_rf_capabilities(caps: SweepCapabilities, sdr_device: Any) -> None: + """Detect RF/SDR device capabilities.""" + rf_cap = RFCapability() + + try: + from utils.sdr import SDRFactory + devices = SDRFactory.detect_devices() + + if devices: + device = devices[0] # Use first device + rf_cap.available = True + rf_cap.device_type = device.get('type', 'unknown') + rf_cap.driver = device.get('driver', '') + + # Set frequency ranges based on device type + if 'rtl' in rf_cap.device_type.lower(): + rf_cap.min_frequency_mhz = 24.0 + rf_cap.max_frequency_mhz = 1766.0 + rf_cap.sample_rate_max = 3200000 + rf_cap.limitations = [ + "RTL-SDR frequency range: 24-1766 MHz typical.", + "Cannot reliably cover frequencies below 24 MHz.", + "Cannot cover microwave bands (>1.8 GHz) without upconverter.", + "Signal detection limited by SDR noise floor and dynamic range.", + ] + elif 'hackrf' in rf_cap.device_type.lower(): + rf_cap.min_frequency_mhz = 1.0 + rf_cap.max_frequency_mhz = 6000.0 + rf_cap.sample_rate_max = 20000000 + rf_cap.limitations = [ + "HackRF frequency range: 1 MHz - 6 GHz.", + "8-bit ADC limits dynamic range for weak signal detection.", + ] + else: + rf_cap.limitations = [ + f"Unknown SDR type: {rf_cap.device_type}", + "Frequency coverage and capabilities uncertain.", + ] + else: + rf_cap.available = False + rf_cap.device_type = 'none' + rf_cap.limitations = [ + "No SDR device detected.", + "RF spectrum analysis is not available in this sweep.", + "Cannot scan for wireless microphones, bugs, or RF transmitters.", + ] + + except ImportError: + rf_cap.available = False + rf_cap.limitations = [ + "SDR support not installed.", + "RF spectrum analysis unavailable.", + ] + except Exception as e: + rf_cap.available = False + rf_cap.limitations = [f"SDR detection failed: {str(e)}"] + + caps.rf_capability = rf_cap + + +# ============================================================================= +# 2. Baseline Diff & Baseline Health +# ============================================================================= + +class BaselineHealth(Enum): + """Baseline health status.""" + HEALTHY = 'healthy' + NOISY = 'noisy' + STALE = 'stale' + + +@dataclass +class DeviceChange: + """Represents a change detected compared to baseline.""" + identifier: str + protocol: str + change_type: str # 'new', 'missing', 'rssi_drift', 'channel_change', 'security_change' + description: str + expected: bool = False # True if this is an expected/normal change + details: dict = field(default_factory=dict) + + +@dataclass +class BaselineDiff: + """ + Complete diff between a baseline and a sweep. + + Shows what changed, whether baseline is reliable, + and separates expected vs unexpected changes. + """ + baseline_id: int + sweep_id: int + + # Health assessment + health: BaselineHealth = BaselineHealth.HEALTHY + health_score: float = 1.0 # 0-1, higher is healthier + health_reasons: list[str] = field(default_factory=list) + + # Age metrics + baseline_age_hours: float = 0.0 + is_stale: bool = False + + # Device changes + new_devices: list[DeviceChange] = field(default_factory=list) + missing_devices: list[DeviceChange] = field(default_factory=list) + changed_devices: list[DeviceChange] = field(default_factory=list) + + # Summary counts + total_new: int = 0 + total_missing: int = 0 + total_changed: int = 0 + + # Expected vs unexpected + expected_changes: list[DeviceChange] = field(default_factory=list) + unexpected_changes: list[DeviceChange] = field(default_factory=list) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'baseline_id': self.baseline_id, + 'sweep_id': self.sweep_id, + 'health': { + 'status': self.health.value, + 'score': round(self.health_score, 2), + 'reasons': self.health_reasons, + }, + 'age': { + 'hours': round(self.baseline_age_hours, 1), + 'is_stale': self.is_stale, + }, + 'summary': { + 'new_devices': self.total_new, + 'missing_devices': self.total_missing, + 'changed_devices': self.total_changed, + 'expected_changes': len(self.expected_changes), + 'unexpected_changes': len(self.unexpected_changes), + }, + 'new_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'description': d.description, 'details': d.details} + for d in self.new_devices + ], + 'missing_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'description': d.description, 'details': d.details} + for d in self.missing_devices + ], + 'changed_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'change_type': d.change_type, 'description': d.description, + 'expected': d.expected, 'details': d.details} + for d in self.changed_devices + ], + 'disclaimer': ( + "Baseline comparison shows differences, not confirmed threats. " + "New devices may be legitimate. Missing devices may have been powered off." + ), + } + + +def calculate_baseline_diff( + baseline: dict, + current_wifi: list[dict], + current_bt: list[dict], + current_rf: list[dict], + sweep_id: int +) -> BaselineDiff: + """ + Calculate comprehensive diff between baseline and current scan. + + Args: + baseline: Baseline dict from database + current_wifi: Current WiFi devices + current_bt: Current Bluetooth devices + current_rf: Current RF signals + sweep_id: Current sweep ID + + Returns: + BaselineDiff with complete comparison results + """ + diff = BaselineDiff( + baseline_id=baseline.get('id', 0), + sweep_id=sweep_id + ) + + # Calculate baseline age + created_at = baseline.get('created_at') + if created_at: + if isinstance(created_at, str): + try: + created = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + diff.baseline_age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600 + except ValueError: + diff.baseline_age_hours = 0 + elif isinstance(created_at, datetime): + diff.baseline_age_hours = (datetime.now() - created_at).total_seconds() / 3600 + + # Check if baseline is stale (>72 hours old) + diff.is_stale = diff.baseline_age_hours > 72 + + # Build baseline lookup dicts + baseline_wifi = { + d.get('bssid', d.get('mac', '')).upper(): d + for d in baseline.get('wifi_networks', []) + if d.get('bssid') or d.get('mac') + } + baseline_bt = { + d.get('mac', d.get('address', '')).upper(): d + for d in baseline.get('bt_devices', []) + if d.get('mac') or d.get('address') + } + baseline_rf = { + round(d.get('frequency', 0), 1): d + for d in baseline.get('rf_frequencies', []) + if d.get('frequency') + } + + # Compare WiFi + _compare_wifi(diff, baseline_wifi, current_wifi) + + # Compare Bluetooth + _compare_bluetooth(diff, baseline_bt, current_bt) + + # Compare RF + _compare_rf(diff, baseline_rf, current_rf) + + # Calculate totals + diff.total_new = len(diff.new_devices) + diff.total_missing = len(diff.missing_devices) + diff.total_changed = len(diff.changed_devices) + + # Separate expected vs unexpected changes + for change in diff.new_devices + diff.missing_devices + diff.changed_devices: + if change.expected: + diff.expected_changes.append(change) + else: + diff.unexpected_changes.append(change) + + # Calculate health + _calculate_baseline_health(diff, baseline) + + return diff + + +def _compare_wifi(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare WiFi devices between baseline and current.""" + current_macs = { + d.get('bssid', d.get('mac', '')).upper(): d + for d in current + if d.get('bssid') or d.get('mac') + } + + # Find new devices + for mac, device in current_macs.items(): + if mac not in baseline: + ssid = device.get('essid', device.get('ssid', 'Hidden')) + diff.new_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type='new', + description=f'New WiFi AP: {ssid}', + expected=False, + details={ + 'ssid': ssid, + 'channel': device.get('channel'), + 'rssi': device.get('power', device.get('signal')), + } + )) + else: + # Check for changes + baseline_dev = baseline[mac] + changes = [] + + # RSSI drift + curr_rssi = device.get('power', device.get('signal')) + base_rssi = baseline_dev.get('power', baseline_dev.get('signal')) + if curr_rssi and base_rssi: + rssi_diff = abs(int(curr_rssi) - int(base_rssi)) + if rssi_diff > 15: + changes.append(('rssi_drift', f'RSSI changed by {rssi_diff} dBm')) + + # Channel change + curr_chan = device.get('channel') + base_chan = baseline_dev.get('channel') + if curr_chan and base_chan and curr_chan != base_chan: + changes.append(('channel_change', f'Channel changed from {base_chan} to {curr_chan}')) + + # Security change + curr_sec = device.get('encryption', device.get('privacy', '')) + base_sec = baseline_dev.get('encryption', baseline_dev.get('privacy', '')) + if curr_sec and base_sec and curr_sec != base_sec: + changes.append(('security_change', f'Security changed from {base_sec} to {curr_sec}')) + + for change_type, desc in changes: + diff.changed_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type=change_type, + description=desc, + expected=change_type == 'rssi_drift', # RSSI drift is often expected + details={ + 'ssid': device.get('essid', device.get('ssid')), + 'baseline': baseline_dev, + 'current': device, + } + )) + + # Find missing devices + for mac, device in baseline.items(): + if mac not in current_macs: + ssid = device.get('essid', device.get('ssid', 'Hidden')) + diff.missing_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type='missing', + description=f'Missing WiFi AP: {ssid}', + expected=False, # Could be powered off + details={ + 'ssid': ssid, + 'last_channel': device.get('channel'), + } + )) + + +def _compare_bluetooth(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare Bluetooth devices between baseline and current.""" + current_macs = { + d.get('mac', d.get('address', '')).upper(): d + for d in current + if d.get('mac') or d.get('address') + } + + # Find new devices + for mac, device in current_macs.items(): + if mac not in baseline: + name = device.get('name', 'Unknown') + diff.new_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + change_type='new', + description=f'New BLE device: {name}', + expected=False, + details={ + 'name': name, + 'rssi': device.get('rssi'), + 'manufacturer': device.get('manufacturer'), + } + )) + else: + # Check for changes + baseline_dev = baseline[mac] + + # Name change (device renamed) + curr_name = device.get('name', '') + base_name = baseline_dev.get('name', '') + if curr_name and base_name and curr_name != base_name: + diff.changed_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + change_type='name_change', + description=f'Device renamed: {base_name} -> {curr_name}', + expected=True, + details={'old_name': base_name, 'new_name': curr_name} + )) + + # Find missing devices + for mac, device in baseline.items(): + if mac not in current_macs: + name = device.get('name', 'Unknown') + diff.missing_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + change_type='missing', + description=f'Missing BLE device: {name}', + expected=True, # BLE devices often go to sleep + details={'name': name} + )) + + +def _compare_rf(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare RF signals between baseline and current.""" + current_freqs = { + round(s.get('frequency', 0), 1): s + for s in current + if s.get('frequency') + } + + # Find new signals + for freq, signal in current_freqs.items(): + if freq not in baseline: + diff.new_devices.append(DeviceChange( + identifier=f'{freq:.1f} MHz', + protocol='rf', + change_type='new', + description=f'New RF signal at {freq:.3f} MHz', + expected=False, + details={ + 'frequency': freq, + 'power': signal.get('power', signal.get('level')), + 'modulation': signal.get('modulation'), + } + )) + + # Find missing signals + for freq, signal in baseline.items(): + if freq not in current_freqs: + diff.missing_devices.append(DeviceChange( + identifier=f'{freq:.1f} MHz', + protocol='rf', + change_type='missing', + description=f'Missing RF signal at {freq:.1f} MHz', + expected=True, # RF signals can be intermittent + details={'frequency': freq} + )) + + +def _calculate_baseline_health(diff: BaselineDiff, baseline: dict) -> None: + """Calculate baseline health score and status.""" + score = 1.0 + reasons = [] + + # Age penalty + if diff.baseline_age_hours > 168: # > 1 week + score -= 0.4 + reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old (>1 week)") + elif diff.baseline_age_hours > 72: # > 3 days + score -= 0.2 + reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old (>3 days)") + elif diff.baseline_age_hours > 24: + score -= 0.1 + reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old") + + # Device churn penalty + total_baseline = ( + len(baseline.get('wifi_networks', [])) + + len(baseline.get('bt_devices', [])) + + len(baseline.get('rf_frequencies', [])) + ) + + if total_baseline > 0: + churn_rate = (diff.total_new + diff.total_missing) / total_baseline + if churn_rate > 0.5: + score -= 0.3 + reasons.append(f"High device churn rate: {churn_rate:.0%}") + elif churn_rate > 0.25: + score -= 0.15 + reasons.append(f"Moderate device churn rate: {churn_rate:.0%}") + + # Small baseline penalty + if total_baseline < 3: + score -= 0.2 + reasons.append(f"Baseline has few devices ({total_baseline}) - may be incomplete") + + # Set health status + diff.health_score = max(0, min(1, score)) + + if diff.health_score >= 0.7: + diff.health = BaselineHealth.HEALTHY + elif diff.health_score >= 0.4: + diff.health = BaselineHealth.NOISY + if not reasons: + reasons.append("Baseline showing moderate variability") + else: + diff.health = BaselineHealth.STALE + if not reasons: + reasons.append("Baseline requires refresh") + + diff.health_reasons = reasons + + +# ============================================================================= +# 3. Per-Device Timelines +# ============================================================================= + +@dataclass +class DeviceObservation: + """A single observation of a device.""" + timestamp: datetime + rssi: Optional[int] = None + present: bool = True + channel: Optional[int] = None + frequency: Optional[float] = None + attributes: dict = field(default_factory=dict) + + +@dataclass +class DeviceTimeline: + """ + Complete timeline for a device showing behavior over time. + + Used to assess signal stability, movement patterns, and + meeting window correlation. + """ + identifier: str + protocol: str + name: Optional[str] = None + + # Observation history (time-bucketed) + observations: list[DeviceObservation] = field(default_factory=list) + + # Computed metrics + first_seen: Optional[datetime] = None + last_seen: Optional[datetime] = None + total_observations: int = 0 + presence_ratio: float = 0.0 # % of time device was present + + # Signal metrics + rssi_min: Optional[int] = None + rssi_max: Optional[int] = None + rssi_mean: Optional[float] = None + rssi_stability: float = 0.0 # 0-1, higher = more stable + + # Movement assessment + appears_stationary: bool = True + movement_pattern: str = 'unknown' # 'stationary', 'mobile', 'intermittent' + + # Meeting correlation + meeting_correlated: bool = False + meeting_observations: int = 0 + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'identifier': self.identifier, + 'protocol': self.protocol, + 'name': self.name, + 'observations': [ + { + 'timestamp': obs.timestamp.isoformat(), + 'rssi': obs.rssi, + 'present': obs.present, + 'channel': obs.channel, + 'frequency': obs.frequency, + } + for obs in self.observations[-50:] # Limit to last 50 + ], + 'metrics': { + 'first_seen': self.first_seen.isoformat() if self.first_seen else None, + 'last_seen': self.last_seen.isoformat() if self.last_seen else None, + 'total_observations': self.total_observations, + 'presence_ratio': round(self.presence_ratio, 2), + }, + 'signal': { + 'rssi_min': self.rssi_min, + 'rssi_max': self.rssi_max, + 'rssi_mean': round(self.rssi_mean, 1) if self.rssi_mean else None, + 'stability': round(self.rssi_stability, 2), + }, + 'movement': { + 'appears_stationary': self.appears_stationary, + 'pattern': self.movement_pattern, + }, + 'meeting_correlation': { + 'correlated': self.meeting_correlated, + 'observations_during_meeting': self.meeting_observations, + }, + } + + +class TimelineManager: + """ + Manages per-device timelines with time-bucketing. + + Buckets observations to keep memory bounded while preserving + useful behavioral patterns. + """ + + def __init__(self, bucket_seconds: int = 30, max_observations: int = 200): + """ + Args: + bucket_seconds: Time bucket size in seconds + max_observations: Maximum observations to keep per device + """ + self.bucket_seconds = bucket_seconds + self.max_observations = max_observations + self.timelines: dict[str, DeviceTimeline] = {} + self._meeting_windows: list[tuple[datetime, Optional[datetime]]] = [] + + def add_observation( + self, + identifier: str, + protocol: str, + rssi: Optional[int] = None, + channel: Optional[int] = None, + frequency: Optional[float] = None, + name: Optional[str] = None, + attributes: Optional[dict] = None + ) -> None: + """Add an observation for a device.""" + key = f"{protocol}:{identifier.upper()}" + now = datetime.now() + + if key not in self.timelines: + self.timelines[key] = DeviceTimeline( + identifier=identifier.upper(), + protocol=protocol, + name=name, + first_seen=now, + ) + + timeline = self.timelines[key] + + # Update name if provided + if name: + timeline.name = name + + # Check if we should bucket with previous observation + if timeline.observations: + last_obs = timeline.observations[-1] + time_diff = (now - last_obs.timestamp).total_seconds() + + if time_diff < self.bucket_seconds: + # Update existing bucket + if rssi is not None: + # Average RSSI + if last_obs.rssi is not None: + last_obs.rssi = (last_obs.rssi + rssi) // 2 + else: + last_obs.rssi = rssi + return + + # Add new observation + obs = DeviceObservation( + timestamp=now, + rssi=rssi, + present=True, + channel=channel, + frequency=frequency, + attributes=attributes or {}, + ) + timeline.observations.append(obs) + + # Enforce max observations + if len(timeline.observations) > self.max_observations: + timeline.observations = timeline.observations[-self.max_observations:] + + # Update metrics + timeline.last_seen = now + timeline.total_observations = len(timeline.observations) + + # Check meeting correlation + if self._is_during_meeting(now): + timeline.meeting_observations += 1 + timeline.meeting_correlated = True + + def start_meeting_window(self) -> None: + """Mark the start of a meeting window.""" + self._meeting_windows.append((datetime.now(), None)) + + def end_meeting_window(self) -> None: + """Mark the end of a meeting window.""" + if self._meeting_windows and self._meeting_windows[-1][1] is None: + start = self._meeting_windows[-1][0] + self._meeting_windows[-1] = (start, datetime.now()) + + def _is_during_meeting(self, timestamp: datetime) -> bool: + """Check if timestamp falls within a meeting window.""" + for start, end in self._meeting_windows: + if end is None: + if timestamp >= start: + return True + elif start <= timestamp <= end: + return True + return False + + def compute_metrics(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]: + """Compute all metrics for a device timeline.""" + key = f"{protocol}:{identifier.upper()}" + if key not in self.timelines: + return None + + timeline = self.timelines[key] + + if not timeline.observations: + return timeline + + # RSSI metrics + rssi_values = [obs.rssi for obs in timeline.observations if obs.rssi is not None] + if rssi_values: + timeline.rssi_min = min(rssi_values) + timeline.rssi_max = max(rssi_values) + timeline.rssi_mean = sum(rssi_values) / len(rssi_values) + + # Calculate stability (0-1) + if len(rssi_values) >= 3: + variance = sum((r - timeline.rssi_mean) ** 2 for r in rssi_values) / len(rssi_values) + timeline.rssi_stability = max(0, 1 - (variance / 100)) + + # Movement assessment based on RSSI variance + rssi_range = timeline.rssi_max - timeline.rssi_min + if rssi_range < 10: + timeline.appears_stationary = True + timeline.movement_pattern = 'stationary' + elif rssi_range < 25: + timeline.appears_stationary = False + timeline.movement_pattern = 'mobile' + else: + timeline.appears_stationary = False + timeline.movement_pattern = 'intermittent' + + # Presence ratio + if timeline.first_seen and timeline.last_seen: + total_duration = (timeline.last_seen - timeline.first_seen).total_seconds() + if total_duration > 0: + # Estimate presence based on observation count and bucket size + estimated_present_time = timeline.total_observations * self.bucket_seconds + timeline.presence_ratio = min(1.0, estimated_present_time / total_duration) + + return timeline + + def get_timeline(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]: + """Get computed timeline for a device.""" + return self.compute_metrics(identifier, protocol) + + def get_all_timelines(self) -> list[DeviceTimeline]: + """Get all device timelines with computed metrics.""" + for key in self.timelines: + protocol, identifier = key.split(':', 1) + self.compute_metrics(identifier, protocol) + return list(self.timelines.values()) + + +# ============================================================================= +# 5. Meeting-Window Summary Enhancements +# ============================================================================= + +@dataclass +class MeetingWindowSummary: + """ + Summary of device activity during a meeting window. + + Tracks devices first seen during meeting, behavior changes, + and applies meeting-window scoring modifiers. + """ + meeting_id: int + name: Optional[str] = None + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + duration_minutes: float = 0.0 + + # Devices first seen during meeting (high interest) + devices_first_seen: list[dict] = field(default_factory=list) + + # Devices with behavior change during meeting + devices_behavior_change: list[dict] = field(default_factory=list) + + # All active devices during meeting + active_devices: list[dict] = field(default_factory=list) + + # Summary metrics + total_devices_active: int = 0 + new_devices_count: int = 0 + behavior_changes_count: int = 0 + high_interest_count: int = 0 + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'meeting_id': self.meeting_id, + 'name': self.name, + 'start_time': self.start_time.isoformat() if self.start_time else None, + 'end_time': self.end_time.isoformat() if self.end_time else None, + 'duration_minutes': round(self.duration_minutes, 1), + 'summary': { + 'total_devices_active': self.total_devices_active, + 'new_devices': self.new_devices_count, + 'behavior_changes': self.behavior_changes_count, + 'high_interest': self.high_interest_count, + }, + 'devices_first_seen': self.devices_first_seen, + 'devices_behavior_change': self.devices_behavior_change, + 'disclaimer': ( + "Meeting-correlated activity indicates temporal correlation only, " + "not confirmed surveillance. Devices may have legitimate reasons " + "for appearing during meetings." + ), + } + + +def generate_meeting_summary( + meeting_window: dict, + device_timelines: list[DeviceTimeline], + device_profiles: list[dict] +) -> MeetingWindowSummary: + """ + Generate summary of device activity during a meeting window. + + Args: + meeting_window: Meeting window dict from database + device_timelines: List of device timelines + device_profiles: List of device profiles from correlation engine + + Returns: + MeetingWindowSummary with analysis + """ + summary = MeetingWindowSummary( + meeting_id=meeting_window.get('id', 0), + name=meeting_window.get('name'), + ) + + # Parse times + start_str = meeting_window.get('start_time') + end_str = meeting_window.get('end_time') + + if start_str: + if isinstance(start_str, str): + summary.start_time = datetime.fromisoformat(start_str.replace('Z', '+00:00')).replace(tzinfo=None) + else: + summary.start_time = start_str + + if end_str: + if isinstance(end_str, str): + summary.end_time = datetime.fromisoformat(end_str.replace('Z', '+00:00')).replace(tzinfo=None) + else: + summary.end_time = end_str + + if summary.start_time and summary.end_time: + summary.duration_minutes = (summary.end_time - summary.start_time).total_seconds() / 60 + + if not summary.start_time: + return summary + + # Analyze device timelines + for timeline in device_timelines: + if not timeline.first_seen: + continue + + # Check if device was active during meeting + was_active = False + first_seen_during = False + + for obs in timeline.observations: + if summary.end_time: + if summary.start_time <= obs.timestamp <= summary.end_time: + was_active = True + if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60: + first_seen_during = True + break + else: + # Meeting still ongoing + if obs.timestamp >= summary.start_time: + was_active = True + if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60: + first_seen_during = True + break + + if was_active: + device_info = { + 'identifier': timeline.identifier, + 'protocol': timeline.protocol, + 'name': timeline.name, + 'meeting_correlated': True, + } + summary.active_devices.append(device_info) + + if first_seen_during: + device_info['first_seen_during_meeting'] = True + summary.devices_first_seen.append({ + **device_info, + 'description': 'Device first seen during meeting window', + 'risk_modifier': '+2 (meeting-correlated activity)', + }) + + # Update counts + summary.total_devices_active = len(summary.active_devices) + summary.new_devices_count = len(summary.devices_first_seen) + summary.behavior_changes_count = len(summary.devices_behavior_change) + + # Count high interest from profiles + for profile in device_profiles: + if profile.get('risk_level') == 'high_interest': + indicators = profile.get('indicators', []) + if any(i.get('type') == 'meeting_correlated' for i in indicators): + summary.high_interest_count += 1 + + return summary + + +# ============================================================================= +# 7. WiFi Advanced Indicators (LIMITED SCOPE) +# ============================================================================= + +@dataclass +class WiFiAdvancedIndicator: + """An advanced WiFi indicator detection.""" + indicator_type: str # 'evil_twin', 'probe_request', 'deauth_burst' + severity: str # 'high', 'medium', 'low' + description: str + details: dict = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + requires_monitor_mode: bool = False + + def to_dict(self) -> dict: + return { + 'type': self.indicator_type, + 'severity': self.severity, + 'description': self.description, + 'details': self.details, + 'timestamp': self.timestamp.isoformat(), + 'requires_monitor_mode': self.requires_monitor_mode, + 'disclaimer': ( + "Pattern detected - this is an indicator, not confirmation of an attack. " + "Further investigation required." + ), + } + + +class WiFiAdvancedDetector: + """ + Detects advanced WiFi indicators. + + LIMITED SCOPE - Only implements: + 1. Evil Twin patterns (same SSID, different BSSID/security/abnormal signal) + 2. Probe requests for sensitive SSIDs (requires monitor mode) + 3. Deauthentication bursts (requires monitor mode) + + All findings labeled as "pattern detected", never called attacks. + """ + + def __init__(self, monitor_mode_available: bool = False): + self.monitor_mode = monitor_mode_available + self.known_networks: dict[str, dict] = {} # SSID -> expected BSSID/security + self.probe_requests: list[dict] = [] + self.deauth_frames: list[dict] = [] + self.indicators: list[WiFiAdvancedIndicator] = [] + + def set_known_networks(self, networks: list[dict]) -> None: + """Set known/expected networks from baseline.""" + for net in networks: + ssid = net.get('essid', net.get('ssid', '')) + if ssid: + self.known_networks[ssid] = { + 'bssid': net.get('bssid', net.get('mac', '')).upper(), + 'security': net.get('encryption', net.get('privacy', '')), + 'channel': net.get('channel'), + 'rssi': net.get('power', net.get('signal')), + } + + def analyze_network(self, network: dict) -> list[WiFiAdvancedIndicator]: + """ + Analyze a network for evil twin patterns. + + Detects: Same SSID with different BSSID, security, or abnormal signal. + """ + indicators = [] + ssid = network.get('essid', network.get('ssid', '')) + bssid = network.get('bssid', network.get('mac', '')).upper() + security = network.get('encryption', network.get('privacy', '')) + rssi = network.get('power', network.get('signal')) + + if not ssid or ssid in ['', 'Hidden', '[Hidden]']: + return indicators + + if ssid in self.known_networks: + known = self.known_networks[ssid] + + # Different BSSID for same SSID + if known['bssid'] and known['bssid'] != bssid: + # Check security mismatch + security_mismatch = known['security'] and security and known['security'] != security + + # Check signal anomaly (significantly stronger than expected) + signal_anomaly = False + if rssi and known.get('rssi'): + try: + rssi_diff = int(rssi) - int(known['rssi']) + signal_anomaly = rssi_diff > 20 # Much stronger than expected + except (ValueError, TypeError): + pass + + if security_mismatch: + indicators.append(WiFiAdvancedIndicator( + indicator_type='evil_twin', + severity='high', + description=f'Evil twin pattern detected for SSID "{ssid}"', + details={ + 'ssid': ssid, + 'detected_bssid': bssid, + 'expected_bssid': known['bssid'], + 'detected_security': security, + 'expected_security': known['security'], + 'pattern': 'Different BSSID with security downgrade', + }, + requires_monitor_mode=False, + )) + elif signal_anomaly: + indicators.append(WiFiAdvancedIndicator( + indicator_type='evil_twin', + severity='medium', + description=f'Possible evil twin pattern for SSID "{ssid}"', + details={ + 'ssid': ssid, + 'detected_bssid': bssid, + 'expected_bssid': known['bssid'], + 'signal_difference': f'+{rssi_diff} dBm stronger than expected', + 'pattern': 'Different BSSID with abnormally strong signal', + }, + requires_monitor_mode=False, + )) + else: + indicators.append(WiFiAdvancedIndicator( + indicator_type='evil_twin', + severity='low', + description=f'Duplicate SSID detected: "{ssid}"', + details={ + 'ssid': ssid, + 'detected_bssid': bssid, + 'expected_bssid': known['bssid'], + 'pattern': 'Multiple APs with same SSID (may be legitimate)', + }, + requires_monitor_mode=False, + )) + + self.indicators.extend(indicators) + return indicators + + def add_probe_request(self, frame: dict) -> Optional[WiFiAdvancedIndicator]: + """ + Record a probe request frame (requires monitor mode). + + Detects repeated probing for sensitive SSIDs. + """ + if not self.monitor_mode: + return None + + self.probe_requests.append({ + 'timestamp': datetime.now(), + 'src_mac': frame.get('src_mac', '').upper(), + 'probed_ssid': frame.get('ssid', ''), + }) + + # Keep last 1000 probe requests + if len(self.probe_requests) > 1000: + self.probe_requests = self.probe_requests[-1000:] + + # Check for sensitive SSID probing + ssid = frame.get('ssid', '') + sensitive_patterns = [ + 'corp', 'internal', 'private', 'secure', 'vpn', + 'admin', 'management', 'executive', 'board', + ] + + is_sensitive = any(p in ssid.lower() for p in sensitive_patterns) if ssid else False + + if is_sensitive: + # Count recent probes for this SSID + recent_cutoff = datetime.now() - timedelta(minutes=5) + recent_probes = [ + p for p in self.probe_requests + if p['probed_ssid'] == ssid and p['timestamp'] > recent_cutoff + ] + + if len(recent_probes) >= 3: + indicator = WiFiAdvancedIndicator( + indicator_type='probe_request', + severity='medium', + description=f'Repeated probing for sensitive SSID "{ssid}"', + details={ + 'ssid': ssid, + 'probe_count': len(recent_probes), + 'source_macs': list(set(p['src_mac'] for p in recent_probes)), + 'pattern': 'Multiple probe requests for potentially sensitive network', + }, + requires_monitor_mode=True, + ) + self.indicators.append(indicator) + return indicator + + return None + + def add_deauth_frame(self, frame: dict) -> Optional[WiFiAdvancedIndicator]: + """ + Record a deauthentication frame (requires monitor mode). + + Detects abnormal deauth volume potentially indicating attack. + """ + if not self.monitor_mode: + return None + + self.deauth_frames.append({ + 'timestamp': datetime.now(), + 'src_mac': frame.get('src_mac', '').upper(), + 'dst_mac': frame.get('dst_mac', '').upper(), + 'bssid': frame.get('bssid', '').upper(), + 'reason': frame.get('reason_code'), + }) + + # Keep last 500 deauth frames + if len(self.deauth_frames) > 500: + self.deauth_frames = self.deauth_frames[-500:] + + # Check for deauth burst (>10 deauths in 10 seconds) + recent_cutoff = datetime.now() - timedelta(seconds=10) + recent_deauths = [d for d in self.deauth_frames if d['timestamp'] > recent_cutoff] + + if len(recent_deauths) >= 10: + # Check if targeting specific BSSID + bssid = frame.get('bssid', '').upper() + targeting_bssid = len([d for d in recent_deauths if d['bssid'] == bssid]) >= 5 + + indicator = WiFiAdvancedIndicator( + indicator_type='deauth_burst', + severity='high' if targeting_bssid else 'medium', + description='Deauthentication burst pattern detected', + details={ + 'deauth_count': len(recent_deauths), + 'time_window_seconds': 10, + 'targeted_bssid': bssid if targeting_bssid else None, + 'unique_sources': len(set(d['src_mac'] for d in recent_deauths)), + 'pattern': 'Abnormal deauthentication frame volume', + }, + requires_monitor_mode=True, + ) + self.indicators.append(indicator) + + # Clear recent to avoid repeated alerts + self.deauth_frames = [d for d in self.deauth_frames if d['timestamp'] <= recent_cutoff] + + return indicator + + return None + + def get_all_indicators(self) -> list[dict]: + """Get all detected indicators.""" + return [i.to_dict() for i in self.indicators] + + def get_unavailable_features(self) -> list[str]: + """Get list of features unavailable without monitor mode.""" + if self.monitor_mode: + return [] + return [ + "Probe request analysis: Requires monitor mode to capture probe frames.", + "Deauthentication detection: Requires monitor mode to capture management frames.", + "Raw 802.11 frame analysis: Not available in managed mode.", + ] + + +# ============================================================================= +# 8. Bluetooth Risk Explainability & Proximity Heuristics +# ============================================================================= + +class BLEProximity(Enum): + """RSSI-based proximity estimation.""" + VERY_CLOSE = 'very_close' # Within ~1m + CLOSE = 'close' # Within ~3m + MODERATE = 'moderate' # Within ~10m + FAR = 'far' # Beyond ~10m + UNKNOWN = 'unknown' + + +@dataclass +class BLERiskExplanation: + """ + Explainable risk assessment for a BLE device. + + Provides human-readable explanations, proximity estimates, + and recommended actions. + """ + identifier: str + name: Optional[str] = None + + # Risk assessment + risk_level: str = 'informational' + risk_score: int = 0 + risk_explanation: str = '' + + # Proximity + proximity: BLEProximity = BLEProximity.UNKNOWN + proximity_explanation: str = '' + estimated_distance: str = '' + + # Tracker detection + is_tracker: bool = False + tracker_type: Optional[str] = None + tracker_explanation: str = '' + + # Meeting correlation + meeting_correlated: bool = False + meeting_explanation: str = '' + + # Recommended action + recommended_action: str = '' + action_rationale: str = '' + + # All indicators with explanations + indicators: list[dict] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + 'identifier': self.identifier, + 'name': self.name, + 'risk': { + 'level': self.risk_level, + 'score': self.risk_score, + 'explanation': self.risk_explanation, + }, + 'proximity': { + 'estimate': self.proximity.value, + 'explanation': self.proximity_explanation, + 'estimated_distance': self.estimated_distance, + }, + 'tracker': { + 'is_tracker': self.is_tracker, + 'type': self.tracker_type, + 'explanation': self.tracker_explanation, + }, + 'meeting_correlation': { + 'correlated': self.meeting_correlated, + 'explanation': self.meeting_explanation, + }, + 'recommended_action': { + 'action': self.recommended_action, + 'rationale': self.action_rationale, + }, + 'indicators': self.indicators, + 'disclaimer': ( + "Risk assessment is based on observable indicators and heuristics. " + "Proximity estimates are approximate based on RSSI and may vary with environment. " + "Tracker detection indicates brand presence, not confirmed threat." + ), + } + + +def estimate_ble_proximity(rssi: int) -> tuple[BLEProximity, str, str]: + """ + Estimate BLE device proximity from RSSI. + + Note: RSSI-based distance is highly variable due to: + - TX power differences between devices + - Environmental factors (walls, interference) + - Antenna characteristics + + Returns: + Tuple of (proximity enum, explanation, estimated distance string) + """ + if rssi is None: + return ( + BLEProximity.UNKNOWN, + "RSSI not available - cannot estimate proximity", + "Unknown" + ) + + # These thresholds are heuristic approximations + if rssi >= -50: + return ( + BLEProximity.VERY_CLOSE, + f"Very strong signal ({rssi} dBm) suggests device is very close", + "< 1 meter (approximate)" + ) + elif rssi >= -65: + return ( + BLEProximity.CLOSE, + f"Strong signal ({rssi} dBm) suggests device is nearby", + "1-3 meters (approximate)" + ) + elif rssi >= -80: + return ( + BLEProximity.MODERATE, + f"Moderate signal ({rssi} dBm) suggests device is in the area", + "3-10 meters (approximate)" + ) + else: + return ( + BLEProximity.FAR, + f"Weak signal ({rssi} dBm) suggests device is distant", + "> 10 meters (approximate)" + ) + + +def generate_ble_risk_explanation( + device: dict, + profile: Optional[dict] = None, + is_during_meeting: bool = False +) -> BLERiskExplanation: + """ + Generate human-readable risk explanation for a BLE device. + + Args: + device: BLE device dict with mac, name, rssi, etc. + profile: DeviceProfile dict from correlation engine + is_during_meeting: Whether device was detected during meeting + + Returns: + BLERiskExplanation with complete assessment + """ + mac = device.get('mac', device.get('address', '')).upper() + name = device.get('name', '') + rssi = device.get('rssi', device.get('signal')) + + explanation = BLERiskExplanation( + identifier=mac, + name=name if name else None, + ) + + # Proximity estimation + if rssi: + try: + rssi_int = int(rssi) + prox, prox_exp, dist = estimate_ble_proximity(rssi_int) + explanation.proximity = prox + explanation.proximity_explanation = prox_exp + explanation.estimated_distance = dist + except (ValueError, TypeError): + explanation.proximity = BLEProximity.UNKNOWN + explanation.proximity_explanation = "Could not parse RSSI value" + + # Tracker detection with explanation + tracker_info = device.get('tracker_type') or device.get('is_tracker') + if device.get('is_airtag'): + explanation.is_tracker = True + explanation.tracker_type = 'Apple AirTag' + explanation.tracker_explanation = ( + "Apple AirTag detected via manufacturer data. AirTags are legitimate " + "tracking devices but may indicate unwanted tracking if not recognized. " + "Apple's Find My network will alert iPhone users to unknown AirTags." + ) + elif device.get('is_tile'): + explanation.is_tracker = True + explanation.tracker_type = 'Tile' + explanation.tracker_explanation = ( + "Tile tracker detected. Tile trackers are common consumer devices " + "for finding lost items. Presence does not indicate surveillance." + ) + elif device.get('is_smarttag'): + explanation.is_tracker = True + explanation.tracker_type = 'Samsung SmartTag' + explanation.tracker_explanation = ( + "Samsung SmartTag detected. SmartTags are consumer tracking devices " + "similar to AirTags. Samsung phones can detect unknown SmartTags." + ) + elif device.get('is_espressif'): + explanation.tracker_type = 'ESP32/ESP8266' + explanation.tracker_explanation = ( + "Espressif chipset (ESP32/ESP8266) detected. These are programmable " + "development boards commonly used in IoT projects. They can be configured " + "for various purposes including custom tracking devices." + ) + + # Meeting correlation explanation + if is_during_meeting or device.get('meeting_correlated'): + explanation.meeting_correlated = True + explanation.meeting_explanation = ( + "Device detected during a marked meeting window. This temporal correlation " + "is noted but does not confirm malicious intent - many legitimate devices " + "are active during meetings (phones, laptops, wearables)." + ) + + # Build risk explanation from profile + if profile: + explanation.risk_level = profile.get('risk_level', 'informational') + explanation.risk_score = profile.get('total_score', 0) + + # Convert indicators to explanations + for ind in profile.get('indicators', []): + ind_type = ind.get('type', '') + ind_desc = ind.get('description', '') + + explanation.indicators.append({ + 'type': ind_type, + 'description': ind_desc, + 'explanation': _get_indicator_explanation(ind_type), + }) + + # Build overall risk explanation + if explanation.risk_level == 'high_interest': + explanation.risk_explanation = ( + f"This device has accumulated {explanation.risk_score} risk points " + "across multiple indicators, warranting closer investigation. " + "High interest does not confirm surveillance - manual verification required." + ) + elif explanation.risk_level == 'review': + explanation.risk_explanation = ( + f"This device shows {explanation.risk_score} risk points indicating " + "it should be reviewed but is not immediately concerning." + ) + else: + explanation.risk_explanation = ( + "This device shows typical characteristics and does not raise " + "significant concerns based on observable indicators." + ) + else: + explanation.risk_explanation = "No detailed profile available for risk assessment." + + # Recommended action + _set_recommended_action(explanation) + + return explanation + + +def _get_indicator_explanation(indicator_type: str) -> str: + """Get human-readable explanation for an indicator type.""" + explanations = { + 'unknown_device': ( + "Device manufacturer is unknown or uses a generic chipset. " + "This is common in DIY/hobbyist devices and some surveillance equipment." + ), + 'audio_capable': ( + "Device advertises audio services (headphones, speakers, etc.). " + "Audio-capable devices could theoretically transmit captured audio." + ), + 'persistent': ( + "Device has been detected repeatedly across multiple scans. " + "Persistence suggests a fixed or regularly present device." + ), + 'meeting_correlated': ( + "Device activity correlates with marked meeting windows. " + "This is a temporal pattern that warrants attention." + ), + 'hidden_identity': ( + "Device does not broadcast a name or uses minimal advertising. " + "Some legitimate devices minimize advertising for battery life." + ), + 'stable_rssi': ( + "Signal strength is very stable, suggesting a stationary device. " + "Fixed placement could indicate a planted device." + ), + 'mac_rotation': ( + "Device appears to use MAC address randomization. " + "This is a privacy feature in modern devices, also used to evade detection." + ), + 'known_tracker': ( + "Device matches known tracking device signatures. " + "May be a legitimate item tracker or unwanted surveillance." + ), + 'airtag_detected': ( + "Apple AirTag identified. Check if this belongs to someone present." + ), + 'tile_detected': ( + "Tile tracker identified. Common consumer tracking device." + ), + 'smarttag_detected': ( + "Samsung SmartTag identified. Consumer tracking device." + ), + 'esp32_device': ( + "Espressif development board detected. Highly programmable, " + "could be configured for custom surveillance applications." + ), + } + return explanations.get(indicator_type, "Indicator detected requiring review.") + + +def _set_recommended_action(explanation: BLERiskExplanation) -> None: + """Set recommended action based on risk assessment.""" + if explanation.risk_level == 'high_interest': + if explanation.is_tracker and explanation.proximity == BLEProximity.VERY_CLOSE: + explanation.recommended_action = 'Investigate immediately' + explanation.action_rationale = ( + "Unknown tracker in very close proximity warrants immediate " + "physical search of the area and personal belongings." + ) + elif explanation.is_tracker: + explanation.recommended_action = 'Investigate location' + explanation.action_rationale = ( + "Tracker detected - recommend searching the area to locate " + "the physical device and determine if it belongs to someone present." + ) + else: + explanation.recommended_action = 'Review and document' + explanation.action_rationale = ( + "Multiple risk indicators present. Document the finding, " + "attempt to identify the device, and consider physical search " + "if other indicators suggest surveillance." + ) + elif explanation.risk_level == 'review': + explanation.recommended_action = 'Monitor and document' + explanation.action_rationale = ( + "Device shows some indicators worth noting. Add to monitoring list " + "and compare against future sweeps to identify patterns." + ) + else: + explanation.recommended_action = 'Continue monitoring' + explanation.action_rationale = ( + "No immediate action required. Device will be tracked in subsequent " + "sweeps for pattern analysis." + ) + + +# ============================================================================= +# 9. Operator Playbooks ("What To Do Next") +# ============================================================================= + +@dataclass +class PlaybookStep: + """A single step in an operator playbook.""" + step_number: int + action: str + details: str + safety_note: Optional[str] = None + + +@dataclass +class OperatorPlaybook: + """ + Procedural guidance for TSCM operators based on findings. + + Playbooks are procedural (what to do), not prescriptive (how to decide). + All guidance is legally safe and professional. + """ + playbook_id: str + title: str + risk_level: str + description: str + steps: list[PlaybookStep] = field(default_factory=list) + when_to_escalate: str = '' + documentation_required: list[str] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + 'playbook_id': self.playbook_id, + 'title': self.title, + 'risk_level': self.risk_level, + 'description': self.description, + 'steps': [ + { + 'step': s.step_number, + 'action': s.action, + 'details': s.details, + 'safety_note': s.safety_note, + } + for s in self.steps + ], + 'when_to_escalate': self.when_to_escalate, + 'documentation_required': self.documentation_required, + 'disclaimer': ( + "This playbook provides procedural guidance only. Actions should be " + "adapted to local laws, organizational policies, and professional judgment. " + "Do not disassemble, interfere with, or remove suspected devices without " + "proper authorization and legal guidance." + ), + } + + +# Predefined playbooks by risk level +PLAYBOOKS = { + 'high_interest_tracker': OperatorPlaybook( + playbook_id='PB-001', + title='High Interest: Unknown Tracker Detection', + risk_level='high_interest', + description='Guidance for responding to unknown tracking device detection', + steps=[ + PlaybookStep( + step_number=1, + action='Document the finding', + details='Record device identifier, signal strength, location, and timestamp. Take screenshots of the detection.', + ), + PlaybookStep( + step_number=2, + action='Estimate device location', + details='Use signal strength variations while moving to triangulate approximate device position. Note areas of strongest signal.', + safety_note='Do not touch or disturb any physical device found.', + ), + PlaybookStep( + step_number=3, + action='Physical search (if authorized)', + details='Systematically search the high-signal area. Check common hiding spots: under furniture, in plants, behind fixtures, in bags/belongings.', + safety_note='Only conduct physical searches with proper authorization.', + ), + PlaybookStep( + step_number=4, + action='Identify device owner', + details='If device is located, determine if it belongs to someone legitimately present. Apple/Samsung/Tile devices can be scanned by their respective apps.', + ), + PlaybookStep( + step_number=5, + action='Escalate if unidentified', + details='If device owner cannot be determined and device is in sensitive location, escalate to security management.', + ), + ], + when_to_escalate='Escalate immediately if: device is concealed in sensitive area, owner cannot be identified, or multiple unknown trackers are found.', + documentation_required=[ + 'Device identifier (MAC address)', + 'Signal strength readings at multiple locations', + 'Physical location description', + 'Photos of any located devices', + 'Names of individuals present during search', + ], + ), + + 'high_interest_generic': OperatorPlaybook( + playbook_id='PB-002', + title='High Interest: Suspicious Device Pattern', + risk_level='high_interest', + description='Guidance for devices with multiple high-risk indicators', + steps=[ + PlaybookStep( + step_number=1, + action='Review all indicators', + details='Examine each risk indicator in the device profile. Understand why the device scored high interest.', + ), + PlaybookStep( + step_number=2, + action='Cross-reference with baseline', + details='Check if device appears in baseline. New devices warrant more scrutiny than known devices.', + ), + PlaybookStep( + step_number=3, + action='Monitor for pattern', + details='Continue sweep and note if device persists, moves, or correlates with sensitive activities.', + ), + PlaybookStep( + step_number=4, + action='Attempt identification', + details='Research manufacturer OUI, check for matching devices in the environment, ask occupants about devices.', + ), + PlaybookStep( + step_number=5, + action='Document and report', + details='Add finding to sweep report with full details. Include in meeting/client debrief.', + ), + ], + when_to_escalate='Escalate if: device cannot be identified, shows surveillance-consistent behavior, or correlates strongly with sensitive activities.', + documentation_required=[ + 'Complete device profile', + 'All risk indicators with scores', + 'Timeline of observations', + 'Correlation with meeting windows', + 'Any identification attempts and results', + ], + ), + + 'needs_review': OperatorPlaybook( + playbook_id='PB-003', + title='Needs Review: Unknown Device', + risk_level='needs_review', + description='Guidance for devices requiring investigation but not immediately concerning', + steps=[ + PlaybookStep( + step_number=1, + action='Note the device', + details='Add device to monitoring list. Record basic details: identifier, type, signal strength.', + ), + PlaybookStep( + step_number=2, + action='Check against known devices', + details='Verify device is not a known infrastructure device or personal device of authorized personnel.', + ), + PlaybookStep( + step_number=3, + action='Continue sweep', + details='Complete the sweep. Review device in context of all findings.', + ), + PlaybookStep( + step_number=4, + action='Assess in final review', + details='During sweep wrap-up, decide if device warrants further investigation or can be added to baseline.', + ), + ], + when_to_escalate='Escalate if: multiple "needs review" devices appear together, or device shows high-interest indicators in subsequent sweeps.', + documentation_required=[ + 'Device identifier and type', + 'Brief description of why flagged', + 'Decision made (investigate further / add to baseline / monitor)', + ], + ), + + 'informational': OperatorPlaybook( + playbook_id='PB-004', + title='Informational: Known/Expected Device', + risk_level='informational', + description='Guidance for devices that appear normal and expected', + steps=[ + PlaybookStep( + step_number=1, + action='Verify against baseline', + details='Confirm device matches baseline entry. Note any changes (signal strength, channel, etc.).', + ), + PlaybookStep( + step_number=2, + action='Log observation', + details='Record observation for timeline tracking. Even known devices should be logged.', + ), + PlaybookStep( + step_number=3, + action='Continue sweep', + details='No further action required. Proceed with sweep.', + ), + ], + when_to_escalate='Only escalate if device shows unexpected behavior changes or additional risk indicators.', + documentation_required=[ + 'Device identifier (for timeline)', + 'Observation timestamp', + ], + ), + + 'wifi_evil_twin': OperatorPlaybook( + playbook_id='PB-005', + title='High Interest: Evil Twin Pattern Detected', + risk_level='high_interest', + description='Guidance when duplicate SSID with security mismatch is detected', + steps=[ + PlaybookStep( + step_number=1, + action='Document both access points', + details='Record details of legitimate AP and suspected rogue: BSSID, security, signal strength, channel.', + ), + PlaybookStep( + step_number=2, + action='Verify legitimate AP', + details='Confirm which AP is the authorized infrastructure. Check with IT/facilities if needed.', + ), + PlaybookStep( + step_number=3, + action='Locate rogue AP', + details='Use signal strength to estimate rogue AP location. Walk the area noting signal variations.', + safety_note='Do not connect to or interact with the suspected rogue AP.', + ), + PlaybookStep( + step_number=4, + action='Physical search', + details='Search suspected area for unauthorized access point. Check for hidden devices, suspicious equipment.', + ), + PlaybookStep( + step_number=5, + action='Report to IT Security', + details='Even if device not found, report the finding to IT Security for network monitoring.', + ), + ], + when_to_escalate='Escalate immediately. Evil twin attacks can capture credentials and traffic.', + documentation_required=[ + 'Both AP details (BSSID, SSID, security, channel, signal)', + 'Location where detected', + 'Signal strength map if created', + 'Physical search results', + ], + ), +} + + +def get_playbook_for_finding( + risk_level: str, + finding_type: Optional[str] = None, + indicators: Optional[list[dict]] = None +) -> OperatorPlaybook: + """ + Get appropriate playbook for a finding. + + Args: + risk_level: Risk level string + finding_type: Optional specific finding type + indicators: Optional list of indicators + + Returns: + Appropriate OperatorPlaybook + """ + # Check for specific finding types + if finding_type == 'evil_twin': + return PLAYBOOKS['wifi_evil_twin'] + + # Check indicators for tracker + if indicators: + tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker'] + if any(i.get('type') in tracker_types for i in indicators): + if risk_level == 'high_interest': + return PLAYBOOKS['high_interest_tracker'] + + # Return based on risk level + if risk_level == 'high_interest': + return PLAYBOOKS['high_interest_generic'] + elif risk_level in ['review', 'needs_review']: + return PLAYBOOKS['needs_review'] + else: + return PLAYBOOKS['informational'] + + +def attach_playbook_to_finding(finding: dict) -> dict: + """ + Attach appropriate playbook to a finding dict. + + Args: + finding: Finding dict with risk_level, indicators, etc. + + Returns: + Finding dict with playbook attached + """ + risk_level = finding.get('risk_level', 'informational') + finding_type = finding.get('finding_type') + indicators = finding.get('indicators', []) + + playbook = get_playbook_for_finding(risk_level, finding_type, indicators) + finding['suggested_playbook'] = playbook.to_dict() + finding['suggested_next_steps'] = [ + f"Step {s.step_number}: {s.action}" + for s in playbook.steps[:3] # First 3 steps as quick reference + ] + + return finding + + +# ============================================================================= +# Global Instance Management +# ============================================================================= + +_timeline_manager: Optional[TimelineManager] = None +_wifi_detector: Optional[WiFiAdvancedDetector] = None + + +def get_timeline_manager() -> TimelineManager: + """Get or create global timeline manager.""" + global _timeline_manager + if _timeline_manager is None: + _timeline_manager = TimelineManager() + return _timeline_manager + + +def reset_timeline_manager() -> None: + """Reset global timeline manager.""" + global _timeline_manager + _timeline_manager = TimelineManager() + + +def get_wifi_detector(monitor_mode: bool = False) -> WiFiAdvancedDetector: + """Get or create global WiFi detector.""" + global _wifi_detector + if _wifi_detector is None: + _wifi_detector = WiFiAdvancedDetector(monitor_mode) + return _wifi_detector + + +def reset_wifi_detector(monitor_mode: bool = False) -> None: + """Reset global WiFi detector.""" + global _wifi_detector + _wifi_detector = WiFiAdvancedDetector(monitor_mode) diff --git a/utils/tscm/reports.py b/utils/tscm/reports.py new file mode 100644 index 0000000..4c97bb9 --- /dev/null +++ b/utils/tscm/reports.py @@ -0,0 +1,813 @@ +""" +TSCM Report Generation Module + +Generates: +1. Client-safe PDF reports with executive summary +2. Technical annex (JSON + CSV) with device timelines and indicators + +DISCLAIMER: All reports include mandatory disclaimers. +No packet data. No claims of confirmed surveillance. +""" + +from __future__ import annotations + +import csv +import io +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + +logger = logging.getLogger('intercept.tscm.reports') + +# ============================================================================= +# Report Data Structures +# ============================================================================= + +@dataclass +class ReportFinding: + """A single finding for the report.""" + identifier: str + protocol: str + name: Optional[str] + risk_level: str + risk_score: int + description: str + indicators: list[dict] = field(default_factory=list) + recommended_action: str = '' + playbook_reference: str = '' + + +@dataclass +class ReportMeetingSummary: + """Meeting window summary for report.""" + name: Optional[str] + start_time: str + end_time: Optional[str] + duration_minutes: float + devices_first_seen: int + behavior_changes: int + high_interest_devices: int + + +@dataclass +class TSCMReport: + """ + Complete TSCM sweep report. + + Contains all data needed for both client-safe PDF and technical annex. + """ + # Report metadata + report_id: str + generated_at: datetime + sweep_id: int + sweep_type: str + + # Location and context + location: Optional[str] = None + baseline_id: Optional[int] = None + baseline_name: Optional[str] = None + + # Executive summary + executive_summary: str = '' + overall_risk_assessment: str = 'low' # low, moderate, elevated, high + key_findings_count: int = 0 + + # Capabilities used + capabilities: dict = field(default_factory=dict) + limitations: list[str] = field(default_factory=list) + + # Findings by risk tier + high_interest_findings: list[ReportFinding] = field(default_factory=list) + needs_review_findings: list[ReportFinding] = field(default_factory=list) + informational_findings: list[ReportFinding] = field(default_factory=list) + + # Meeting window summaries + meeting_summaries: list[ReportMeetingSummary] = field(default_factory=list) + + # Statistics + total_devices_scanned: int = 0 + wifi_devices: int = 0 + bluetooth_devices: int = 0 + rf_signals: int = 0 + new_devices: int = 0 + missing_devices: int = 0 + + # Sweep duration + sweep_start: Optional[datetime] = None + sweep_end: Optional[datetime] = None + duration_minutes: float = 0.0 + + # Technical data (for annex only) + device_timelines: list[dict] = field(default_factory=list) + all_indicators: list[dict] = field(default_factory=list) + baseline_diff: Optional[dict] = None + correlation_data: list[dict] = field(default_factory=list) + + +# ============================================================================= +# Disclaimer Text +# ============================================================================= + +REPORT_DISCLAIMER = """ +IMPORTANT DISCLAIMER + +This report documents the findings of a Technical Surveillance Countermeasures +(TSCM) sweep conducted using electronic detection equipment. The following +limitations and considerations apply: + +1. DETECTION LIMITATIONS: No TSCM sweep can guarantee detection of all + surveillance devices. Sophisticated devices may evade detection. + +2. FINDINGS ARE INDICATORS: All findings represent patterns and indicators, + NOT confirmed surveillance devices. Each finding requires professional + interpretation and may have legitimate explanations. + +3. ENVIRONMENTAL FACTORS: Wireless signals are affected by building + construction, interference, and other environmental factors that may + impact detection accuracy. + +4. POINT-IN-TIME ASSESSMENT: This report reflects conditions at the time + of the sweep. Conditions may change after the assessment. + +5. NOT LEGAL ADVICE: This report does not constitute legal advice. Consult + qualified legal counsel for guidance on surveillance-related matters. + +6. PRIVACY CONSIDERATIONS: Some detected devices may be legitimate personal + devices of authorized individuals. + +This report should be treated as confidential and distributed only to +authorized personnel on a need-to-know basis. +""" + +ANNEX_DISCLAIMER = """ +TECHNICAL ANNEX DISCLAIMER + +This annex contains detailed technical data from the TSCM sweep. This data +is provided for documentation and audit purposes. + +- No raw packet captures or intercepted communications are included +- Device identifiers (MAC addresses) are included for tracking purposes +- Signal strength values are approximate and environment-dependent +- Timeline data is time-bucketed to preserve privacy +- All interpretations require professional TSCM expertise + +This data should be handled according to organizational data protection +policies and applicable privacy regulations. +""" + + +# ============================================================================= +# Report Generation Functions +# ============================================================================= + +def generate_executive_summary(report: TSCMReport) -> str: + """Generate executive summary text.""" + lines = [] + + # Opening + lines.append(f"TSCM Sweep Report - {report.location or 'Location Not Specified'}") + lines.append(f"Conducted: {report.sweep_start.strftime('%Y-%m-%d %H:%M') if report.sweep_start else 'Unknown'}") + lines.append(f"Duration: {report.duration_minutes:.0f} minutes") + lines.append("") + + # Overall assessment + assessment_text = { + 'low': 'No significant indicators of surveillance activity were detected.', + 'moderate': 'Some devices require review but no confirmed surveillance indicators.', + 'elevated': 'Multiple indicators warrant further investigation.', + 'high': 'Significant indicators detected requiring immediate attention.', + } + lines.append(f"OVERALL ASSESSMENT: {report.overall_risk_assessment.upper()}") + lines.append(assessment_text.get(report.overall_risk_assessment, '')) + lines.append("") + + # Key statistics + lines.append("SCAN STATISTICS:") + lines.append(f" - Total devices scanned: {report.total_devices_scanned}") + lines.append(f" - WiFi access points: {report.wifi_devices}") + lines.append(f" - Bluetooth devices: {report.bluetooth_devices}") + lines.append(f" - RF signals: {report.rf_signals}") + lines.append("") + + # Findings summary + lines.append("FINDINGS SUMMARY:") + lines.append(f" - High Interest (require investigation): {len(report.high_interest_findings)}") + lines.append(f" - Needs Review: {len(report.needs_review_findings)}") + lines.append(f" - Informational: {len(report.informational_findings)}") + lines.append("") + + # Baseline comparison if available + if report.baseline_name: + lines.append(f"BASELINE COMPARISON (vs '{report.baseline_name}'):") + lines.append(f" - New devices: {report.new_devices}") + lines.append(f" - Missing devices: {report.missing_devices}") + lines.append("") + + # Meeting window summary if available + if report.meeting_summaries: + lines.append("MEETING WINDOW ACTIVITY:") + for meeting in report.meeting_summaries: + lines.append(f" - {meeting.name or 'Unnamed meeting'}: " + f"{meeting.devices_first_seen} new devices, " + f"{meeting.high_interest_devices} high interest") + lines.append("") + + # Limitations + if report.limitations: + lines.append("SWEEP LIMITATIONS:") + for limit in report.limitations[:3]: # Top 3 limitations + lines.append(f" - {limit}") + lines.append("") + + return "\n".join(lines) + + +def generate_findings_section(findings: list[ReportFinding], title: str) -> str: + """Generate a findings section for the report.""" + if not findings: + return f"{title}\n\nNo findings in this category.\n" + + lines = [title, "=" * len(title), ""] + + for i, finding in enumerate(findings, 1): + lines.append(f"{i}. {finding.name or finding.identifier}") + lines.append(f" Protocol: {finding.protocol.upper()}") + lines.append(f" Identifier: {finding.identifier}") + lines.append(f" Risk Score: {finding.risk_score}") + lines.append(f" Description: {finding.description}") + if finding.indicators: + lines.append(" Indicators:") + for ind in finding.indicators[:5]: # Limit to 5 indicators + lines.append(f" - {ind.get('type', 'unknown')}: {ind.get('description', '')}") + lines.append(f" Recommended Action: {finding.recommended_action}") + if finding.playbook_reference: + lines.append(f" Reference: {finding.playbook_reference}") + lines.append("") + + return "\n".join(lines) + + +def generate_meeting_section(summaries: list[ReportMeetingSummary]) -> str: + """Generate meeting window summary section.""" + if not summaries: + return "MEETING WINDOW SUMMARY\n\nNo meeting windows were marked during this sweep.\n" + + lines = ["MEETING WINDOW SUMMARY", "=" * 22, ""] + + for meeting in summaries: + lines.append(f"Meeting: {meeting.name or 'Unnamed'}") + lines.append(f" Time: {meeting.start_time} - {meeting.end_time or 'ongoing'}") + lines.append(f" Duration: {meeting.duration_minutes:.0f} minutes") + lines.append(f" Devices first seen during meeting: {meeting.devices_first_seen}") + lines.append(f" Behavior changes detected: {meeting.behavior_changes}") + lines.append(f" High interest devices active: {meeting.high_interest_devices}") + + if meeting.devices_first_seen > 0 or meeting.high_interest_devices > 0: + lines.append(" NOTE: Meeting-correlated activity detected - see findings for details") + lines.append("") + + lines.append("Meeting-correlated activity indicates temporal correlation only.") + lines.append("Devices appearing during meetings may have legitimate explanations.") + lines.append("") + + return "\n".join(lines) + + +def generate_pdf_content(report: TSCMReport) -> str: + """ + Generate complete PDF report content. + + Returns plain text that can be converted to PDF. + For actual PDF generation, use a library like reportlab or weasyprint. + """ + sections = [] + + # Header + sections.append("=" * 70) + sections.append("TECHNICAL SURVEILLANCE COUNTERMEASURES (TSCM) SWEEP REPORT") + sections.append("=" * 70) + sections.append("") + sections.append(f"Report ID: {report.report_id}") + sections.append(f"Generated: {report.generated_at.strftime('%Y-%m-%d %H:%M:%S')}") + sections.append(f"Sweep ID: {report.sweep_id}") + sections.append("") + + # Executive Summary + sections.append("-" * 70) + sections.append("EXECUTIVE SUMMARY") + sections.append("-" * 70) + sections.append(report.executive_summary or generate_executive_summary(report)) + sections.append("") + + # High Interest Findings + if report.high_interest_findings: + sections.append("-" * 70) + sections.append(generate_findings_section( + report.high_interest_findings, + "HIGH INTEREST FINDINGS" + )) + + # Needs Review Findings + if report.needs_review_findings: + sections.append("-" * 70) + sections.append(generate_findings_section( + report.needs_review_findings, + "FINDINGS REQUIRING REVIEW" + )) + + # Meeting Window Summary + if report.meeting_summaries: + sections.append("-" * 70) + sections.append(generate_meeting_section(report.meeting_summaries)) + + # Capabilities & Limitations + sections.append("-" * 70) + sections.append("SWEEP CAPABILITIES & LIMITATIONS") + sections.append("=" * 33) + sections.append("") + + if report.capabilities: + caps = report.capabilities + sections.append("Equipment Used:") + if caps.get('wifi', {}).get('mode') != 'unavailable': + sections.append(f" - WiFi: {caps.get('wifi', {}).get('mode', 'unknown')} mode") + if caps.get('bluetooth', {}).get('mode') != 'unavailable': + sections.append(f" - Bluetooth: {caps.get('bluetooth', {}).get('mode', 'unknown')}") + if caps.get('rf', {}).get('available'): + sections.append(f" - RF/SDR: {caps.get('rf', {}).get('device_type', 'unknown')}") + sections.append("") + + if report.limitations: + sections.append("Limitations:") + for limit in report.limitations: + sections.append(f" - {limit}") + sections.append("") + + # Disclaimer + sections.append("-" * 70) + sections.append(REPORT_DISCLAIMER) + + # Footer + sections.append("") + sections.append("=" * 70) + sections.append("END OF REPORT") + sections.append("=" * 70) + + return "\n".join(sections) + + +def generate_technical_annex_json(report: TSCMReport) -> dict: + """ + Generate technical annex as JSON. + + Contains detailed device timelines, all indicators, and raw data + for audit and further analysis. + """ + return { + 'annex_type': 'tscm_technical_annex', + 'report_id': report.report_id, + 'generated_at': report.generated_at.isoformat(), + 'sweep_id': report.sweep_id, + 'disclaimer': ANNEX_DISCLAIMER.strip(), + + 'sweep_details': { + 'type': report.sweep_type, + 'location': report.location, + 'start_time': report.sweep_start.isoformat() if report.sweep_start else None, + 'end_time': report.sweep_end.isoformat() if report.sweep_end else None, + 'duration_minutes': report.duration_minutes, + 'baseline_id': report.baseline_id, + 'baseline_name': report.baseline_name, + }, + + 'capabilities': report.capabilities, + 'limitations': report.limitations, + + 'statistics': { + 'total_devices': report.total_devices_scanned, + 'wifi_devices': report.wifi_devices, + 'bluetooth_devices': report.bluetooth_devices, + 'rf_signals': report.rf_signals, + 'new_devices': report.new_devices, + 'missing_devices': report.missing_devices, + 'high_interest_count': len(report.high_interest_findings), + 'needs_review_count': len(report.needs_review_findings), + 'informational_count': len(report.informational_findings), + }, + + 'findings': { + 'high_interest': [ + { + 'identifier': f.identifier, + 'protocol': f.protocol, + 'name': f.name, + 'risk_score': f.risk_score, + 'description': f.description, + 'indicators': f.indicators, + 'recommended_action': f.recommended_action, + } + for f in report.high_interest_findings + ], + 'needs_review': [ + { + 'identifier': f.identifier, + 'protocol': f.protocol, + 'name': f.name, + 'risk_score': f.risk_score, + 'description': f.description, + 'indicators': f.indicators, + } + for f in report.needs_review_findings + ], + }, + + 'meeting_windows': [ + { + 'name': m.name, + 'start_time': m.start_time, + 'end_time': m.end_time, + 'duration_minutes': m.duration_minutes, + 'devices_first_seen': m.devices_first_seen, + 'behavior_changes': m.behavior_changes, + 'high_interest_devices': m.high_interest_devices, + } + for m in report.meeting_summaries + ], + + 'device_timelines': report.device_timelines, + 'all_indicators': report.all_indicators, + 'baseline_diff': report.baseline_diff, + 'correlations': report.correlation_data, + } + + +def generate_technical_annex_csv(report: TSCMReport) -> str: + """ + Generate device timeline data as CSV. + + Provides spreadsheet-compatible format for further analysis. + """ + output = io.StringIO() + writer = csv.writer(output) + + # Header + writer.writerow([ + 'identifier', + 'protocol', + 'name', + 'risk_level', + 'risk_score', + 'first_seen', + 'last_seen', + 'observation_count', + 'rssi_min', + 'rssi_max', + 'rssi_mean', + 'rssi_stability', + 'movement_pattern', + 'meeting_correlated', + 'indicators', + ]) + + # Device data from timelines + for timeline in report.device_timelines: + indicators_str = '; '.join( + f"{i.get('type', '')}({i.get('score', 0)})" + for i in timeline.get('indicators', []) + ) + + signal = timeline.get('signal', {}) + metrics = timeline.get('metrics', {}) + movement = timeline.get('movement', {}) + meeting = timeline.get('meeting_correlation', {}) + + writer.writerow([ + timeline.get('identifier', ''), + timeline.get('protocol', ''), + timeline.get('name', ''), + timeline.get('risk_level', 'informational'), + timeline.get('risk_score', 0), + metrics.get('first_seen', ''), + metrics.get('last_seen', ''), + metrics.get('total_observations', 0), + signal.get('rssi_min', ''), + signal.get('rssi_max', ''), + signal.get('rssi_mean', ''), + signal.get('stability', ''), + movement.get('pattern', ''), + meeting.get('correlated', False), + indicators_str, + ]) + + # Also add findings summary + writer.writerow([]) + writer.writerow(['--- FINDINGS SUMMARY ---']) + writer.writerow(['identifier', 'protocol', 'risk_level', 'risk_score', 'description', 'recommended_action']) + + all_findings = ( + report.high_interest_findings + + report.needs_review_findings + ) + + for finding in all_findings: + writer.writerow([ + finding.identifier, + finding.protocol, + finding.risk_level, + finding.risk_score, + finding.description, + finding.recommended_action, + ]) + + return output.getvalue() + + +# ============================================================================= +# Report Builder +# ============================================================================= + +class TSCMReportBuilder: + """ + Builder for constructing TSCM reports from sweep data. + + Usage: + builder = TSCMReportBuilder(sweep_id=123) + builder.set_location("Conference Room A") + builder.add_capabilities(capabilities_dict) + builder.add_finding(finding) + report = builder.build() + """ + + def __init__(self, sweep_id: int): + self.sweep_id = sweep_id + self.report = TSCMReport( + report_id=f"TSCM-{sweep_id}-{datetime.now().strftime('%Y%m%d%H%M%S')}", + generated_at=datetime.now(), + sweep_id=sweep_id, + sweep_type='standard', + ) + + def set_sweep_type(self, sweep_type: str) -> 'TSCMReportBuilder': + self.report.sweep_type = sweep_type + return self + + def set_location(self, location: str) -> 'TSCMReportBuilder': + self.report.location = location + return self + + def set_baseline(self, baseline_id: int, baseline_name: str) -> 'TSCMReportBuilder': + self.report.baseline_id = baseline_id + self.report.baseline_name = baseline_name + return self + + def set_sweep_times( + self, + start: datetime, + end: Optional[datetime] = None + ) -> 'TSCMReportBuilder': + self.report.sweep_start = start + self.report.sweep_end = end or datetime.now() + self.report.duration_minutes = ( + (self.report.sweep_end - self.report.sweep_start).total_seconds() / 60 + ) + return self + + def add_capabilities(self, capabilities: dict) -> 'TSCMReportBuilder': + self.report.capabilities = capabilities + self.report.limitations = capabilities.get('all_limitations', []) + return self + + def add_finding(self, finding: ReportFinding) -> 'TSCMReportBuilder': + if finding.risk_level == 'high_interest': + self.report.high_interest_findings.append(finding) + elif finding.risk_level in ['review', 'needs_review']: + self.report.needs_review_findings.append(finding) + else: + self.report.informational_findings.append(finding) + return self + + def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder': + """Add findings from correlation engine device profiles.""" + for profile in profiles: + finding = ReportFinding( + identifier=profile.get('identifier', ''), + protocol=profile.get('protocol', ''), + name=profile.get('name'), + risk_level=profile.get('risk_level', 'informational'), + risk_score=profile.get('total_score', 0), + description=self._generate_finding_description(profile), + indicators=profile.get('indicators', []), + recommended_action=profile.get('recommended_action', 'monitor'), + playbook_reference=self._get_playbook_reference(profile), + ) + self.add_finding(finding) + + return self + + def _generate_finding_description(self, profile: dict) -> str: + """Generate description from profile indicators.""" + indicators = profile.get('indicators', []) + if not indicators: + return f"{profile.get('protocol', 'Unknown').upper()} device detected" + + # Use first indicator as primary description + primary = indicators[0] + desc = primary.get('description', 'Pattern detected') + + if len(indicators) > 1: + desc += f" (+{len(indicators) - 1} additional indicators)" + + return desc + + def _get_playbook_reference(self, profile: dict) -> str: + """Get playbook reference based on profile.""" + risk_level = profile.get('risk_level', 'informational') + indicators = profile.get('indicators', []) + + # Check for tracker + tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker'] + if any(i.get('type') in tracker_types for i in indicators): + if risk_level == 'high_interest': + return 'PB-001 (Tracker Detection)' + + if risk_level == 'high_interest': + return 'PB-002 (Suspicious Device)' + elif risk_level in ['review', 'needs_review']: + return 'PB-003 (Unknown Device)' + + return '' + + def add_meeting_summary(self, summary: dict) -> 'TSCMReportBuilder': + """Add meeting window summary.""" + meeting = ReportMeetingSummary( + name=summary.get('name'), + start_time=summary.get('start_time', ''), + end_time=summary.get('end_time'), + duration_minutes=summary.get('duration_minutes', 0), + devices_first_seen=summary.get('devices_first_seen', 0), + behavior_changes=summary.get('behavior_changes', 0), + high_interest_devices=summary.get('high_interest_devices', 0), + ) + self.report.meeting_summaries.append(meeting) + return self + + def add_statistics( + self, + wifi: int = 0, + bluetooth: int = 0, + rf: int = 0, + new: int = 0, + missing: int = 0 + ) -> 'TSCMReportBuilder': + self.report.wifi_devices = wifi + self.report.bluetooth_devices = bluetooth + self.report.rf_signals = rf + self.report.total_devices_scanned = wifi + bluetooth + rf + self.report.new_devices = new + self.report.missing_devices = missing + return self + + def add_device_timelines(self, timelines: list[dict]) -> 'TSCMReportBuilder': + self.report.device_timelines = timelines + return self + + def add_all_indicators(self, indicators: list[dict]) -> 'TSCMReportBuilder': + self.report.all_indicators = indicators + return self + + def add_baseline_diff(self, diff: dict) -> 'TSCMReportBuilder': + self.report.baseline_diff = diff + return self + + def add_correlations(self, correlations: list[dict]) -> 'TSCMReportBuilder': + self.report.correlation_data = correlations + return self + + def build(self) -> TSCMReport: + """Build and return the complete report.""" + # Calculate overall risk assessment + if self.report.high_interest_findings: + if len(self.report.high_interest_findings) >= 3: + self.report.overall_risk_assessment = 'high' + else: + self.report.overall_risk_assessment = 'elevated' + elif self.report.needs_review_findings: + self.report.overall_risk_assessment = 'moderate' + else: + self.report.overall_risk_assessment = 'low' + + self.report.key_findings_count = ( + len(self.report.high_interest_findings) + + len(self.report.needs_review_findings) + ) + + # Generate executive summary + self.report.executive_summary = generate_executive_summary(self.report) + + return self.report + + +# ============================================================================= +# Report Generation API Functions +# ============================================================================= + +def generate_report( + sweep_id: int, + sweep_data: dict, + device_profiles: list[dict], + capabilities: dict, + timelines: list[dict], + baseline_diff: Optional[dict] = None, + meeting_summaries: Optional[list[dict]] = None, + correlations: Optional[list[dict]] = None, +) -> TSCMReport: + """ + Generate a complete TSCM report from sweep data. + + Args: + sweep_id: Sweep ID + sweep_data: Sweep dict from database + device_profiles: List of DeviceProfile dicts from correlation engine + capabilities: Capabilities dict + timelines: Device timeline dicts + baseline_diff: Optional baseline diff dict + meeting_summaries: Optional meeting summaries + correlations: Optional correlation data + + Returns: + Complete TSCMReport + """ + builder = TSCMReportBuilder(sweep_id) + + # Basic info + builder.set_sweep_type(sweep_data.get('sweep_type', 'standard')) + + # Parse times + started_at = sweep_data.get('started_at') + completed_at = sweep_data.get('completed_at') + if started_at: + if isinstance(started_at, str): + started_at = datetime.fromisoformat(started_at.replace('Z', '+00:00')).replace(tzinfo=None) + if completed_at: + if isinstance(completed_at, str): + completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')).replace(tzinfo=None) + builder.set_sweep_times(started_at, completed_at) + + # Capabilities + builder.add_capabilities(capabilities) + + # Add findings from profiles + builder.add_findings_from_profiles(device_profiles) + + # Statistics + results = sweep_data.get('results', {}) + builder.add_statistics( + wifi=len(results.get('wifi', [])), + bluetooth=len(results.get('bluetooth', [])), + rf=len(results.get('rf', [])), + new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0, + missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0, + ) + + # Technical data + builder.add_device_timelines(timelines) + + if baseline_diff: + builder.add_baseline_diff(baseline_diff) + + if meeting_summaries: + for summary in meeting_summaries: + builder.add_meeting_summary(summary) + + if correlations: + builder.add_correlations(correlations) + + # Extract all indicators + all_indicators = [] + for profile in device_profiles: + for ind in profile.get('indicators', []): + all_indicators.append({ + 'device': profile.get('identifier'), + 'protocol': profile.get('protocol'), + **ind + }) + builder.add_all_indicators(all_indicators) + + return builder.build() + + +def get_pdf_report(report: TSCMReport) -> str: + """Get PDF-ready report content.""" + return generate_pdf_content(report) + + +def get_json_annex(report: TSCMReport) -> dict: + """Get JSON technical annex.""" + return generate_technical_annex_json(report) + + +def get_csv_annex(report: TSCMReport) -> str: + """Get CSV technical annex.""" + return generate_technical_annex_csv(report)