diff --git a/routes/tscm.py b/routes/tscm.py
index ec2a8e6..6493cad 100644
--- a/routes/tscm.py
+++ b/routes/tscm.py
@@ -2274,3 +2274,962 @@ def get_cluster_detail(cluster_id: str):
     except Exception as e:
         logger.error(f"Get cluster detail error: {e}")
         return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# =============================================================================
+# Capabilities & Coverage Endpoints
+# =============================================================================
+
+@tscm_bp.route('/capabilities')
+def get_capabilities():
+    """
+    Get current system capabilities for TSCM sweeping.
+
+    Returns what the system CAN and CANNOT detect based on OS,
+    privileges, adapters, and SDR hardware.
+    """
+    try:
+        from utils.tscm.advanced import detect_sweep_capabilities
+
+        wifi_interface = request.args.get('wifi_interface', '')
+        bt_adapter = request.args.get('bt_adapter', '')
+
+        caps = detect_sweep_capabilities(
+            wifi_interface=wifi_interface,
+            bt_adapter=bt_adapter
+        )
+
+        return jsonify({
+            'status': 'success',
+            'capabilities': caps.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"Get capabilities error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# FIX: URL converter restored — the '<int:sweep_id>' placeholder had been
+# stripped from the rule, leaving '/sweep//capabilities' which never binds
+# the view's sweep_id argument.
+@tscm_bp.route('/sweep/<int:sweep_id>/capabilities')
+def get_sweep_stored_capabilities(sweep_id: int):
+    """Get stored capabilities for a specific sweep."""
+    from utils.database import get_sweep_capabilities
+
+    caps = get_sweep_capabilities(sweep_id)
+    if not caps:
+        return jsonify({'status': 'error', 'message': 'No capabilities stored for this sweep'}), 404
+
+    return jsonify({
+        'status': 'success',
+        'capabilities': caps
+    })
+
+
+# =============================================================================
+# Baseline Diff & Health Endpoints
+# =============================================================================
+
+# FIX: URL converters restored (rule was '/baseline/diff//').
+@tscm_bp.route('/baseline/diff/<int:baseline_id>/<int:sweep_id>')
+def get_baseline_diff(baseline_id: int, sweep_id: int):
+    """
+    Get comprehensive diff between a baseline and a sweep.
+
+    Shows new devices, missing devices, changed characteristics,
+    and baseline health assessment.
+    """
+    try:
+        from utils.tscm.advanced import calculate_baseline_diff
+
+        baseline = get_tscm_baseline(baseline_id)
+        if not baseline:
+            return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404
+
+        sweep = get_tscm_sweep(sweep_id)
+        if not sweep:
+            return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404
+
+        # Get current devices from sweep results
+        results = sweep.get('results', {})
+        if isinstance(results, str):
+            import json
+            results = json.loads(results)
+
+        current_wifi = results.get('wifi', [])
+        current_bt = results.get('bluetooth', [])
+        current_rf = results.get('rf', [])
+
+        diff = calculate_baseline_diff(
+            baseline=baseline,
+            current_wifi=current_wifi,
+            current_bt=current_bt,
+            current_rf=current_rf,
+            sweep_id=sweep_id
+        )
+
+        return jsonify({
+            'status': 'success',
+            'diff': diff.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"Get baseline diff error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# FIX: URL converter restored (rule was '/baseline//health').
+@tscm_bp.route('/baseline/<int:baseline_id>/health')
+def get_baseline_health(baseline_id: int):
+    """Get health assessment for a baseline."""
+    try:
+        from utils.tscm.advanced import BaselineHealth
+        from datetime import datetime
+
+        baseline = get_tscm_baseline(baseline_id)
+        if not baseline:
+            return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404
+
+        # Calculate age
+        created_at = baseline.get('created_at')
+        age_hours = 0
+        if created_at:
+            if isinstance(created_at, str):
+                created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
+                age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600
+            elif isinstance(created_at, datetime):
+                age_hours = (datetime.now() - created_at).total_seconds() / 3600
+
+        # Count devices
+        total_devices = (
+            len(baseline.get('wifi_networks', [])) +
+            len(baseline.get('bt_devices', [])) +
+            len(baseline.get('rf_frequencies', []))
+        )
+
+        # Determine health
+        health = 'healthy'
+        score = 1.0
+        reasons = []
+
+        if age_hours > 168:
+            health = 'stale'
+            score = 0.3
+            reasons.append(f'Baseline is {age_hours:.0f} hours old (over 1 week)')
+        elif age_hours > 72:
+            health = 'noisy'
+            score = 0.6
+            reasons.append(f'Baseline is {age_hours:.0f} hours old (over 3 days)')
+
+        if total_devices < 3:
+            score -= 0.2
+            reasons.append(f'Baseline has few devices ({total_devices})')
+            if health == 'healthy':
+                health = 'noisy'
+
+        return jsonify({
+            'status': 'success',
+            'health': {
+                'status': health,
+                'score': round(max(0, score), 2),
+                'age_hours': round(age_hours, 1),
+                'total_devices': total_devices,
+                'reasons': reasons,
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"Get baseline health error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# =============================================================================
+# Device Timeline Endpoints
+# =============================================================================
+
+# FIX: URL converter restored (rule was '/device//timeline'); the default
+# string converter accepts MAC-style identifiers containing colons.
+@tscm_bp.route('/device/<identifier>/timeline')
+def get_device_timeline_endpoint(identifier: str):
+    """
+    Get timeline of observations for a device.
+
+    Shows behavior over time including RSSI stability, presence,
+    and meeting window correlation.
+ """ + try: + from utils.tscm.advanced import get_timeline_manager + from utils.database import get_device_timeline + + protocol = request.args.get('protocol', 'bluetooth') + since_hours = request.args.get('since_hours', 24, type=int) + + # Try in-memory timeline first + manager = get_timeline_manager() + timeline = manager.get_timeline(identifier, protocol) + + # Also get stored timeline from database + stored = get_device_timeline(identifier, since_hours=since_hours) + + result = { + 'identifier': identifier, + 'protocol': protocol, + 'observations': stored, + } + + if timeline: + result['metrics'] = { + 'first_seen': timeline.first_seen.isoformat() if timeline.first_seen else None, + 'last_seen': timeline.last_seen.isoformat() if timeline.last_seen else None, + 'total_observations': timeline.total_observations, + 'presence_ratio': round(timeline.presence_ratio, 2), + } + result['signal'] = { + 'rssi_min': timeline.rssi_min, + 'rssi_max': timeline.rssi_max, + 'rssi_mean': round(timeline.rssi_mean, 1) if timeline.rssi_mean else None, + 'stability': round(timeline.rssi_stability, 2), + } + result['movement'] = { + 'appears_stationary': timeline.appears_stationary, + 'pattern': timeline.movement_pattern, + } + result['meeting_correlation'] = { + 'correlated': timeline.meeting_correlated, + 'observations_during_meeting': timeline.meeting_observations, + } + + return jsonify({ + 'status': 'success', + 'timeline': result + }) + + except Exception as e: + logger.error(f"Get device timeline error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/timelines') +def get_all_device_timelines(): + """Get all device timelines.""" + try: + from utils.tscm.advanced import get_timeline_manager + + manager = get_timeline_manager() + timelines = manager.get_all_timelines() + + return jsonify({ + 'status': 'success', + 'count': len(timelines), + 'timelines': [t.to_dict() for t in timelines] + }) + + except Exception as e: + logger.error(f"Get 
all timelines error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# =============================================================================
+# Known-Good Registry (Whitelist) Endpoints
+# =============================================================================
+
+@tscm_bp.route('/known-devices', methods=['GET'])
+def list_known_devices():
+    """List all known-good devices."""
+    from utils.database import get_all_known_devices
+
+    location = request.args.get('location')
+    scope = request.args.get('scope')
+
+    devices = get_all_known_devices(location=location, scope=scope)
+
+    return jsonify({
+        'status': 'success',
+        'count': len(devices),
+        'devices': devices
+    })
+
+
+@tscm_bp.route('/known-devices', methods=['POST'])
+def add_known_device_endpoint():
+    """
+    Add a device to the known-good registry.
+
+    Known devices remain visible but receive reduced risk scores.
+    They are NOT suppressed from reports (preserves audit trail).
+    """
+    from utils.database import add_known_device
+
+    data = request.get_json() or {}
+
+    identifier = data.get('identifier')
+    protocol = data.get('protocol')
+
+    if not identifier or not protocol:
+        return jsonify({
+            'status': 'error',
+            'message': 'identifier and protocol are required'
+        }), 400
+
+    device_id = add_known_device(
+        identifier=identifier,
+        protocol=protocol,
+        name=data.get('name'),
+        description=data.get('description'),
+        location=data.get('location'),
+        scope=data.get('scope', 'global'),
+        added_by=data.get('added_by'),
+        score_modifier=data.get('score_modifier', -2),
+        metadata=data.get('metadata')
+    )
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Device added to known-good registry',
+        'device_id': device_id
+    })
+
+
+# FIX: URL converter restored (rule was '/known-devices/', which collides
+# with the collection GET above and never binds 'identifier').
+@tscm_bp.route('/known-devices/<identifier>', methods=['GET'])
+def get_known_device_endpoint(identifier: str):
+    """Get a known device by identifier."""
+    from utils.database import get_known_device
+
+    device = get_known_device(identifier)
+    if not device:
+        return jsonify({'status': 'error', 'message': 'Device not found'}), 404
+
+    return jsonify({
+        'status': 'success',
+        'device': device
+    })
+
+
+# FIX: URL converter restored (rule was '/known-devices/').
+@tscm_bp.route('/known-devices/<identifier>', methods=['DELETE'])
+def delete_known_device_endpoint(identifier: str):
+    """Remove a device from the known-good registry."""
+    from utils.database import delete_known_device
+
+    success = delete_known_device(identifier)
+    if not success:
+        return jsonify({'status': 'error', 'message': 'Device not found'}), 404
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Device removed from known-good registry'
+    })
+
+
+# FIX: URL converter restored (rule was '/known-devices/check/').
+@tscm_bp.route('/known-devices/check/<identifier>')
+def check_known_device(identifier: str):
+    """Check if a device is in the known-good registry."""
+    from utils.database import is_known_good_device
+
+    location = request.args.get('location')
+    result = is_known_good_device(identifier, location=location)
+
+    return jsonify({
+        'status': 'success',
+        'is_known': result is not None,
+        'details': result
+    })
+
+
+# =============================================================================
+# Case Management Endpoints
+# =============================================================================
+
+@tscm_bp.route('/cases', methods=['GET'])
+def list_cases():
+    """List all TSCM cases."""
+    from utils.database import get_all_tscm_cases
+
+    status = request.args.get('status')
+    limit = request.args.get('limit', 50, type=int)
+
+    cases = get_all_tscm_cases(status=status, limit=limit)
+
+    return jsonify({
+        'status': 'success',
+        'count': len(cases),
+        'cases': cases
+    })
+
+
+@tscm_bp.route('/cases', methods=['POST'])
+def create_case():
+    """Create a new TSCM case."""
+    from utils.database import create_tscm_case
+
+    data = request.get_json() or {}
+
+    name = data.get('name')
+    if not name:
+        return jsonify({'status': 'error', 'message': 'name is required'}), 400
+
+    case_id = create_tscm_case(
+        name=name,
+        description=data.get('description'),
+        location=data.get('location'),
+        priority=data.get('priority', 'normal'),
+        created_by=data.get('created_by'),
+        metadata=data.get('metadata')
+    )
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Case created',
+        'case_id': case_id
+    })
+
+
+# FIX: URL converter restored (rule was '/cases/').
+@tscm_bp.route('/cases/<int:case_id>', methods=['GET'])
+def get_case(case_id: int):
+    """Get a TSCM case with all linked sweeps, threats, and notes."""
+    from utils.database import get_tscm_case
+
+    case = get_tscm_case(case_id)
+    if not case:
+        return jsonify({'status': 'error', 'message': 'Case not found'}), 404
+
+    return jsonify({
+        'status': 'success',
+        'case': case
+    })
+
+
+# FIX: URL converter restored (rule was '/cases/').
+@tscm_bp.route('/cases/<int:case_id>', methods=['PUT'])
+def update_case(case_id: int):
+    """Update a TSCM case."""
+    from utils.database import update_tscm_case
+
+    data = request.get_json() or {}
+
+    success = update_tscm_case(
+        case_id=case_id,
+        status=data.get('status'),
+        priority=data.get('priority'),
+        assigned_to=data.get('assigned_to'),
+        notes=data.get('notes')
+    )
+
+    if not success:
+        return jsonify({'status': 'error', 'message': 'Case not found'}), 404
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Case updated'
+    })
+
+
+# FIX: URL converters restored (rule was '/cases//sweeps/').
+@tscm_bp.route('/cases/<int:case_id>/sweeps/<int:sweep_id>', methods=['POST'])
+def link_sweep_to_case(case_id: int, sweep_id: int):
+    """Link a sweep to a case."""
+    from utils.database import add_sweep_to_case
+
+    success = add_sweep_to_case(case_id, sweep_id)
+
+    return jsonify({
+        'status': 'success' if success else 'error',
+        'message': 'Sweep linked to case' if success else 'Already linked or not found'
+    })
+
+
+# FIX: URL converters restored (rule was '/cases//threats/').
+@tscm_bp.route('/cases/<int:case_id>/threats/<int:threat_id>', methods=['POST'])
+def link_threat_to_case(case_id: int, threat_id: int):
+    """Link a threat to a case."""
+    from utils.database import add_threat_to_case
+
+    success = add_threat_to_case(case_id, threat_id)
+
+    return jsonify({
+        'status': 'success' if success else 'error',
+        'message': 'Threat linked to case' if success else 'Already linked or not found'
+    })
+
+
+# FIX: URL converter restored (rule was '/cases//notes').
+@tscm_bp.route('/cases/<int:case_id>/notes', methods=['POST'])
+def add_note_to_case(case_id: int):
+    """Add a note to a case."""
+    from utils.database import add_case_note
+
+    data = request.get_json() or {}
+
+    content = data.get('content')
+    if not content:
+        return jsonify({'status': 'error', 'message': 'content is required'}), 400
+
+    note_id = add_case_note(
+        case_id=case_id,
+        content=content,
+        note_type=data.get('note_type', 'general'),
+        created_by=data.get('created_by')
+    )
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Note added',
+        'note_id': note_id
+    })
+
+
+# =============================================================================
+# Meeting Window Enhanced Endpoints
+# =============================================================================
+
+@tscm_bp.route('/meeting/start-tracked', methods=['POST'])
+def start_tracked_meeting():
+    """
+    Start a tracked meeting window with database persistence.
+
+    Tracks devices first seen during meeting and behavior changes.
+    """
+    from utils.database import start_meeting_window
+    from utils.tscm.advanced import get_timeline_manager
+
+    data = request.get_json() or {}
+
+    meeting_id = start_meeting_window(
+        sweep_id=_current_sweep_id,
+        name=data.get('name'),
+        location=data.get('location'),
+        notes=data.get('notes')
+    )
+
+    # Start meeting in correlation engine
+    correlation = get_correlation_engine()
+    correlation.start_meeting_window()
+
+    # Start in timeline manager
+    manager = get_timeline_manager()
+    manager.start_meeting_window()
+
+    _emit_event('meeting_started', {
+        'meeting_id': meeting_id,
+        'timestamp': datetime.now().isoformat(),
+        'name': data.get('name'),
+    })
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Tracked meeting window started',
+        'meeting_id': meeting_id
+    })
+
+
+# FIX: URL converter restored (rule was '/meeting//end').
+@tscm_bp.route('/meeting/<int:meeting_id>/end', methods=['POST'])
+def end_tracked_meeting(meeting_id: int):
+    """End a tracked meeting window."""
+    from utils.database import end_meeting_window
+    from utils.tscm.advanced import get_timeline_manager
+
+    success = end_meeting_window(meeting_id)
+    if not success:
+        return jsonify({'status': 'error', 'message': 'Meeting not found or already ended'}), 404
+
+    # End in correlation engine
+    correlation = get_correlation_engine()
+    correlation.end_meeting_window()
+
+    # End in timeline manager
+    manager = get_timeline_manager()
+    manager.end_meeting_window()
+
+    _emit_event('meeting_ended', {
+        'meeting_id': meeting_id,
+        'timestamp': datetime.now().isoformat()
+    })
+
+    return jsonify({
+        'status': 'success',
+        'message': 'Meeting window ended'
+    })
+
+
+# FIX: URL converter restored (rule was '/meeting//summary').
+@tscm_bp.route('/meeting/<int:meeting_id>/summary')
+def get_meeting_summary_endpoint(meeting_id: int):
+    """Get detailed summary of device activity during a meeting."""
+    try:
+        from utils.database import get_meeting_windows
+        from utils.tscm.advanced import generate_meeting_summary, get_timeline_manager
+
+        # Get meeting window
+        windows = get_meeting_windows(_current_sweep_id or 0)
+        meeting = None
+        for w in windows:
+            if w.get('id') == meeting_id:
+                meeting = w
+                break
+
+        if not meeting:
+            return jsonify({'status': 'error', 'message': 'Meeting not found'}), 404
+
+        # Get timelines and profiles
+        manager = get_timeline_manager()
+        timelines = manager.get_all_timelines()
+
+        correlation = get_correlation_engine()
+        profiles = [p.to_dict() for p in correlation.device_profiles.values()]
+
+        summary = generate_meeting_summary(meeting, timelines, profiles)
+
+        return jsonify({
+            'status': 'success',
+            'summary': summary.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"Get meeting summary error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+@tscm_bp.route('/meeting/active')
+def get_active_meeting():
+    """Get currently active meeting window."""
+    from utils.database import get_active_meeting_window
+
+    meeting = get_active_meeting_window(_current_sweep_id)
+
+    return jsonify({
+        'status': 'success',
+        'meeting': meeting,
+        'is_active': meeting is not None
+    })
+
+
+# 
============================================================================= +# PDF Report & Technical Annex Endpoints +# ============================================================================= + +@tscm_bp.route('/report/pdf') +def get_pdf_report(): + """ + Generate client-safe PDF report. + + Contains executive summary, findings by risk tier, meeting window + summary, and mandatory disclaimers. + """ + try: + from utils.tscm.reports import generate_report, get_pdf_report + from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager + + sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int) + if not sweep_id: + return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400 + + sweep = get_tscm_sweep(sweep_id) + if not sweep: + return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404 + + # Get data for report + correlation = get_correlation_engine() + profiles = [p.to_dict() for p in correlation.device_profiles.values()] + caps = detect_sweep_capabilities().to_dict() + + manager = get_timeline_manager() + timelines = [t.to_dict() for t in manager.get_all_timelines()] + + # Generate report + report = generate_report( + sweep_id=sweep_id, + sweep_data=sweep, + device_profiles=profiles, + capabilities=caps, + timelines=timelines + ) + + pdf_content = get_pdf_report(report) + + return Response( + pdf_content, + mimetype='text/plain', + headers={ + 'Content-Disposition': f'attachment; filename=tscm_report_{sweep_id}.txt' + } + ) + + except Exception as e: + logger.error(f"Generate PDF report error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/report/annex') +def get_technical_annex(): + """ + Generate technical annex (JSON + CSV). + + Contains device timelines, all indicators, and detailed data + for audit purposes. No packet data included. 
+ """ + try: + from utils.tscm.reports import generate_report, get_json_annex, get_csv_annex + from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager + + sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int) + format_type = request.args.get('format', 'json') + + if not sweep_id: + return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400 + + sweep = get_tscm_sweep(sweep_id) + if not sweep: + return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404 + + # Get data for report + correlation = get_correlation_engine() + profiles = [p.to_dict() for p in correlation.device_profiles.values()] + caps = detect_sweep_capabilities().to_dict() + + manager = get_timeline_manager() + timelines = [t.to_dict() for t in manager.get_all_timelines()] + + # Generate report + report = generate_report( + sweep_id=sweep_id, + sweep_data=sweep, + device_profiles=profiles, + capabilities=caps, + timelines=timelines + ) + + if format_type == 'csv': + csv_content = get_csv_annex(report) + return Response( + csv_content, + mimetype='text/csv', + headers={ + 'Content-Disposition': f'attachment; filename=tscm_annex_{sweep_id}.csv' + } + ) + else: + annex = get_json_annex(report) + return jsonify({ + 'status': 'success', + 'annex': annex + }) + + except Exception as e: + logger.error(f"Generate technical annex error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +# ============================================================================= +# WiFi Advanced Indicators Endpoints +# ============================================================================= + +@tscm_bp.route('/wifi/advanced-indicators') +def get_wifi_advanced_indicators(): + """ + Get advanced WiFi indicators (Evil Twin, Probes, Deauth). + + These indicators require analysis of WiFi patterns. + Some features require monitor mode. 
"""
+    try:
+        from utils.tscm.advanced import get_wifi_detector
+
+        detector = get_wifi_detector()
+
+        return jsonify({
+            'status': 'success',
+            'indicators': detector.get_all_indicators(),
+            'unavailable_features': detector.get_unavailable_features(),
+            'disclaimer': (
+                "All indicators represent pattern detections, NOT confirmed attacks. "
+                "Further investigation is required."
+            )
+        })
+
+    except Exception as e:
+        logger.error(f"Get WiFi indicators error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+@tscm_bp.route('/wifi/analyze-network', methods=['POST'])
+def analyze_wifi_network():
+    """
+    Analyze a WiFi network for evil twin patterns.
+
+    Compares against known networks to detect SSID spoofing.
+    """
+    try:
+        from utils.tscm.advanced import get_wifi_detector
+
+        data = request.get_json() or {}
+        detector = get_wifi_detector()
+
+        # Set known networks from baseline if available
+        baseline = get_active_tscm_baseline()
+        if baseline:
+            detector.set_known_networks(baseline.get('wifi_networks', []))
+
+        indicators = detector.analyze_network(data)
+
+        return jsonify({
+            'status': 'success',
+            'indicators': [i.to_dict() for i in indicators]
+        })
+
+    except Exception as e:
+        logger.error(f"Analyze WiFi network error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# =============================================================================
+# Bluetooth Risk Explainability Endpoints
+# =============================================================================
+
+# FIX: URL converter restored (rule was '/bluetooth//explain'); the default
+# string converter accepts MAC-style identifiers containing colons.
+@tscm_bp.route('/bluetooth/<identifier>/explain')
+def explain_bluetooth_risk(identifier: str):
+    """
+    Get human-readable risk explanation for a BLE device.
+
+    Includes proximity estimate, tracker explanation, and
+    recommended actions.
+    """
+    try:
+        from utils.tscm.advanced import generate_ble_risk_explanation
+
+        # Get device from correlation engine
+        correlation = get_correlation_engine()
+        profile = None
+        key = f"bluetooth:{identifier.upper()}"
+        if key in correlation.device_profiles:
+            profile = correlation.device_profiles[key].to_dict()
+
+        # Try to find device info
+        device = {'mac': identifier}
+        if profile:
+            device['name'] = profile.get('name')
+            device['rssi'] = profile.get('rssi_samples', [None])[-1] if profile.get('rssi_samples') else None
+
+        # Check meeting status
+        is_meeting = correlation.is_during_meeting()
+
+        explanation = generate_ble_risk_explanation(device, profile, is_meeting)
+
+        return jsonify({
+            'status': 'success',
+            'explanation': explanation.to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"Explain BLE risk error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# FIX: URL converter restored (rule was '/bluetooth//proximity').
+@tscm_bp.route('/bluetooth/<identifier>/proximity')
+def get_bluetooth_proximity(identifier: str):
+    """Get proximity estimate for a BLE device."""
+    try:
+        from utils.tscm.advanced import estimate_ble_proximity
+
+        rssi = request.args.get('rssi', type=int)
+        if rssi is None:
+            # Try to get from correlation engine
+            correlation = get_correlation_engine()
+            key = f"bluetooth:{identifier.upper()}"
+            if key in correlation.device_profiles:
+                profile = correlation.device_profiles[key]
+                if profile.rssi_samples:
+                    rssi = profile.rssi_samples[-1]
+
+        if rssi is None:
+            return jsonify({
+                'status': 'error',
+                'message': 'RSSI value required'
+            }), 400
+
+        proximity, explanation, distance = estimate_ble_proximity(rssi)
+
+        return jsonify({
+            'status': 'success',
+            'proximity': {
+                'estimate': proximity.value,
+                'explanation': explanation,
+                'estimated_distance': distance,
+                'rssi_used': rssi,
+            },
+            'disclaimer': (
+                "Proximity estimates are approximate and affected by "
+                "environment, obstacles, and device characteristics."
+            )
+        })
+
+    except Exception as e:
+        logger.error(f"Get BLE proximity error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# =============================================================================
+# Operator Playbook Endpoints
+# =============================================================================
+
+@tscm_bp.route('/playbooks')
+def list_playbooks():
+    """List all available operator playbooks."""
+    try:
+        from utils.tscm.advanced import PLAYBOOKS
+
+        return jsonify({
+            'status': 'success',
+            'playbooks': {
+                pid: pb.to_dict()
+                for pid, pb in PLAYBOOKS.items()
+            }
+        })
+
+    except Exception as e:
+        logger.error(f"List playbooks error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# FIX: URL converter restored (rule was '/playbooks/', which shadows the
+# collection listing above).
+@tscm_bp.route('/playbooks/<playbook_id>')
+def get_playbook(playbook_id: str):
+    """Get a specific playbook."""
+    try:
+        from utils.tscm.advanced import PLAYBOOKS
+
+        if playbook_id not in PLAYBOOKS:
+            return jsonify({'status': 'error', 'message': 'Playbook not found'}), 404
+
+        return jsonify({
+            'status': 'success',
+            'playbook': PLAYBOOKS[playbook_id].to_dict()
+        })
+
+    except Exception as e:
+        logger.error(f"Get playbook error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+# FIX: URL converter restored (rule was '/findings//playbook').
+@tscm_bp.route('/findings/<identifier>/playbook')
+def get_finding_playbook(identifier: str):
+    """Get recommended playbook for a specific finding."""
+    try:
+        from utils.tscm.advanced import get_playbook_for_finding
+
+        # Get profile
+        correlation = get_correlation_engine()
+        profile = None
+
+        for protocol in ['bluetooth', 'wifi', 'rf']:
+            key = f"{protocol}:{identifier.upper()}"
+            if key in correlation.device_profiles:
+                profile = correlation.device_profiles[key].to_dict()
+                break
+
+        if not profile:
+            return jsonify({'status': 'error', 'message': 'Finding not found'}), 404
+
+        playbook = get_playbook_for_finding(
+            risk_level=profile.get('risk_level', 'informational'),
+            indicators=profile.get('indicators', [])
+        )
+
+        return jsonify({
+
'status': 'success', + 'playbook': playbook.to_dict(), + 'suggested_next_steps': [ + f"Step {s.step_number}: {s.action}" + for s in playbook.steps[:3] + ] + }) + + except Exception as e: + logger.error(f"Get finding playbook error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 diff --git a/utils/database.py b/utils/database.py index be79f53..fb9012c 100644 --- a/utils/database.py +++ b/utils/database.py @@ -178,6 +178,123 @@ def init_db() -> None: ) ''') + # TSCM Device Timelines - Periodic observations per device + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_device_timelines ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_identifier TEXT NOT NULL, + protocol TEXT NOT NULL, + sweep_id INTEGER, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + rssi INTEGER, + presence BOOLEAN DEFAULT 1, + channel INTEGER, + frequency REAL, + attributes TEXT, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Known-Good Registry - Whitelist of expected devices + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_known_devices ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + identifier TEXT NOT NULL UNIQUE, + protocol TEXT NOT NULL, + name TEXT, + description TEXT, + location TEXT, + scope TEXT DEFAULT 'global', + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + added_by TEXT, + last_verified TIMESTAMP, + score_modifier INTEGER DEFAULT -2, + metadata TEXT + ) + ''') + + # TSCM Cases - Grouping sweeps, threats, and notes + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_cases ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + description TEXT, + location TEXT, + status TEXT DEFAULT 'open', + priority TEXT DEFAULT 'normal', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + closed_at TIMESTAMP, + created_by TEXT, + assigned_to TEXT, + notes TEXT, + metadata TEXT + ) + ''') + + # TSCM Case Sweeps - Link sweeps to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS 
tscm_case_sweeps ( + case_id INTEGER, + sweep_id INTEGER, + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (case_id, sweep_id), + FOREIGN KEY (case_id) REFERENCES tscm_cases(id), + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Case Threats - Link threats to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_case_threats ( + case_id INTEGER, + threat_id INTEGER, + added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (case_id, threat_id), + FOREIGN KEY (case_id) REFERENCES tscm_cases(id), + FOREIGN KEY (threat_id) REFERENCES tscm_threats(id) + ) + ''') + + # TSCM Case Notes - Notes attached to cases + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_case_notes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + case_id INTEGER, + content TEXT NOT NULL, + note_type TEXT DEFAULT 'general', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + created_by TEXT, + FOREIGN KEY (case_id) REFERENCES tscm_cases(id) + ) + ''') + + # TSCM Meeting Windows - Track sensitive periods + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_meeting_windows ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sweep_id INTEGER, + name TEXT, + start_time TIMESTAMP NOT NULL, + end_time TIMESTAMP, + location TEXT, + notes TEXT, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + + # TSCM Sweep Capabilities - Store sweep capability snapshot + conn.execute(''' + CREATE TABLE IF NOT EXISTS tscm_sweep_capabilities ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sweep_id INTEGER UNIQUE, + capabilities TEXT NOT NULL, + limitations TEXT, + recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (sweep_id) REFERENCES tscm_sweeps(id) + ) + ''') + # TSCM indexes for performance conn.execute(''' CREATE INDEX IF NOT EXISTS idx_tscm_threats_sweep @@ -194,6 +311,21 @@ def init_db() -> None: ON tscm_sweeps(baseline_id) ''') + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_timelines_device + ON tscm_device_timelines(device_identifier, timestamp) + 
''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_known_devices_identifier + ON tscm_known_devices(identifier) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_tscm_cases_status + ON tscm_cases(status, created_at) + ''') + logger.info("Database initialized successfully") @@ -793,3 +925,507 @@ def get_tscm_threat_summary() -> dict: summary['total'] += row['count'] return summary + + +# ============================================================================= +# TSCM Device Timeline Functions +# ============================================================================= + +def add_device_timeline_entry( + device_identifier: str, + protocol: str, + sweep_id: int | None = None, + rssi: int | None = None, + presence: bool = True, + channel: int | None = None, + frequency: float | None = None, + attributes: dict | None = None +) -> int: + """Add a device timeline observation entry.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_device_timelines + (device_identifier, protocol, sweep_id, rssi, presence, channel, frequency, attributes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + device_identifier, protocol, sweep_id, rssi, presence, + channel, frequency, json.dumps(attributes) if attributes else None + )) + return cursor.lastrowid + + +def get_device_timeline( + device_identifier: str, + limit: int = 100, + since_hours: int = 24 +) -> list[dict]: + """Get timeline entries for a device.""" + with get_db() as conn: + cursor = conn.execute(''' + SELECT * FROM tscm_device_timelines + WHERE device_identifier = ? + AND timestamp > datetime('now', ?) + ORDER BY timestamp DESC + LIMIT ? 
+ ''', (device_identifier, f'-{since_hours} hours', limit)) + + results = [] + for row in cursor: + results.append({ + 'id': row['id'], + 'device_identifier': row['device_identifier'], + 'protocol': row['protocol'], + 'sweep_id': row['sweep_id'], + 'timestamp': row['timestamp'], + 'rssi': row['rssi'], + 'presence': bool(row['presence']), + 'channel': row['channel'], + 'frequency': row['frequency'], + 'attributes': json.loads(row['attributes']) if row['attributes'] else None + }) + return list(reversed(results)) + + +def cleanup_old_timeline_entries(max_age_hours: int = 72) -> int: + """Remove old timeline entries.""" + with get_db() as conn: + cursor = conn.execute(''' + DELETE FROM tscm_device_timelines + WHERE timestamp < datetime('now', ?) + ''', (f'-{max_age_hours} hours',)) + return cursor.rowcount + + +# ============================================================================= +# TSCM Known-Good Registry Functions +# ============================================================================= + +def add_known_device( + identifier: str, + protocol: str, + name: str | None = None, + description: str | None = None, + location: str | None = None, + scope: str = 'global', + added_by: str | None = None, + score_modifier: int = -2, + metadata: dict | None = None +) -> int: + """Add a device to the known-good registry.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_known_devices + (identifier, protocol, name, description, location, scope, added_by, score_modifier, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(identifier) DO UPDATE SET + name = excluded.name, + description = excluded.description, + location = excluded.location, + scope = excluded.scope, + score_modifier = excluded.score_modifier, + metadata = excluded.metadata, + last_verified = CURRENT_TIMESTAMP + ''', ( + identifier.upper(), protocol, name, description, location, + scope, added_by, score_modifier, json.dumps(metadata) if metadata else None + )) + return cursor.lastrowid + + +def get_known_device(identifier: str) -> dict | None: + """Get a known device by identifier.""" + with get_db() as conn: + cursor = conn.execute( + 'SELECT * FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'id': row['id'], + 'identifier': row['identifier'], + 'protocol': row['protocol'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'scope': row['scope'], + 'added_at': row['added_at'], + 'added_by': row['added_by'], + 'last_verified': row['last_verified'], + 'score_modifier': row['score_modifier'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None + } + + +def get_all_known_devices( + location: str | None = None, + scope: str | None = None +) -> list[dict]: + """Get all known devices, optionally filtered by location or scope.""" + conditions = [] + params = [] + + if location: + conditions.append('(location = ? 
OR scope = ?)') + params.extend([location, 'global']) + if scope: + conditions.append('scope = ?') + params.append(scope) + + where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else '' + + with get_db() as conn: + cursor = conn.execute(f''' + SELECT * FROM tscm_known_devices + {where_clause} + ORDER BY added_at DESC + ''', params) + + return [ + { + 'id': row['id'], + 'identifier': row['identifier'], + 'protocol': row['protocol'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'scope': row['scope'], + 'added_at': row['added_at'], + 'added_by': row['added_by'], + 'last_verified': row['last_verified'], + 'score_modifier': row['score_modifier'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None + } + for row in cursor + ] + + +def delete_known_device(identifier: str) -> bool: + """Remove a device from the known-good registry.""" + with get_db() as conn: + cursor = conn.execute( + 'DELETE FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + return cursor.rowcount > 0 + + +def is_known_good_device(identifier: str, location: str | None = None) -> dict | None: + """Check if a device is in the known-good registry for a location.""" + with get_db() as conn: + if location: + cursor = conn.execute(''' + SELECT * FROM tscm_known_devices + WHERE identifier = ? AND (location = ? 
OR scope = 'global') + ''', (identifier.upper(), location)) + else: + cursor = conn.execute( + 'SELECT * FROM tscm_known_devices WHERE identifier = ?', + (identifier.upper(),) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'identifier': row['identifier'], + 'name': row['name'], + 'score_modifier': row['score_modifier'], + 'scope': row['scope'] + } + + +# ============================================================================= +# TSCM Case Functions +# ============================================================================= + +def create_tscm_case( + name: str, + description: str | None = None, + location: str | None = None, + priority: str = 'normal', + created_by: str | None = None, + metadata: dict | None = None +) -> int: + """Create a new TSCM case.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_cases + (name, description, location, priority, created_by, metadata) + VALUES (?, ?, ?, ?, ?, ?) + ''', (name, description, location, priority, created_by, + json.dumps(metadata) if metadata else None)) + return cursor.lastrowid + + +def get_tscm_case(case_id: int) -> dict | None: + """Get a TSCM case by ID.""" + with get_db() as conn: + cursor = conn.execute('SELECT * FROM tscm_cases WHERE id = ?', (case_id,)) + row = cursor.fetchone() + if not row: + return None + + case = { + 'id': row['id'], + 'name': row['name'], + 'description': row['description'], + 'location': row['location'], + 'status': row['status'], + 'priority': row['priority'], + 'created_at': row['created_at'], + 'updated_at': row['updated_at'], + 'closed_at': row['closed_at'], + 'created_by': row['created_by'], + 'assigned_to': row['assigned_to'], + 'notes': row['notes'], + 'metadata': json.loads(row['metadata']) if row['metadata'] else None, + 'sweeps': [], + 'threats': [], + 'case_notes': [] + } + + # Get linked sweeps + cursor = conn.execute(''' + SELECT s.* FROM tscm_sweeps s + JOIN tscm_case_sweeps cs ON s.id = cs.sweep_id + WHERE 
cs.case_id = ? + ORDER BY s.started_at DESC + ''', (case_id,)) + case['sweeps'] = [dict(row) for row in cursor] + + # Get linked threats + cursor = conn.execute(''' + SELECT t.* FROM tscm_threats t + JOIN tscm_case_threats ct ON t.id = ct.threat_id + WHERE ct.case_id = ? + ORDER BY t.detected_at DESC + ''', (case_id,)) + case['threats'] = [dict(row) for row in cursor] + + # Get case notes + cursor = conn.execute(''' + SELECT * FROM tscm_case_notes + WHERE case_id = ? + ORDER BY created_at DESC + ''', (case_id,)) + case['case_notes'] = [dict(row) for row in cursor] + + return case + + +def get_all_tscm_cases( + status: str | None = None, + limit: int = 50 +) -> list[dict]: + """Get all TSCM cases.""" + conditions = [] + params = [] + + if status: + conditions.append('status = ?') + params.append(status) + + where_clause = f'WHERE {" AND ".join(conditions)}' if conditions else '' + params.append(limit) + + with get_db() as conn: + cursor = conn.execute(f''' + SELECT * FROM tscm_cases + {where_clause} + ORDER BY updated_at DESC + LIMIT ? 
+ ''', params) + return [dict(row) for row in cursor] + + +def update_tscm_case( + case_id: int, + status: str | None = None, + priority: str | None = None, + assigned_to: str | None = None, + notes: str | None = None +) -> bool: + """Update a TSCM case.""" + updates = ['updated_at = CURRENT_TIMESTAMP'] + params = [] + + if status: + updates.append('status = ?') + params.append(status) + if status == 'closed': + updates.append('closed_at = CURRENT_TIMESTAMP') + if priority: + updates.append('priority = ?') + params.append(priority) + if assigned_to is not None: + updates.append('assigned_to = ?') + params.append(assigned_to) + if notes is not None: + updates.append('notes = ?') + params.append(notes) + + params.append(case_id) + + with get_db() as conn: + cursor = conn.execute( + f'UPDATE tscm_cases SET {", ".join(updates)} WHERE id = ?', + params + ) + return cursor.rowcount > 0 + + +def add_sweep_to_case(case_id: int, sweep_id: int) -> bool: + """Link a sweep to a case.""" + with get_db() as conn: + try: + conn.execute(''' + INSERT INTO tscm_case_sweeps (case_id, sweep_id) + VALUES (?, ?) + ''', (case_id, sweep_id)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return True + except sqlite3.IntegrityError: + return False + + +def add_threat_to_case(case_id: int, threat_id: int) -> bool: + """Link a threat to a case.""" + with get_db() as conn: + try: + conn.execute(''' + INSERT INTO tscm_case_threats (case_id, threat_id) + VALUES (?, ?) 
+ ''', (case_id, threat_id)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return True + except sqlite3.IntegrityError: + return False + + +def add_case_note( + case_id: int, + content: str, + note_type: str = 'general', + created_by: str | None = None +) -> int: + """Add a note to a case.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_case_notes (case_id, content, note_type, created_by) + VALUES (?, ?, ?, ?) + ''', (case_id, content, note_type, created_by)) + conn.execute( + 'UPDATE tscm_cases SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', + (case_id,) + ) + return cursor.lastrowid + + +# ============================================================================= +# TSCM Meeting Window Functions +# ============================================================================= + +def start_meeting_window( + sweep_id: int | None = None, + name: str | None = None, + location: str | None = None, + notes: str | None = None +) -> int: + """Start a meeting window.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_meeting_windows (sweep_id, name, start_time, location, notes) + VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?) + ''', (sweep_id, name, location, notes)) + return cursor.lastrowid + + +def end_meeting_window(meeting_id: int) -> bool: + """End a meeting window.""" + with get_db() as conn: + cursor = conn.execute(''' + UPDATE tscm_meeting_windows + SET end_time = CURRENT_TIMESTAMP + WHERE id = ? AND end_time IS NULL + ''', (meeting_id,)) + return cursor.rowcount > 0 + + +def get_active_meeting_window(sweep_id: int | None = None) -> dict | None: + """Get currently active meeting window.""" + with get_db() as conn: + if sweep_id: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE sweep_id = ? 
AND end_time IS NULL + ORDER BY start_time DESC LIMIT 1 + ''', (sweep_id,)) + else: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE end_time IS NULL + ORDER BY start_time DESC LIMIT 1 + ''') + row = cursor.fetchone() + if row: + return dict(row) + return None + + +def get_meeting_windows(sweep_id: int) -> list[dict]: + """Get all meeting windows for a sweep.""" + with get_db() as conn: + cursor = conn.execute(''' + SELECT * FROM tscm_meeting_windows + WHERE sweep_id = ? + ORDER BY start_time + ''', (sweep_id,)) + return [dict(row) for row in cursor] + + +# ============================================================================= +# TSCM Sweep Capabilities Functions +# ============================================================================= + +def save_sweep_capabilities( + sweep_id: int, + capabilities: dict, + limitations: list[str] | None = None +) -> int: + """Save sweep capabilities snapshot.""" + with get_db() as conn: + cursor = conn.execute(''' + INSERT INTO tscm_sweep_capabilities (sweep_id, capabilities, limitations) + VALUES (?, ?, ?) 
+ ON CONFLICT(sweep_id) DO UPDATE SET + capabilities = excluded.capabilities, + limitations = excluded.limitations, + recorded_at = CURRENT_TIMESTAMP + ''', (sweep_id, json.dumps(capabilities), + json.dumps(limitations) if limitations else None)) + return cursor.lastrowid + + +def get_sweep_capabilities(sweep_id: int) -> dict | None: + """Get capabilities for a sweep.""" + with get_db() as conn: + cursor = conn.execute( + 'SELECT * FROM tscm_sweep_capabilities WHERE sweep_id = ?', + (sweep_id,) + ) + row = cursor.fetchone() + if not row: + return None + return { + 'sweep_id': row['sweep_id'], + 'capabilities': json.loads(row['capabilities']), + 'limitations': json.loads(row['limitations']) if row['limitations'] else [], + 'recorded_at': row['recorded_at'] + } diff --git a/utils/tscm/advanced.py b/utils/tscm/advanced.py new file mode 100644 index 0000000..3c7954e --- /dev/null +++ b/utils/tscm/advanced.py @@ -0,0 +1,2152 @@ +""" +TSCM Advanced Features Module + +Implements: +1. Capability & Coverage Reality Panel +2. Baseline Diff & Baseline Health +3. Per-Device Timelines +4. Meeting-Window Summary Enhancements +5. WiFi Advanced Indicators (Evil Twin, Probes, Deauth) +6. Bluetooth Risk Explainability & Proximity Heuristics +7. Operator Playbooks + +DISCLAIMER: This system performs wireless and RF surveillance screening. +Findings indicate anomalies and indicators, not confirmed surveillance devices. +All claims are probabilistic pattern matches requiring professional verification. +""" + +from __future__ import annotations + +import logging +import os +import platform +import subprocess +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import Enum +from typing import Any, Optional + +logger = logging.getLogger('intercept.tscm.advanced') + + +# ============================================================================= +# 1. 
Capability & Coverage Reality Panel +# ============================================================================= + +class WifiMode(Enum): + """WiFi adapter operating modes.""" + MONITOR = 'monitor' + MANAGED = 'managed' + UNAVAILABLE = 'unavailable' + + +class BluetoothMode(Enum): + """Bluetooth adapter capabilities.""" + BLE_CLASSIC = 'ble_classic' + BLE_ONLY = 'ble_only' + LIMITED = 'limited' + UNAVAILABLE = 'unavailable' + + +@dataclass +class RFCapability: + """RF/SDR device capabilities.""" + device_type: str = 'none' + driver: str = '' + min_frequency_mhz: float = 0.0 + max_frequency_mhz: float = 0.0 + sample_rate_max: int = 0 + available: bool = False + limitations: list[str] = field(default_factory=list) + + +@dataclass +class SweepCapabilities: + """ + Complete capabilities snapshot for a TSCM sweep. + + Exposes what the current sweep CAN and CANNOT detect based on + OS, privileges, adapters, and SDR hardware limits. + """ + # System info + os_name: str = '' + os_version: str = '' + is_root: bool = False + + # WiFi capabilities + wifi_mode: WifiMode = WifiMode.UNAVAILABLE + wifi_interface: str = '' + wifi_driver: str = '' + wifi_monitor_capable: bool = False + wifi_limitations: list[str] = field(default_factory=list) + + # Bluetooth capabilities + bt_mode: BluetoothMode = BluetoothMode.UNAVAILABLE + bt_adapter: str = '' + bt_version: str = '' + bt_limitations: list[str] = field(default_factory=list) + + # RF/SDR capabilities + rf_capability: RFCapability = field(default_factory=RFCapability) + + # Overall limitations + all_limitations: list[str] = field(default_factory=list) + + # Timestamp + captured_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'system': { + 'os': self.os_name, + 'os_version': self.os_version, + 'is_root': self.is_root, + }, + 'wifi': { + 'mode': self.wifi_mode.value, + 'interface': self.wifi_interface, + 'driver': self.wifi_driver, 
+ 'monitor_capable': self.wifi_monitor_capable, + 'limitations': self.wifi_limitations, + }, + 'bluetooth': { + 'mode': self.bt_mode.value, + 'adapter': self.bt_adapter, + 'version': self.bt_version, + 'limitations': self.bt_limitations, + }, + 'rf': { + 'device_type': self.rf_capability.device_type, + 'driver': self.rf_capability.driver, + 'frequency_range_mhz': { + 'min': self.rf_capability.min_frequency_mhz, + 'max': self.rf_capability.max_frequency_mhz, + }, + 'sample_rate_max': self.rf_capability.sample_rate_max, + 'available': self.rf_capability.available, + 'limitations': self.rf_capability.limitations, + }, + 'all_limitations': self.all_limitations, + 'captured_at': self.captured_at.isoformat(), + 'disclaimer': ( + "Capabilities are detected at sweep start time and may change. " + "Limitations listed affect what this sweep can reliably detect." + ), + } + + +def detect_sweep_capabilities( + wifi_interface: str = '', + bt_adapter: str = '', + sdr_device: Any = None +) -> SweepCapabilities: + """ + Detect current system capabilities for TSCM sweeping. 
+ + Args: + wifi_interface: Specific WiFi interface to check + bt_adapter: Specific BT adapter to check + sdr_device: SDR device object if available + + Returns: + SweepCapabilities object with complete capability assessment + """ + caps = SweepCapabilities() + + # System info + caps.os_name = platform.system() + caps.os_version = platform.release() + caps.is_root = os.geteuid() == 0 if hasattr(os, 'geteuid') else False + + # Detect WiFi capabilities + _detect_wifi_capabilities(caps, wifi_interface) + + # Detect Bluetooth capabilities + _detect_bluetooth_capabilities(caps, bt_adapter) + + # Detect RF/SDR capabilities + _detect_rf_capabilities(caps, sdr_device) + + # Compile all limitations + caps.all_limitations = ( + caps.wifi_limitations + + caps.bt_limitations + + caps.rf_capability.limitations + ) + + # Add privilege-based limitations + if not caps.is_root: + caps.all_limitations.append( + "Running without root privileges - some features may be limited" + ) + + return caps + + +def _detect_wifi_capabilities(caps: SweepCapabilities, interface: str) -> None: + """Detect WiFi adapter capabilities.""" + caps.wifi_interface = interface + + if platform.system() == 'Darwin': + # macOS: Check airport utility + airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport' + if os.path.exists(airport_path): + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_driver = 'apple80211' + caps.wifi_monitor_capable = False + caps.wifi_limitations = [ + "Passive WiFi frame analysis is not available in this sweep.", + "macOS WiFi operates in managed mode only.", + "Cannot capture probe requests, deauthentication frames, or raw 802.11 headers.", + "Evil twin detection limited to SSID/BSSID comparison only.", + ] + else: + caps.wifi_mode = WifiMode.UNAVAILABLE + caps.wifi_limitations = ["WiFi scanning unavailable - no interface found"] + + else: + # Linux: Check for monitor mode capability + try: + # Check if interface supports monitor mode + 
result = subprocess.run( + ['iw', 'list'], + capture_output=True, + text=True, + timeout=5 + ) + + if 'monitor' in result.stdout.lower(): + # Check current mode + if interface: + mode_result = subprocess.run( + ['iw', 'dev', interface, 'info'], + capture_output=True, + text=True, + timeout=5 + ) + if 'type monitor' in mode_result.stdout.lower(): + caps.wifi_mode = WifiMode.MONITOR + caps.wifi_monitor_capable = True + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = True + caps.wifi_limitations.append( + "WiFi interface in managed mode. " + "Probe requests and deauth detection require monitor mode." + ) + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = True + else: + caps.wifi_mode = WifiMode.MANAGED + caps.wifi_monitor_capable = False + caps.wifi_limitations = [ + "Passive WiFi frame analysis is not available in this sweep.", + "WiFi adapter does not support monitor mode.", + "Probe request and deauthentication detection unavailable.", + ] + + # Get driver info + if interface: + try: + driver_path = f'/sys/class/net/{interface}/device/driver' + if os.path.exists(driver_path): + caps.wifi_driver = os.path.basename(os.readlink(driver_path)) + except Exception: + pass + + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.wifi_mode = WifiMode.UNAVAILABLE + caps.wifi_limitations = ["WiFi scanning tools not available"] + + +def _detect_bluetooth_capabilities(caps: SweepCapabilities, adapter: str) -> None: + """Detect Bluetooth adapter capabilities.""" + caps.bt_adapter = adapter + + if platform.system() == 'Darwin': + # macOS: Use system_profiler + try: + result = subprocess.run( + ['system_profiler', 'SPBluetoothDataType', '-json'], + capture_output=True, + text=True, + timeout=10 + ) + if 'Bluetooth' in result.stdout: + caps.bt_mode = BluetoothMode.BLE_CLASSIC + caps.bt_version = 'macOS CoreBluetooth' + caps.bt_limitations = [ + "BLE scanning limited to advertising devices only.", + "Classic Bluetooth discovery 
may be incomplete.", + "Manufacturer data parsing depends on device advertising.", + ] + else: + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth not available"] + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth detection failed"] + + else: + # Linux: Check bluetoothctl/hciconfig + try: + result = subprocess.run( + ['hciconfig', '-a'], + capture_output=True, + text=True, + timeout=5 + ) + + if 'hci' in result.stdout.lower(): + # Check for BLE support + if 'le' in result.stdout.lower(): + caps.bt_mode = BluetoothMode.BLE_CLASSIC + caps.bt_limitations = [ + "BLE scanning range depends on adapter sensitivity.", + "Some devices may not be detected if not advertising.", + ] + else: + caps.bt_mode = BluetoothMode.LIMITED + caps.bt_limitations = [ + "Adapter may not support BLE scanning.", + "Limited to classic Bluetooth discovery.", + ] + + # Extract version + for line in result.stdout.split('\n'): + if 'hci version' in line.lower(): + caps.bt_version = line.strip() + break + else: + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["No Bluetooth adapter found"] + + except (subprocess.TimeoutExpired, FileNotFoundError): + caps.bt_mode = BluetoothMode.UNAVAILABLE + caps.bt_limitations = ["Bluetooth tools not available"] + + +def _detect_rf_capabilities(caps: SweepCapabilities, sdr_device: Any) -> None: + """Detect RF/SDR device capabilities.""" + rf_cap = RFCapability() + + try: + from utils.sdr import SDRFactory + devices = SDRFactory.detect_devices() + + if devices: + device = devices[0] # Use first device + rf_cap.available = True + rf_cap.device_type = device.get('type', 'unknown') + rf_cap.driver = device.get('driver', '') + + # Set frequency ranges based on device type + if 'rtl' in rf_cap.device_type.lower(): + rf_cap.min_frequency_mhz = 24.0 + rf_cap.max_frequency_mhz = 1766.0 + rf_cap.sample_rate_max = 3200000 + rf_cap.limitations = [ 
+ "RTL-SDR frequency range: 24-1766 MHz typical.", + "Cannot reliably cover frequencies below 24 MHz.", + "Cannot cover microwave bands (>1.8 GHz) without upconverter.", + "Signal detection limited by SDR noise floor and dynamic range.", + ] + elif 'hackrf' in rf_cap.device_type.lower(): + rf_cap.min_frequency_mhz = 1.0 + rf_cap.max_frequency_mhz = 6000.0 + rf_cap.sample_rate_max = 20000000 + rf_cap.limitations = [ + "HackRF frequency range: 1 MHz - 6 GHz.", + "8-bit ADC limits dynamic range for weak signal detection.", + ] + else: + rf_cap.limitations = [ + f"Unknown SDR type: {rf_cap.device_type}", + "Frequency coverage and capabilities uncertain.", + ] + else: + rf_cap.available = False + rf_cap.device_type = 'none' + rf_cap.limitations = [ + "No SDR device detected.", + "RF spectrum analysis is not available in this sweep.", + "Cannot scan for wireless microphones, bugs, or RF transmitters.", + ] + + except ImportError: + rf_cap.available = False + rf_cap.limitations = [ + "SDR support not installed.", + "RF spectrum analysis unavailable.", + ] + except Exception as e: + rf_cap.available = False + rf_cap.limitations = [f"SDR detection failed: {str(e)}"] + + caps.rf_capability = rf_cap + + +# ============================================================================= +# 2. Baseline Diff & Baseline Health +# ============================================================================= + +class BaselineHealth(Enum): + """Baseline health status.""" + HEALTHY = 'healthy' + NOISY = 'noisy' + STALE = 'stale' + + +@dataclass +class DeviceChange: + """Represents a change detected compared to baseline.""" + identifier: str + protocol: str + change_type: str # 'new', 'missing', 'rssi_drift', 'channel_change', 'security_change' + description: str + expected: bool = False # True if this is an expected/normal change + details: dict = field(default_factory=dict) + + +@dataclass +class BaselineDiff: + """ + Complete diff between a baseline and a sweep. 
+ + Shows what changed, whether baseline is reliable, + and separates expected vs unexpected changes. + """ + baseline_id: int + sweep_id: int + + # Health assessment + health: BaselineHealth = BaselineHealth.HEALTHY + health_score: float = 1.0 # 0-1, higher is healthier + health_reasons: list[str] = field(default_factory=list) + + # Age metrics + baseline_age_hours: float = 0.0 + is_stale: bool = False + + # Device changes + new_devices: list[DeviceChange] = field(default_factory=list) + missing_devices: list[DeviceChange] = field(default_factory=list) + changed_devices: list[DeviceChange] = field(default_factory=list) + + # Summary counts + total_new: int = 0 + total_missing: int = 0 + total_changed: int = 0 + + # Expected vs unexpected + expected_changes: list[DeviceChange] = field(default_factory=list) + unexpected_changes: list[DeviceChange] = field(default_factory=list) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'baseline_id': self.baseline_id, + 'sweep_id': self.sweep_id, + 'health': { + 'status': self.health.value, + 'score': round(self.health_score, 2), + 'reasons': self.health_reasons, + }, + 'age': { + 'hours': round(self.baseline_age_hours, 1), + 'is_stale': self.is_stale, + }, + 'summary': { + 'new_devices': self.total_new, + 'missing_devices': self.total_missing, + 'changed_devices': self.total_changed, + 'expected_changes': len(self.expected_changes), + 'unexpected_changes': len(self.unexpected_changes), + }, + 'new_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'description': d.description, 'details': d.details} + for d in self.new_devices + ], + 'missing_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'description': d.description, 'details': d.details} + for d in self.missing_devices + ], + 'changed_devices': [ + {'identifier': d.identifier, 'protocol': d.protocol, + 'change_type': d.change_type, 'description': d.description, + 'expected': d.expected, 
'details': d.details} + for d in self.changed_devices + ], + 'disclaimer': ( + "Baseline comparison shows differences, not confirmed threats. " + "New devices may be legitimate. Missing devices may have been powered off." + ), + } + + +def calculate_baseline_diff( + baseline: dict, + current_wifi: list[dict], + current_bt: list[dict], + current_rf: list[dict], + sweep_id: int +) -> BaselineDiff: + """ + Calculate comprehensive diff between baseline and current scan. + + Args: + baseline: Baseline dict from database + current_wifi: Current WiFi devices + current_bt: Current Bluetooth devices + current_rf: Current RF signals + sweep_id: Current sweep ID + + Returns: + BaselineDiff with complete comparison results + """ + diff = BaselineDiff( + baseline_id=baseline.get('id', 0), + sweep_id=sweep_id + ) + + # Calculate baseline age + created_at = baseline.get('created_at') + if created_at: + if isinstance(created_at, str): + try: + created = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + diff.baseline_age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600 + except ValueError: + diff.baseline_age_hours = 0 + elif isinstance(created_at, datetime): + diff.baseline_age_hours = (datetime.now() - created_at).total_seconds() / 3600 + + # Check if baseline is stale (>72 hours old) + diff.is_stale = diff.baseline_age_hours > 72 + + # Build baseline lookup dicts + baseline_wifi = { + d.get('bssid', d.get('mac', '')).upper(): d + for d in baseline.get('wifi_networks', []) + if d.get('bssid') or d.get('mac') + } + baseline_bt = { + d.get('mac', d.get('address', '')).upper(): d + for d in baseline.get('bt_devices', []) + if d.get('mac') or d.get('address') + } + baseline_rf = { + round(d.get('frequency', 0), 1): d + for d in baseline.get('rf_frequencies', []) + if d.get('frequency') + } + + # Compare WiFi + _compare_wifi(diff, baseline_wifi, current_wifi) + + # Compare Bluetooth + _compare_bluetooth(diff, baseline_bt, current_bt) + + # 
Compare RF + _compare_rf(diff, baseline_rf, current_rf) + + # Calculate totals + diff.total_new = len(diff.new_devices) + diff.total_missing = len(diff.missing_devices) + diff.total_changed = len(diff.changed_devices) + + # Separate expected vs unexpected changes + for change in diff.new_devices + diff.missing_devices + diff.changed_devices: + if change.expected: + diff.expected_changes.append(change) + else: + diff.unexpected_changes.append(change) + + # Calculate health + _calculate_baseline_health(diff, baseline) + + return diff + + +def _compare_wifi(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare WiFi devices between baseline and current.""" + current_macs = { + d.get('bssid', d.get('mac', '')).upper(): d + for d in current + if d.get('bssid') or d.get('mac') + } + + # Find new devices + for mac, device in current_macs.items(): + if mac not in baseline: + ssid = device.get('essid', device.get('ssid', 'Hidden')) + diff.new_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type='new', + description=f'New WiFi AP: {ssid}', + expected=False, + details={ + 'ssid': ssid, + 'channel': device.get('channel'), + 'rssi': device.get('power', device.get('signal')), + } + )) + else: + # Check for changes + baseline_dev = baseline[mac] + changes = [] + + # RSSI drift + curr_rssi = device.get('power', device.get('signal')) + base_rssi = baseline_dev.get('power', baseline_dev.get('signal')) + if curr_rssi and base_rssi: + rssi_diff = abs(int(curr_rssi) - int(base_rssi)) + if rssi_diff > 15: + changes.append(('rssi_drift', f'RSSI changed by {rssi_diff} dBm')) + + # Channel change + curr_chan = device.get('channel') + base_chan = baseline_dev.get('channel') + if curr_chan and base_chan and curr_chan != base_chan: + changes.append(('channel_change', f'Channel changed from {base_chan} to {curr_chan}')) + + # Security change + curr_sec = device.get('encryption', device.get('privacy', '')) + base_sec = 
baseline_dev.get('encryption', baseline_dev.get('privacy', '')) + if curr_sec and base_sec and curr_sec != base_sec: + changes.append(('security_change', f'Security changed from {base_sec} to {curr_sec}')) + + for change_type, desc in changes: + diff.changed_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type=change_type, + description=desc, + expected=change_type == 'rssi_drift', # RSSI drift is often expected + details={ + 'ssid': device.get('essid', device.get('ssid')), + 'baseline': baseline_dev, + 'current': device, + } + )) + + # Find missing devices + for mac, device in baseline.items(): + if mac not in current_macs: + ssid = device.get('essid', device.get('ssid', 'Hidden')) + diff.missing_devices.append(DeviceChange( + identifier=mac, + protocol='wifi', + change_type='missing', + description=f'Missing WiFi AP: {ssid}', + expected=False, # Could be powered off + details={ + 'ssid': ssid, + 'last_channel': device.get('channel'), + } + )) + + +def _compare_bluetooth(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare Bluetooth devices between baseline and current.""" + current_macs = { + d.get('mac', d.get('address', '')).upper(): d + for d in current + if d.get('mac') or d.get('address') + } + + # Find new devices + for mac, device in current_macs.items(): + if mac not in baseline: + name = device.get('name', 'Unknown') + diff.new_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + change_type='new', + description=f'New BLE device: {name}', + expected=False, + details={ + 'name': name, + 'rssi': device.get('rssi'), + 'manufacturer': device.get('manufacturer'), + } + )) + else: + # Check for changes + baseline_dev = baseline[mac] + + # Name change (device renamed) + curr_name = device.get('name', '') + base_name = baseline_dev.get('name', '') + if curr_name and base_name and curr_name != base_name: + diff.changed_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + 
change_type='name_change', + description=f'Device renamed: {base_name} -> {curr_name}', + expected=True, + details={'old_name': base_name, 'new_name': curr_name} + )) + + # Find missing devices + for mac, device in baseline.items(): + if mac not in current_macs: + name = device.get('name', 'Unknown') + diff.missing_devices.append(DeviceChange( + identifier=mac, + protocol='bluetooth', + change_type='missing', + description=f'Missing BLE device: {name}', + expected=True, # BLE devices often go to sleep + details={'name': name} + )) + + +def _compare_rf(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None: + """Compare RF signals between baseline and current.""" + current_freqs = { + round(s.get('frequency', 0), 1): s + for s in current + if s.get('frequency') + } + + # Find new signals + for freq, signal in current_freqs.items(): + if freq not in baseline: + diff.new_devices.append(DeviceChange( + identifier=f'{freq:.1f} MHz', + protocol='rf', + change_type='new', + description=f'New RF signal at {freq:.3f} MHz', + expected=False, + details={ + 'frequency': freq, + 'power': signal.get('power', signal.get('level')), + 'modulation': signal.get('modulation'), + } + )) + + # Find missing signals + for freq, signal in baseline.items(): + if freq not in current_freqs: + diff.missing_devices.append(DeviceChange( + identifier=f'{freq:.1f} MHz', + protocol='rf', + change_type='missing', + description=f'Missing RF signal at {freq:.1f} MHz', + expected=True, # RF signals can be intermittent + details={'frequency': freq} + )) + + +def _calculate_baseline_health(diff: BaselineDiff, baseline: dict) -> None: + """Calculate baseline health score and status.""" + score = 1.0 + reasons = [] + + # Age penalty + if diff.baseline_age_hours > 168: # > 1 week + score -= 0.4 + reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old (>1 week)") + elif diff.baseline_age_hours > 72: # > 3 days + score -= 0.2 + reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} 
# =============================================================================
# 3. Per-Device Timelines
# =============================================================================

@dataclass
class DeviceObservation:
    """A single observation of a device."""
    timestamp: datetime
    rssi: Optional[int] = None
    present: bool = True
    channel: Optional[int] = None
    frequency: Optional[float] = None
    attributes: dict = field(default_factory=dict)


@dataclass
class DeviceTimeline:
    """
    Complete timeline for a device showing behavior over time.

    Used to assess signal stability, movement patterns, and
    meeting window correlation.
    """
    identifier: str
    protocol: str
    name: Optional[str] = None

    # Observation history (time-bucketed)
    observations: list[DeviceObservation] = field(default_factory=list)

    # Computed metrics
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    total_observations: int = 0
    presence_ratio: float = 0.0  # % of time device was present

    # Signal metrics
    rssi_min: Optional[int] = None
    rssi_max: Optional[int] = None
    rssi_mean: Optional[float] = None
    rssi_stability: float = 0.0  # 0-1, higher = more stable

    # Movement assessment
    appears_stationary: bool = True
    movement_pattern: str = 'unknown'  # 'stationary', 'mobile', 'intermittent'

    # Meeting correlation
    meeting_correlated: bool = False
    meeting_observations: int = 0

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {
            'identifier': self.identifier,
            'protocol': self.protocol,
            'name': self.name,
            'observations': [
                {
                    'timestamp': obs.timestamp.isoformat(),
                    'rssi': obs.rssi,
                    'present': obs.present,
                    'channel': obs.channel,
                    'frequency': obs.frequency,
                }
                for obs in self.observations[-50:]  # Limit to last 50
            ],
            'metrics': {
                'first_seen': self.first_seen.isoformat() if self.first_seen else None,
                'last_seen': self.last_seen.isoformat() if self.last_seen else None,
                'total_observations': self.total_observations,
                'presence_ratio': round(self.presence_ratio, 2),
            },
            'signal': {
                'rssi_min': self.rssi_min,
                'rssi_max': self.rssi_max,
                # Compare against None explicitly: a mean of exactly 0.0 is
                # falsy and would otherwise be serialized as None.
                'rssi_mean': round(self.rssi_mean, 1) if self.rssi_mean is not None else None,
                'stability': round(self.rssi_stability, 2),
            },
            'movement': {
                'appears_stationary': self.appears_stationary,
                'pattern': self.movement_pattern,
            },
            'meeting_correlation': {
                'correlated': self.meeting_correlated,
                'observations_during_meeting': self.meeting_observations,
            },
        }


class TimelineManager:
    """
    Manages per-device timelines with time-bucketing.

    Buckets observations to keep memory bounded while preserving
    useful behavioral patterns.
    """

    def __init__(self, bucket_seconds: int = 30, max_observations: int = 200):
        """
        Args:
            bucket_seconds: Time bucket size in seconds
            max_observations: Maximum observations to keep per device
        """
        self.bucket_seconds = bucket_seconds
        self.max_observations = max_observations
        self.timelines: dict[str, DeviceTimeline] = {}
        self._meeting_windows: list[tuple[datetime, Optional[datetime]]] = []

    def add_observation(
        self,
        identifier: str,
        protocol: str,
        rssi: Optional[int] = None,
        channel: Optional[int] = None,
        frequency: Optional[float] = None,
        name: Optional[str] = None,
        attributes: Optional[dict] = None
    ) -> None:
        """Add an observation for a device.

        Observations arriving within ``bucket_seconds`` of the previous one
        are merged into that bucket (RSSI averaged) instead of appended,
        keeping memory bounded for chatty devices.
        """
        key = f"{protocol}:{identifier.upper()}"
        now = datetime.now()

        if key not in self.timelines:
            self.timelines[key] = DeviceTimeline(
                identifier=identifier.upper(),
                protocol=protocol,
                name=name,
                first_seen=now,
            )

        timeline = self.timelines[key]

        # Update name if provided
        if name:
            timeline.name = name

        # Check if we should bucket with previous observation
        if timeline.observations:
            last_obs = timeline.observations[-1]
            time_diff = (now - last_obs.timestamp).total_seconds()

            if time_diff < self.bucket_seconds:
                # Update existing bucket
                if rssi is not None:
                    # Average RSSI
                    if last_obs.rssi is not None:
                        last_obs.rssi = (last_obs.rssi + rssi) // 2
                    else:
                        last_obs.rssi = rssi
                # The device was still seen just now even though the sample
                # was merged into the previous bucket, so keep last_seen
                # current (previously it lagged by up to bucket_seconds).
                timeline.last_seen = now
                return

        # Add new observation
        obs = DeviceObservation(
            timestamp=now,
            rssi=rssi,
            present=True,
            channel=channel,
            frequency=frequency,
            attributes=attributes or {},
        )
        timeline.observations.append(obs)

        # Enforce max observations
        if len(timeline.observations) > self.max_observations:
            timeline.observations = timeline.observations[-self.max_observations:]

        # Update metrics
        timeline.last_seen = now
        timeline.total_observations = len(timeline.observations)

        # Check meeting correlation
        if self._is_during_meeting(now):
            timeline.meeting_observations += 1
            timeline.meeting_correlated = True

    def start_meeting_window(self) -> None:
        """Mark the start of a meeting window."""
        self._meeting_windows.append((datetime.now(), None))

    def end_meeting_window(self) -> None:
        """Mark the end of a meeting window."""
        if self._meeting_windows and self._meeting_windows[-1][1] is None:
            start = self._meeting_windows[-1][0]
            self._meeting_windows[-1] = (start, datetime.now())

    def _is_during_meeting(self, timestamp: datetime) -> bool:
        """Check if timestamp falls within a meeting window.

        An open window (no end yet) matches anything at or after its start.
        """
        for start, end in self._meeting_windows:
            if end is None:
                if timestamp >= start:
                    return True
            elif start <= timestamp <= end:
                return True
        return False

    def compute_metrics(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
        """Compute all metrics for a device timeline.

        Returns None when the device has never been observed.
        """
        key = f"{protocol}:{identifier.upper()}"
        if key not in self.timelines:
            return None

        timeline = self.timelines[key]

        if not timeline.observations:
            return timeline

        # RSSI metrics
        rssi_values = [obs.rssi for obs in timeline.observations if obs.rssi is not None]
        if rssi_values:
            timeline.rssi_min = min(rssi_values)
            timeline.rssi_max = max(rssi_values)
            timeline.rssi_mean = sum(rssi_values) / len(rssi_values)

            # Calculate stability (0-1)
            if len(rssi_values) >= 3:
                variance = sum((r - timeline.rssi_mean) ** 2 for r in rssi_values) / len(rssi_values)
                timeline.rssi_stability = max(0, 1 - (variance / 100))

            # Movement assessment based on RSSI spread. Kept inside the
            # rssi_values guard so rssi_min/rssi_max can never be None here.
            rssi_range = timeline.rssi_max - timeline.rssi_min
            if rssi_range < 10:
                timeline.appears_stationary = True
                timeline.movement_pattern = 'stationary'
            elif rssi_range < 25:
                timeline.appears_stationary = False
                timeline.movement_pattern = 'mobile'
            else:
                timeline.appears_stationary = False
                timeline.movement_pattern = 'intermittent'

        # Presence ratio
        if timeline.first_seen and timeline.last_seen:
            total_duration = (timeline.last_seen - timeline.first_seen).total_seconds()
            if total_duration > 0:
                # Estimate presence based on observation count and bucket size
                estimated_present_time = timeline.total_observations * self.bucket_seconds
                timeline.presence_ratio = min(1.0, estimated_present_time / total_duration)

        return timeline

    def get_timeline(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
        """Get computed timeline for a device."""
        return self.compute_metrics(identifier, protocol)

    def get_all_timelines(self) -> list[DeviceTimeline]:
        """Get all device timelines with computed metrics."""
        for key in self.timelines:
            protocol, identifier = key.split(':', 1)
            self.compute_metrics(identifier, protocol)
        return list(self.timelines.values())
+ """ + meeting_id: int + name: Optional[str] = None + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + duration_minutes: float = 0.0 + + # Devices first seen during meeting (high interest) + devices_first_seen: list[dict] = field(default_factory=list) + + # Devices with behavior change during meeting + devices_behavior_change: list[dict] = field(default_factory=list) + + # All active devices during meeting + active_devices: list[dict] = field(default_factory=list) + + # Summary metrics + total_devices_active: int = 0 + new_devices_count: int = 0 + behavior_changes_count: int = 0 + high_interest_count: int = 0 + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'meeting_id': self.meeting_id, + 'name': self.name, + 'start_time': self.start_time.isoformat() if self.start_time else None, + 'end_time': self.end_time.isoformat() if self.end_time else None, + 'duration_minutes': round(self.duration_minutes, 1), + 'summary': { + 'total_devices_active': self.total_devices_active, + 'new_devices': self.new_devices_count, + 'behavior_changes': self.behavior_changes_count, + 'high_interest': self.high_interest_count, + }, + 'devices_first_seen': self.devices_first_seen, + 'devices_behavior_change': self.devices_behavior_change, + 'disclaimer': ( + "Meeting-correlated activity indicates temporal correlation only, " + "not confirmed surveillance. Devices may have legitimate reasons " + "for appearing during meetings." + ), + } + + +def generate_meeting_summary( + meeting_window: dict, + device_timelines: list[DeviceTimeline], + device_profiles: list[dict] +) -> MeetingWindowSummary: + """ + Generate summary of device activity during a meeting window. 
+ + Args: + meeting_window: Meeting window dict from database + device_timelines: List of device timelines + device_profiles: List of device profiles from correlation engine + + Returns: + MeetingWindowSummary with analysis + """ + summary = MeetingWindowSummary( + meeting_id=meeting_window.get('id', 0), + name=meeting_window.get('name'), + ) + + # Parse times + start_str = meeting_window.get('start_time') + end_str = meeting_window.get('end_time') + + if start_str: + if isinstance(start_str, str): + summary.start_time = datetime.fromisoformat(start_str.replace('Z', '+00:00')).replace(tzinfo=None) + else: + summary.start_time = start_str + + if end_str: + if isinstance(end_str, str): + summary.end_time = datetime.fromisoformat(end_str.replace('Z', '+00:00')).replace(tzinfo=None) + else: + summary.end_time = end_str + + if summary.start_time and summary.end_time: + summary.duration_minutes = (summary.end_time - summary.start_time).total_seconds() / 60 + + if not summary.start_time: + return summary + + # Analyze device timelines + for timeline in device_timelines: + if not timeline.first_seen: + continue + + # Check if device was active during meeting + was_active = False + first_seen_during = False + + for obs in timeline.observations: + if summary.end_time: + if summary.start_time <= obs.timestamp <= summary.end_time: + was_active = True + if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60: + first_seen_during = True + break + else: + # Meeting still ongoing + if obs.timestamp >= summary.start_time: + was_active = True + if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60: + first_seen_during = True + break + + if was_active: + device_info = { + 'identifier': timeline.identifier, + 'protocol': timeline.protocol, + 'name': timeline.name, + 'meeting_correlated': True, + } + summary.active_devices.append(device_info) + + if first_seen_during: + device_info['first_seen_during_meeting'] = 
True + summary.devices_first_seen.append({ + **device_info, + 'description': 'Device first seen during meeting window', + 'risk_modifier': '+2 (meeting-correlated activity)', + }) + + # Update counts + summary.total_devices_active = len(summary.active_devices) + summary.new_devices_count = len(summary.devices_first_seen) + summary.behavior_changes_count = len(summary.devices_behavior_change) + + # Count high interest from profiles + for profile in device_profiles: + if profile.get('risk_level') == 'high_interest': + indicators = profile.get('indicators', []) + if any(i.get('type') == 'meeting_correlated' for i in indicators): + summary.high_interest_count += 1 + + return summary + + +# ============================================================================= +# 7. WiFi Advanced Indicators (LIMITED SCOPE) +# ============================================================================= + +@dataclass +class WiFiAdvancedIndicator: + """An advanced WiFi indicator detection.""" + indicator_type: str # 'evil_twin', 'probe_request', 'deauth_burst' + severity: str # 'high', 'medium', 'low' + description: str + details: dict = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + requires_monitor_mode: bool = False + + def to_dict(self) -> dict: + return { + 'type': self.indicator_type, + 'severity': self.severity, + 'description': self.description, + 'details': self.details, + 'timestamp': self.timestamp.isoformat(), + 'requires_monitor_mode': self.requires_monitor_mode, + 'disclaimer': ( + "Pattern detected - this is an indicator, not confirmation of an attack. " + "Further investigation required." + ), + } + + +class WiFiAdvancedDetector: + """ + Detects advanced WiFi indicators. + + LIMITED SCOPE - Only implements: + 1. Evil Twin patterns (same SSID, different BSSID/security/abnormal signal) + 2. Probe requests for sensitive SSIDs (requires monitor mode) + 3. 
# =============================================================================
# 7. WiFi Advanced Indicators (LIMITED SCOPE)
# =============================================================================

@dataclass
class WiFiAdvancedIndicator:
    """An advanced WiFi indicator detection."""
    indicator_type: str  # 'evil_twin', 'probe_request', 'deauth_burst'
    severity: str  # 'high', 'medium', 'low'
    description: str
    details: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    requires_monitor_mode: bool = False

    def to_dict(self) -> dict:
        return {
            'type': self.indicator_type,
            'severity': self.severity,
            'description': self.description,
            'details': self.details,
            'timestamp': self.timestamp.isoformat(),
            'requires_monitor_mode': self.requires_monitor_mode,
            'disclaimer': (
                "Pattern detected - this is an indicator, not confirmation of an attack. "
                "Further investigation required."
            ),
        }


class WiFiAdvancedDetector:
    """
    Detects advanced WiFi indicators.

    LIMITED SCOPE - Only implements:
    1. Evil Twin patterns (same SSID, different BSSID/security/abnormal signal)
    2. Probe requests for sensitive SSIDs (requires monitor mode)
    3. Deauthentication bursts (requires monitor mode)

    All findings labeled as "pattern detected", never called attacks.
    """

    def __init__(self, monitor_mode_available: bool = False):
        self.monitor_mode = monitor_mode_available
        self.known_networks: dict[str, dict] = {}  # SSID -> expected BSSID/security
        self.probe_requests: list[dict] = []
        self.deauth_frames: list[dict] = []
        self.indicators: list[WiFiAdvancedIndicator] = []

    def set_known_networks(self, networks: list[dict]) -> None:
        """Set known/expected networks from baseline."""
        for net in networks:
            ssid = net.get('essid', net.get('ssid', ''))
            if ssid:
                self.known_networks[ssid] = {
                    # 'or' chain tolerates keys that are present with a None
                    # value; dict.get with a default would return None here
                    # and crash on .upper().
                    'bssid': (net.get('bssid') or net.get('mac') or '').upper(),
                    'security': net.get('encryption', net.get('privacy', '')),
                    'channel': net.get('channel'),
                    'rssi': net.get('power', net.get('signal')),
                }

    def analyze_network(self, network: dict) -> list[WiFiAdvancedIndicator]:
        """
        Analyze a network for evil twin patterns.

        Detects: Same SSID with different BSSID, security, or abnormal signal.
        """
        indicators = []
        ssid = network.get('essid', network.get('ssid', ''))
        bssid = (network.get('bssid') or network.get('mac') or '').upper()
        security = network.get('encryption', network.get('privacy', ''))
        rssi = network.get('power', network.get('signal'))

        if not ssid or ssid in ('', 'Hidden', '[Hidden]'):
            return indicators

        if ssid in self.known_networks:
            known = self.known_networks[ssid]

            # Different BSSID for same SSID
            if known['bssid'] and known['bssid'] != bssid:
                # Check security mismatch
                security_mismatch = known['security'] and security and known['security'] != security

                # Check signal anomaly (significantly stronger than expected)
                signal_anomaly = False
                rssi_diff = 0  # defined up front; referenced in the elif branch below
                if rssi and known.get('rssi'):
                    try:
                        rssi_diff = int(rssi) - int(known['rssi'])
                        signal_anomaly = rssi_diff > 20  # Much stronger than expected
                    except (ValueError, TypeError):
                        pass

                if security_mismatch:
                    indicators.append(WiFiAdvancedIndicator(
                        indicator_type='evil_twin',
                        severity='high',
                        description=f'Evil twin pattern detected for SSID "{ssid}"',
                        details={
                            'ssid': ssid,
                            'detected_bssid': bssid,
                            'expected_bssid': known['bssid'],
                            'detected_security': security,
                            'expected_security': known['security'],
                            'pattern': 'Different BSSID with security downgrade',
                        },
                        requires_monitor_mode=False,
                    ))
                elif signal_anomaly:
                    indicators.append(WiFiAdvancedIndicator(
                        indicator_type='evil_twin',
                        severity='medium',
                        description=f'Possible evil twin pattern for SSID "{ssid}"',
                        details={
                            'ssid': ssid,
                            'detected_bssid': bssid,
                            'expected_bssid': known['bssid'],
                            'signal_difference': f'+{rssi_diff} dBm stronger than expected',
                            'pattern': 'Different BSSID with abnormally strong signal',
                        },
                        requires_monitor_mode=False,
                    ))
                else:
                    indicators.append(WiFiAdvancedIndicator(
                        indicator_type='evil_twin',
                        severity='low',
                        description=f'Duplicate SSID detected: "{ssid}"',
                        details={
                            'ssid': ssid,
                            'detected_bssid': bssid,
                            'expected_bssid': known['bssid'],
                            'pattern': 'Multiple APs with same SSID (may be legitimate)',
                        },
                        requires_monitor_mode=False,
                    ))

        self.indicators.extend(indicators)
        return indicators

    def add_probe_request(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
        """
        Record a probe request frame (requires monitor mode).

        Detects repeated probing for sensitive SSIDs. Returns an indicator
        when a sensitive SSID was probed >= 3 times in the last 5 minutes,
        otherwise None.
        """
        if not self.monitor_mode:
            return None

        self.probe_requests.append({
            'timestamp': datetime.now(),
            'src_mac': frame.get('src_mac', '').upper(),
            'probed_ssid': frame.get('ssid', ''),
        })

        # Keep last 1000 probe requests
        if len(self.probe_requests) > 1000:
            self.probe_requests = self.probe_requests[-1000:]

        # Check for sensitive SSID probing
        ssid = frame.get('ssid', '')
        sensitive_patterns = [
            'corp', 'internal', 'private', 'secure', 'vpn',
            'admin', 'management', 'executive', 'board',
        ]

        is_sensitive = any(p in ssid.lower() for p in sensitive_patterns) if ssid else False

        if is_sensitive:
            # Count recent probes for this SSID
            recent_cutoff = datetime.now() - timedelta(minutes=5)
            recent_probes = [
                p for p in self.probe_requests
                if p['probed_ssid'] == ssid and p['timestamp'] > recent_cutoff
            ]

            if len(recent_probes) >= 3:
                indicator = WiFiAdvancedIndicator(
                    indicator_type='probe_request',
                    severity='medium',
                    description=f'Repeated probing for sensitive SSID "{ssid}"',
                    details={
                        'ssid': ssid,
                        'probe_count': len(recent_probes),
                        'source_macs': list(set(p['src_mac'] for p in recent_probes)),
                        'pattern': 'Multiple probe requests for potentially sensitive network',
                    },
                    requires_monitor_mode=True,
                )
                self.indicators.append(indicator)
                return indicator

        return None

    def add_deauth_frame(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
        """
        Record a deauthentication frame (requires monitor mode).

        Detects abnormal deauth volume (>= 10 frames in 10 seconds)
        potentially indicating attack. Returns the indicator when a burst
        fires, otherwise None.
        """
        if not self.monitor_mode:
            return None

        self.deauth_frames.append({
            'timestamp': datetime.now(),
            'src_mac': frame.get('src_mac', '').upper(),
            'dst_mac': frame.get('dst_mac', '').upper(),
            'bssid': frame.get('bssid', '').upper(),
            'reason': frame.get('reason_code'),
        })

        # Keep last 500 deauth frames
        if len(self.deauth_frames) > 500:
            self.deauth_frames = self.deauth_frames[-500:]

        # Check for deauth burst (>10 deauths in 10 seconds)
        recent_cutoff = datetime.now() - timedelta(seconds=10)
        recent_deauths = [d for d in self.deauth_frames if d['timestamp'] > recent_cutoff]

        if len(recent_deauths) >= 10:
            # Check if targeting specific BSSID
            bssid = frame.get('bssid', '').upper()
            targeting_bssid = len([d for d in recent_deauths if d['bssid'] == bssid]) >= 5

            indicator = WiFiAdvancedIndicator(
                indicator_type='deauth_burst',
                severity='high' if targeting_bssid else 'medium',
                description='Deauthentication burst pattern detected',
                details={
                    'deauth_count': len(recent_deauths),
                    'time_window_seconds': 10,
                    'targeted_bssid': bssid if targeting_bssid else None,
                    'unique_sources': len(set(d['src_mac'] for d in recent_deauths)),
                    'pattern': 'Abnormal deauthentication frame volume',
                },
                requires_monitor_mode=True,
            )
            self.indicators.append(indicator)

            # Clear recent to avoid repeated alerts
            self.deauth_frames = [d for d in self.deauth_frames if d['timestamp'] <= recent_cutoff]

            return indicator

        return None

    def get_all_indicators(self) -> list[dict]:
        """Get all detected indicators."""
        return [i.to_dict() for i in self.indicators]

    def get_unavailable_features(self) -> list[str]:
        """Get list of features unavailable without monitor mode."""
        if self.monitor_mode:
            return []
        return [
            "Probe request analysis: Requires monitor mode to capture probe frames.",
            "Deauthentication detection: Requires monitor mode to capture management frames.",
            "Raw 802.11 frame analysis: Not available in managed mode.",
        ]
# =============================================================================
# 8. Bluetooth Risk Explainability & Proximity Heuristics
# =============================================================================

class BLEProximity(Enum):
    """RSSI-based proximity estimation."""
    VERY_CLOSE = 'very_close'  # Within ~1m
    CLOSE = 'close'  # Within ~3m
    MODERATE = 'moderate'  # Within ~10m
    FAR = 'far'  # Beyond ~10m
    UNKNOWN = 'unknown'


@dataclass
class BLERiskExplanation:
    """
    Explainable risk assessment for a BLE device.

    Provides human-readable explanations, proximity estimates,
    and recommended actions.
    """
    identifier: str
    name: Optional[str] = None

    # Risk assessment
    risk_level: str = 'informational'
    risk_score: int = 0
    risk_explanation: str = ''

    # Proximity
    proximity: BLEProximity = BLEProximity.UNKNOWN
    proximity_explanation: str = ''
    estimated_distance: str = ''

    # Tracker detection
    is_tracker: bool = False
    tracker_type: Optional[str] = None
    tracker_explanation: str = ''

    # Meeting correlation
    meeting_correlated: bool = False
    meeting_explanation: str = ''

    # Recommended action
    recommended_action: str = ''
    action_rationale: str = ''

    # All indicators with explanations
    indicators: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            'identifier': self.identifier,
            'name': self.name,
            'risk': {
                'level': self.risk_level,
                'score': self.risk_score,
                'explanation': self.risk_explanation,
            },
            'proximity': {
                'estimate': self.proximity.value,
                'explanation': self.proximity_explanation,
                'estimated_distance': self.estimated_distance,
            },
            'tracker': {
                'is_tracker': self.is_tracker,
                'type': self.tracker_type,
                'explanation': self.tracker_explanation,
            },
            'meeting_correlation': {
                'correlated': self.meeting_correlated,
                'explanation': self.meeting_explanation,
            },
            'recommended_action': {
                'action': self.recommended_action,
                'rationale': self.action_rationale,
            },
            'indicators': self.indicators,
            'disclaimer': (
                "Risk assessment is based on observable indicators and heuristics. "
                "Proximity estimates are approximate based on RSSI and may vary with environment. "
                "Tracker detection indicates brand presence, not confirmed threat."
            ),
        }


def estimate_ble_proximity(rssi: Optional[int]) -> tuple[BLEProximity, str, str]:
    """
    Estimate BLE device proximity from RSSI.

    Note: RSSI-based distance is highly variable due to:
    - TX power differences between devices
    - Environmental factors (walls, interference)
    - Antenna characteristics

    Args:
        rssi: Received signal strength in dBm, or None when unavailable
              (annotation widened to Optional to match the handled case).

    Returns:
        Tuple of (proximity enum, explanation, estimated distance string)
    """
    if rssi is None:
        return (
            BLEProximity.UNKNOWN,
            "RSSI not available - cannot estimate proximity",
            "Unknown"
        )

    # These thresholds are heuristic approximations
    if rssi >= -50:
        return (
            BLEProximity.VERY_CLOSE,
            f"Very strong signal ({rssi} dBm) suggests device is very close",
            "< 1 meter (approximate)"
        )
    elif rssi >= -65:
        return (
            BLEProximity.CLOSE,
            f"Strong signal ({rssi} dBm) suggests device is nearby",
            "1-3 meters (approximate)"
        )
    elif rssi >= -80:
        return (
            BLEProximity.MODERATE,
            f"Moderate signal ({rssi} dBm) suggests device is in the area",
            "3-10 meters (approximate)"
        )
    else:
        return (
            BLEProximity.FAR,
            f"Weak signal ({rssi} dBm) suggests device is distant",
            "> 10 meters (approximate)"
        )


def generate_ble_risk_explanation(
    device: dict,
    profile: Optional[dict] = None,
    is_during_meeting: bool = False
) -> BLERiskExplanation:
    """
    Generate human-readable risk explanation for a BLE device.

    Args:
        device: BLE device dict with mac, name, rssi, etc.
        profile: DeviceProfile dict from correlation engine
        is_during_meeting: Whether device was detected during meeting

    Returns:
        BLERiskExplanation with complete assessment
    """
    mac = device.get('mac', device.get('address', '')).upper()
    name = device.get('name', '')
    rssi = device.get('rssi', device.get('signal'))

    explanation = BLERiskExplanation(
        identifier=mac,
        name=name if name else None,
    )

    # Proximity estimation. Compare against None explicitly: an RSSI of 0
    # is a valid (if extreme) reading and must not be dropped as falsy.
    if rssi is not None:
        try:
            rssi_int = int(rssi)
            prox, prox_exp, dist = estimate_ble_proximity(rssi_int)
            explanation.proximity = prox
            explanation.proximity_explanation = prox_exp
            explanation.estimated_distance = dist
        except (ValueError, TypeError):
            explanation.proximity = BLEProximity.UNKNOWN
            explanation.proximity_explanation = "Could not parse RSSI value"

    # Tracker detection with explanation
    if device.get('is_airtag'):
        explanation.is_tracker = True
        explanation.tracker_type = 'Apple AirTag'
        explanation.tracker_explanation = (
            "Apple AirTag detected via manufacturer data. AirTags are legitimate "
            "tracking devices but may indicate unwanted tracking if not recognized. "
            "Apple's Find My network will alert iPhone users to unknown AirTags."
        )
    elif device.get('is_tile'):
        explanation.is_tracker = True
        explanation.tracker_type = 'Tile'
        explanation.tracker_explanation = (
            "Tile tracker detected. Tile trackers are common consumer devices "
            "for finding lost items. Presence does not indicate surveillance."
        )
    elif device.get('is_smarttag'):
        explanation.is_tracker = True
        explanation.tracker_type = 'Samsung SmartTag'
        explanation.tracker_explanation = (
            "Samsung SmartTag detected. SmartTags are consumer tracking devices "
            "similar to AirTags. Samsung phones can detect unknown SmartTags."
        )
    elif device.get('is_espressif'):
        # NOTE: is_tracker is deliberately left False here - an Espressif
        # chipset is a programmable board, not a known tracker brand.
        explanation.tracker_type = 'ESP32/ESP8266'
        explanation.tracker_explanation = (
            "Espressif chipset (ESP32/ESP8266) detected. These are programmable "
            "development boards commonly used in IoT projects. They can be configured "
            "for various purposes including custom tracking devices."
        )

    # Meeting correlation explanation
    if is_during_meeting or device.get('meeting_correlated'):
        explanation.meeting_correlated = True
        explanation.meeting_explanation = (
            "Device detected during a marked meeting window. This temporal correlation "
            "is noted but does not confirm malicious intent - many legitimate devices "
            "are active during meetings (phones, laptops, wearables)."
        )

    # Build risk explanation from profile
    if profile:
        explanation.risk_level = profile.get('risk_level', 'informational')
        explanation.risk_score = profile.get('total_score', 0)

        # Convert indicators to explanations
        for ind in profile.get('indicators', []):
            ind_type = ind.get('type', '')
            ind_desc = ind.get('description', '')

            explanation.indicators.append({
                'type': ind_type,
                'description': ind_desc,
                'explanation': _get_indicator_explanation(ind_type),
            })

        # Build overall risk explanation
        if explanation.risk_level == 'high_interest':
            explanation.risk_explanation = (
                f"This device has accumulated {explanation.risk_score} risk points "
                "across multiple indicators, warranting closer investigation. "
                "High interest does not confirm surveillance - manual verification required."
            )
        elif explanation.risk_level == 'review':
            explanation.risk_explanation = (
                f"This device shows {explanation.risk_score} risk points indicating "
                "it should be reviewed but is not immediately concerning."
            )
        else:
            explanation.risk_explanation = (
                "This device shows typical characteristics and does not raise "
                "significant concerns based on observable indicators."
            )
    else:
        explanation.risk_explanation = "No detailed profile available for risk assessment."

    # Recommended action
    _set_recommended_action(explanation)

    return explanation


def _get_indicator_explanation(indicator_type: str) -> str:
    """Get human-readable explanation for an indicator type."""
    explanations = {
        'unknown_device': (
            "Device manufacturer is unknown or uses a generic chipset. "
            "This is common in DIY/hobbyist devices and some surveillance equipment."
        ),
        'audio_capable': (
            "Device advertises audio services (headphones, speakers, etc.). "
            "Audio-capable devices could theoretically transmit captured audio."
        ),
        'persistent': (
            "Device has been detected repeatedly across multiple scans. "
            "Persistence suggests a fixed or regularly present device."
        ),
        'meeting_correlated': (
            "Device activity correlates with marked meeting windows. "
            "This is a temporal pattern that warrants attention."
        ),
        'hidden_identity': (
            "Device does not broadcast a name or uses minimal advertising. "
            "Some legitimate devices minimize advertising for battery life."
        ),
        'stable_rssi': (
            "Signal strength is very stable, suggesting a stationary device. "
            "Fixed placement could indicate a planted device."
        ),
        'mac_rotation': (
            "Device appears to use MAC address randomization. "
            "This is a privacy feature in modern devices, also used to evade detection."
        ),
        'known_tracker': (
            "Device matches known tracking device signatures. "
            "May be a legitimate item tracker or unwanted surveillance."
        ),
        'airtag_detected': (
            "Apple AirTag identified. Check if this belongs to someone present."
        ),
        'tile_detected': (
            "Tile tracker identified. Common consumer tracking device."
        ),
        'smarttag_detected': (
            "Samsung SmartTag identified. Consumer tracking device."
        ),
        'esp32_device': (
            "Espressif development board detected. Highly programmable, "
            "could be configured for custom surveillance applications."
        ),
    }
    return explanations.get(indicator_type, "Indicator detected requiring review.")


def _set_recommended_action(explanation: BLERiskExplanation) -> None:
    """Set recommended action based on risk assessment (mutates in place)."""
    if explanation.risk_level == 'high_interest':
        if explanation.is_tracker and explanation.proximity == BLEProximity.VERY_CLOSE:
            explanation.recommended_action = 'Investigate immediately'
            explanation.action_rationale = (
                "Unknown tracker in very close proximity warrants immediate "
                "physical search of the area and personal belongings."
            )
        elif explanation.is_tracker:
            explanation.recommended_action = 'Investigate location'
            explanation.action_rationale = (
                "Tracker detected - recommend searching the area to locate "
                "the physical device and determine if it belongs to someone present."
            )
        else:
            explanation.recommended_action = 'Review and document'
            explanation.action_rationale = (
                "Multiple risk indicators present. Document the finding, "
                "attempt to identify the device, and consider physical search "
                "if other indicators suggest surveillance."
            )
    elif explanation.risk_level == 'review':
        explanation.recommended_action = 'Monitor and document'
        explanation.action_rationale = (
            "Device shows some indicators worth noting. Add to monitoring list "
            "and compare against future sweeps to identify patterns."
        )
    else:
        explanation.recommended_action = 'Continue monitoring'
        explanation.action_rationale = (
            "No immediate action required. Device will be tracked in subsequent "
            "sweeps for pattern analysis."
        )


# =============================================================================
# 9. Operator Playbooks ("What To Do Next")
# =============================================================================
# 9. Operator Playbooks ("What To Do Next")
# =============================================================================

@dataclass
class PlaybookStep:
    """A single numbered step in an operator playbook."""
    step_number: int
    action: str
    details: str
    safety_note: Optional[str] = None  # only set when the step carries a safety caveat


@dataclass
class OperatorPlaybook:
    """
    Procedural guidance for TSCM operators based on findings.

    Playbooks are procedural (what to do), not prescriptive (how to decide).
    All guidance is legally safe and professional.
    """
    playbook_id: str  # stable reference, e.g. 'PB-001'
    title: str
    risk_level: str
    description: str
    steps: list[PlaybookStep] = field(default_factory=list)
    when_to_escalate: str = ''
    documentation_required: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialise the playbook, always including the standing disclaimer."""
        serialised_steps = [
            {
                'step': step.step_number,
                'action': step.action,
                'details': step.details,
                'safety_note': step.safety_note,
            }
            for step in self.steps
        ]
        return {
            'playbook_id': self.playbook_id,
            'title': self.title,
            'risk_level': self.risk_level,
            'description': self.description,
            'steps': serialised_steps,
            'when_to_escalate': self.when_to_escalate,
            'documentation_required': self.documentation_required,
            'disclaimer': (
                "This playbook provides procedural guidance only. Actions should be "
                "adapted to local laws, organizational policies, and professional judgment. "
                "Do not disassemble, interfere with, or remove suspected devices without "
                "proper authorization and legal guidance."
            ),
        }


# Predefined playbooks by risk level / finding type.
PLAYBOOKS = {
    'high_interest_tracker': OperatorPlaybook(
        playbook_id='PB-001',
        title='High Interest: Unknown Tracker Detection',
        risk_level='high_interest',
        description='Guidance for responding to unknown tracking device detection',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Document the finding',
                details='Record device identifier, signal strength, location, and timestamp. Take screenshots of the detection.',
            ),
            PlaybookStep(
                step_number=2,
                action='Estimate device location',
                details='Use signal strength variations while moving to triangulate approximate device position. Note areas of strongest signal.',
                safety_note='Do not touch or disturb any physical device found.',
            ),
            PlaybookStep(
                step_number=3,
                action='Physical search (if authorized)',
                details='Systematically search the high-signal area. Check common hiding spots: under furniture, in plants, behind fixtures, in bags/belongings.',
                safety_note='Only conduct physical searches with proper authorization.',
            ),
            PlaybookStep(
                step_number=4,
                action='Identify device owner',
                details='If device is located, determine if it belongs to someone legitimately present. Apple/Samsung/Tile devices can be scanned by their respective apps.',
            ),
            PlaybookStep(
                step_number=5,
                action='Escalate if unidentified',
                details='If device owner cannot be determined and device is in sensitive location, escalate to security management.',
            ),
        ],
        when_to_escalate='Escalate immediately if: device is concealed in sensitive area, owner cannot be identified, or multiple unknown trackers are found.',
        documentation_required=[
            'Device identifier (MAC address)',
            'Signal strength readings at multiple locations',
            'Physical location description',
            'Photos of any located devices',
            'Names of individuals present during search',
        ],
    ),

    'high_interest_generic': OperatorPlaybook(
        playbook_id='PB-002',
        title='High Interest: Suspicious Device Pattern',
        risk_level='high_interest',
        description='Guidance for devices with multiple high-risk indicators',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Review all indicators',
                details='Examine each risk indicator in the device profile. Understand why the device scored high interest.',
            ),
            PlaybookStep(
                step_number=2,
                action='Cross-reference with baseline',
                details='Check if device appears in baseline. New devices warrant more scrutiny than known devices.',
            ),
            PlaybookStep(
                step_number=3,
                action='Monitor for pattern',
                details='Continue sweep and note if device persists, moves, or correlates with sensitive activities.',
            ),
            PlaybookStep(
                step_number=4,
                action='Attempt identification',
                details='Research manufacturer OUI, check for matching devices in the environment, ask occupants about devices.',
            ),
            PlaybookStep(
                step_number=5,
                action='Document and report',
                details='Add finding to sweep report with full details. Include in meeting/client debrief.',
            ),
        ],
        when_to_escalate='Escalate if: device cannot be identified, shows surveillance-consistent behavior, or correlates strongly with sensitive activities.',
        documentation_required=[
            'Complete device profile',
            'All risk indicators with scores',
            'Timeline of observations',
            'Correlation with meeting windows',
            'Any identification attempts and results',
        ],
    ),

    'needs_review': OperatorPlaybook(
        playbook_id='PB-003',
        title='Needs Review: Unknown Device',
        risk_level='needs_review',
        description='Guidance for devices requiring investigation but not immediately concerning',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Note the device',
                details='Add device to monitoring list. Record basic details: identifier, type, signal strength.',
            ),
            PlaybookStep(
                step_number=2,
                action='Check against known devices',
                details='Verify device is not a known infrastructure device or personal device of authorized personnel.',
            ),
            PlaybookStep(
                step_number=3,
                action='Continue sweep',
                details='Complete the sweep. Review device in context of all findings.',
            ),
            PlaybookStep(
                step_number=4,
                action='Assess in final review',
                details='During sweep wrap-up, decide if device warrants further investigation or can be added to baseline.',
            ),
        ],
        when_to_escalate='Escalate if: multiple "needs review" devices appear together, or device shows high-interest indicators in subsequent sweeps.',
        documentation_required=[
            'Device identifier and type',
            'Brief description of why flagged',
            'Decision made (investigate further / add to baseline / monitor)',
        ],
    ),

    'informational': OperatorPlaybook(
        playbook_id='PB-004',
        title='Informational: Known/Expected Device',
        risk_level='informational',
        description='Guidance for devices that appear normal and expected',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Verify against baseline',
                details='Confirm device matches baseline entry. Note any changes (signal strength, channel, etc.).',
            ),
            PlaybookStep(
                step_number=2,
                action='Log observation',
                details='Record observation for timeline tracking. Even known devices should be logged.',
            ),
            PlaybookStep(
                step_number=3,
                action='Continue sweep',
                details='No further action required. Proceed with sweep.',
            ),
        ],
        when_to_escalate='Only escalate if device shows unexpected behavior changes or additional risk indicators.',
        documentation_required=[
            'Device identifier (for timeline)',
            'Observation timestamp',
        ],
    ),

    'wifi_evil_twin': OperatorPlaybook(
        playbook_id='PB-005',
        title='High Interest: Evil Twin Pattern Detected',
        risk_level='high_interest',
        description='Guidance when duplicate SSID with security mismatch is detected',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Document both access points',
                details='Record details of legitimate AP and suspected rogue: BSSID, security, signal strength, channel.',
            ),
            PlaybookStep(
                step_number=2,
                action='Verify legitimate AP',
                details='Confirm which AP is the authorized infrastructure. Check with IT/facilities if needed.',
            ),
            PlaybookStep(
                step_number=3,
                action='Locate rogue AP',
                details='Use signal strength to estimate rogue AP location. Walk the area noting signal variations.',
                safety_note='Do not connect to or interact with the suspected rogue AP.',
            ),
            PlaybookStep(
                step_number=4,
                action='Physical search',
                details='Search suspected area for unauthorized access point. Check for hidden devices, suspicious equipment.',
            ),
            PlaybookStep(
                step_number=5,
                action='Report to IT Security',
                details='Even if device not found, report the finding to IT Security for network monitoring.',
            ),
        ],
        when_to_escalate='Escalate immediately. Evil twin attacks can capture credentials and traffic.',
        documentation_required=[
            'Both AP details (BSSID, SSID, security, channel, signal)',
            'Location where detected',
            'Signal strength map if created',
            'Physical search results',
        ],
    ),
}


def get_playbook_for_finding(
    risk_level: str,
    finding_type: Optional[str] = None,
    indicators: Optional[list[dict]] = None
) -> OperatorPlaybook:
    """
    Get appropriate playbook for a finding.

    Args:
        risk_level: Risk level string
        finding_type: Optional specific finding type
        indicators: Optional list of indicators

    Returns:
        Appropriate OperatorPlaybook
    """
    # Specific finding types take precedence over risk-level routing.
    if finding_type == 'evil_twin':
        return PLAYBOOKS['wifi_evil_twin']

    # A high-interest finding carrying any tracker indicator gets the
    # tracker-specific playbook.
    tracker_types = ('airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker')
    has_tracker = any(ind.get('type') in tracker_types for ind in (indicators or []))
    if has_tracker and risk_level == 'high_interest':
        return PLAYBOOKS['high_interest_tracker']

    # Fall back to risk-level routing.
    if risk_level == 'high_interest':
        return PLAYBOOKS['high_interest_generic']
    if risk_level in ('review', 'needs_review'):
        return PLAYBOOKS['needs_review']
    return PLAYBOOKS['informational']


def attach_playbook_to_finding(finding: dict) -> dict:
    """
    Attach appropriate playbook to a finding dict.

    Args:
        finding: Finding dict with risk_level, indicators, etc.

    Returns:
        The same finding dict, mutated with the playbook attached.
    """
    playbook = get_playbook_for_finding(
        finding.get('risk_level', 'informational'),
        finding.get('finding_type'),
        finding.get('indicators', []),
    )
    finding['suggested_playbook'] = playbook.to_dict()
    # First 3 steps as a quick reference for the operator UI.
    finding['suggested_next_steps'] = [
        f"Step {step.step_number}: {step.action}"
        for step in playbook.steps[:3]
    ]
    return finding


# =============================================================================
# Global Instance Management
# =============================================================================

_timeline_manager: Optional[TimelineManager] = None
_wifi_detector: Optional[WiFiAdvancedDetector] = None


def get_timeline_manager() -> TimelineManager:
    """Return the shared timeline manager, creating it lazily on first use."""
    global _timeline_manager
    if _timeline_manager is None:
        _timeline_manager = TimelineManager()
    return _timeline_manager
TimelineManager() + return _timeline_manager + + +def reset_timeline_manager() -> None: + """Reset global timeline manager.""" + global _timeline_manager + _timeline_manager = TimelineManager() + + +def get_wifi_detector(monitor_mode: bool = False) -> WiFiAdvancedDetector: + """Get or create global WiFi detector.""" + global _wifi_detector + if _wifi_detector is None: + _wifi_detector = WiFiAdvancedDetector(monitor_mode) + return _wifi_detector + + +def reset_wifi_detector(monitor_mode: bool = False) -> None: + """Reset global WiFi detector.""" + global _wifi_detector + _wifi_detector = WiFiAdvancedDetector(monitor_mode) diff --git a/utils/tscm/reports.py b/utils/tscm/reports.py new file mode 100644 index 0000000..4c97bb9 --- /dev/null +++ b/utils/tscm/reports.py @@ -0,0 +1,813 @@ +""" +TSCM Report Generation Module + +Generates: +1. Client-safe PDF reports with executive summary +2. Technical annex (JSON + CSV) with device timelines and indicators + +DISCLAIMER: All reports include mandatory disclaimers. +No packet data. No claims of confirmed surveillance. 
"""
TSCM Report Generation Module

Generates:
1. Client-safe PDF reports with executive summary
2. Technical annex (JSON + CSV) with device timelines and indicators

DISCLAIMER: All reports include mandatory disclaimers.
No packet data. No claims of confirmed surveillance.
"""

from __future__ import annotations

import csv
import io
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

logger = logging.getLogger('intercept.tscm.reports')

# =============================================================================
# Report Data Structures
# =============================================================================

@dataclass
class ReportFinding:
    """A single finding for the report."""
    identifier: str  # stable device identifier (e.g. MAC/BSSID)
    protocol: str  # e.g. 'wifi' / 'bluetooth' / 'rf'; rendered upper-case in report text
    name: Optional[str]  # advertised/display name, if the device broadcast one
    risk_level: str  # 'high_interest' | 'review'/'needs_review' | 'informational'
    risk_score: int  # accumulated indicator score from the correlation engine
    description: str  # one-line summary shown in report sections
    indicators: list[dict] = field(default_factory=list)  # raw indicator dicts
    recommended_action: str = ''
    playbook_reference: str = ''  # e.g. 'PB-001 (Tracker Detection)'


@dataclass
class ReportMeetingSummary:
    """Meeting window summary for report."""
    name: Optional[str]  # meeting label; rendered as 'Unnamed' when None
    start_time: str
    end_time: Optional[str]  # None while the window is still open ('ongoing')
    duration_minutes: float
    devices_first_seen: int  # devices whose first observation fell inside the window
    behavior_changes: int
    high_interest_devices: int


@dataclass
class TSCMReport:
    """
    Complete TSCM sweep report.

    Contains all data needed for both client-safe PDF and technical annex.
    """
    # Report metadata
    report_id: str
    generated_at: datetime
    sweep_id: int
    sweep_type: str

    # Location and context
    location: Optional[str] = None
    baseline_id: Optional[int] = None
    baseline_name: Optional[str] = None

    # Executive summary
    executive_summary: str = ''  # if empty, generated on demand from the report data
    overall_risk_assessment: str = 'low'  # low, moderate, elevated, high
    key_findings_count: int = 0

    # Capabilities used
    capabilities: dict = field(default_factory=dict)
    limitations: list[str] = field(default_factory=list)

    # Findings by risk tier
    high_interest_findings: list[ReportFinding] = field(default_factory=list)
    needs_review_findings: list[ReportFinding] = field(default_factory=list)
    informational_findings: list[ReportFinding] = field(default_factory=list)

    # Meeting window summaries
    meeting_summaries: list[ReportMeetingSummary] = field(default_factory=list)

    # Statistics
    total_devices_scanned: int = 0
    wifi_devices: int = 0
    bluetooth_devices: int = 0
    rf_signals: int = 0
    new_devices: int = 0  # vs baseline, when a baseline comparison ran
    missing_devices: int = 0  # vs baseline, when a baseline comparison ran

    # Sweep duration
    sweep_start: Optional[datetime] = None
    sweep_end: Optional[datetime] = None
    duration_minutes: float = 0.0

    # Technical data (for annex only - never rendered into the client PDF)
    device_timelines: list[dict] = field(default_factory=list)
    all_indicators: list[dict] = field(default_factory=list)
    baseline_diff: Optional[dict] = None
    correlation_data: list[dict] = field(default_factory=list)
# =============================================================================
# Disclaimer Text
# =============================================================================

REPORT_DISCLAIMER = """
IMPORTANT DISCLAIMER

This report documents the findings of a Technical Surveillance Countermeasures
(TSCM) sweep conducted using electronic detection equipment. The following
limitations and considerations apply:

1. DETECTION LIMITATIONS: No TSCM sweep can guarantee detection of all
   surveillance devices. Sophisticated devices may evade detection.

2. FINDINGS ARE INDICATORS: All findings represent patterns and indicators,
   NOT confirmed surveillance devices. Each finding requires professional
   interpretation and may have legitimate explanations.

3. ENVIRONMENTAL FACTORS: Wireless signals are affected by building
   construction, interference, and other environmental factors that may
   impact detection accuracy.

4. POINT-IN-TIME ASSESSMENT: This report reflects conditions at the time
   of the sweep. Conditions may change after the assessment.

5. NOT LEGAL ADVICE: This report does not constitute legal advice. Consult
   qualified legal counsel for guidance on surveillance-related matters.

6. PRIVACY CONSIDERATIONS: Some detected devices may be legitimate personal
   devices of authorized individuals.

This report should be treated as confidential and distributed only to
authorized personnel on a need-to-know basis.
"""

ANNEX_DISCLAIMER = """
TECHNICAL ANNEX DISCLAIMER

This annex contains detailed technical data from the TSCM sweep. This data
is provided for documentation and audit purposes.

- No raw packet captures or intercepted communications are included
- Device identifiers (MAC addresses) are included for tracking purposes
- Signal strength values are approximate and environment-dependent
- Timeline data is time-bucketed to preserve privacy
- All interpretations require professional TSCM expertise

This data should be handled according to organizational data protection
policies and applicable privacy regulations.
"""


# =============================================================================
# Report Generation Functions
# =============================================================================

def generate_executive_summary(report: TSCMReport) -> str:
    """Render the executive-summary block of the report as plain text."""
    # Opening header.
    out = [
        f"TSCM Sweep Report - {report.location or 'Location Not Specified'}",
        f"Conducted: {report.sweep_start.strftime('%Y-%m-%d %H:%M') if report.sweep_start else 'Unknown'}",
        f"Duration: {report.duration_minutes:.0f} minutes",
        "",
    ]

    # Overall assessment - unknown levels render with an empty detail line.
    assessment_text = {
        'low': 'No significant indicators of surveillance activity were detected.',
        'moderate': 'Some devices require review but no confirmed surveillance indicators.',
        'elevated': 'Multiple indicators warrant further investigation.',
        'high': 'Significant indicators detected requiring immediate attention.',
    }
    out.extend([
        f"OVERALL ASSESSMENT: {report.overall_risk_assessment.upper()}",
        assessment_text.get(report.overall_risk_assessment, ''),
        "",
    ])

    # Key statistics.
    out.extend([
        "SCAN STATISTICS:",
        f" - Total devices scanned: {report.total_devices_scanned}",
        f" - WiFi access points: {report.wifi_devices}",
        f" - Bluetooth devices: {report.bluetooth_devices}",
        f" - RF signals: {report.rf_signals}",
        "",
    ])

    # Findings per risk tier.
    out.extend([
        "FINDINGS SUMMARY:",
        f" - High Interest (require investigation): {len(report.high_interest_findings)}",
        f" - Needs Review: {len(report.needs_review_findings)}",
        f" - Informational: {len(report.informational_findings)}",
        "",
    ])

    # Baseline comparison, only when a baseline was used.
    if report.baseline_name:
        out.extend([
            f"BASELINE COMPARISON (vs '{report.baseline_name}'):",
            f" - New devices: {report.new_devices}",
            f" - Missing devices: {report.missing_devices}",
            "",
        ])

    # Meeting window activity, only when windows were marked.
    if report.meeting_summaries:
        out.append("MEETING WINDOW ACTIVITY:")
        for window in report.meeting_summaries:
            out.append(f" - {window.name or 'Unnamed meeting'}: "
                       f"{window.devices_first_seen} new devices, "
                       f"{window.high_interest_devices} high interest")
        out.append("")

    # Top limitations only; the full list lives in the capabilities section.
    if report.limitations:
        out.append("SWEEP LIMITATIONS:")
        for note in report.limitations[:3]:
            out.append(f" - {note}")
        out.append("")

    return "\n".join(out)


def generate_findings_section(findings: list[ReportFinding], title: str) -> str:
    """Render one risk-tier findings section as plain text."""
    if not findings:
        return f"{title}\n\nNo findings in this category.\n"

    out = [title, "=" * len(title), ""]

    for idx, item in enumerate(findings, 1):
        out.append(f"{idx}. {item.name or item.identifier}")
        out.append(f" Protocol: {item.protocol.upper()}")
        out.append(f" Identifier: {item.identifier}")
        out.append(f" Risk Score: {item.risk_score}")
        out.append(f" Description: {item.description}")
        if item.indicators:
            out.append(" Indicators:")
            # Cap at 5 indicators to keep the client-facing section readable.
            for ind in item.indicators[:5]:
                out.append(f" - {ind.get('type', 'unknown')}: {ind.get('description', '')}")
        out.append(f" Recommended Action: {item.recommended_action}")
        if item.playbook_reference:
            out.append(f" Reference: {item.playbook_reference}")
        out.append("")

    return "\n".join(out)
def generate_meeting_section(summaries: list[ReportMeetingSummary]) -> str:
    """Render the meeting-window summary section as plain text."""
    if not summaries:
        return "MEETING WINDOW SUMMARY\n\nNo meeting windows were marked during this sweep.\n"

    out = ["MEETING WINDOW SUMMARY", "=" * 22, ""]

    for window in summaries:
        out.append(f"Meeting: {window.name or 'Unnamed'}")
        out.append(f" Time: {window.start_time} - {window.end_time or 'ongoing'}")
        out.append(f" Duration: {window.duration_minutes:.0f} minutes")
        out.append(f" Devices first seen during meeting: {window.devices_first_seen}")
        out.append(f" Behavior changes detected: {window.behavior_changes}")
        out.append(f" High interest devices active: {window.high_interest_devices}")

        # Flag any window with new devices or high-interest activity.
        if window.devices_first_seen > 0 or window.high_interest_devices > 0:
            out.append(" NOTE: Meeting-correlated activity detected - see findings for details")
        out.append("")

    # Standing caveat: temporal correlation alone proves nothing.
    out.append("Meeting-correlated activity indicates temporal correlation only.")
    out.append("Devices appearing during meetings may have legitimate explanations.")
    out.append("")

    return "\n".join(out)


def generate_pdf_content(report: TSCMReport) -> str:
    """
    Generate complete PDF report content.

    Returns plain text that can be converted to PDF.
    For actual PDF generation, use a library like reportlab or weasyprint.
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70
    out: list[str] = []
    emit = out.append

    # Header block.
    emit(heavy_rule)
    emit("TECHNICAL SURVEILLANCE COUNTERMEASURES (TSCM) SWEEP REPORT")
    emit(heavy_rule)
    emit("")
    emit(f"Report ID: {report.report_id}")
    emit(f"Generated: {report.generated_at.strftime('%Y-%m-%d %H:%M:%S')}")
    emit(f"Sweep ID: {report.sweep_id}")
    emit("")

    # Executive summary - regenerated on the fly when not pre-rendered.
    emit(light_rule)
    emit("EXECUTIVE SUMMARY")
    emit(light_rule)
    emit(report.executive_summary or generate_executive_summary(report))
    emit("")

    # High interest findings (section omitted when empty).
    if report.high_interest_findings:
        emit(light_rule)
        emit(generate_findings_section(
            report.high_interest_findings,
            "HIGH INTEREST FINDINGS"
        ))

    # Needs-review findings (section omitted when empty).
    if report.needs_review_findings:
        emit(light_rule)
        emit(generate_findings_section(
            report.needs_review_findings,
            "FINDINGS REQUIRING REVIEW"
        ))

    # Meeting window summary (section omitted when no windows marked).
    if report.meeting_summaries:
        emit(light_rule)
        emit(generate_meeting_section(report.meeting_summaries))

    # Capabilities & limitations.
    emit(light_rule)
    emit("SWEEP CAPABILITIES & LIMITATIONS")
    emit("=" * 33)
    emit("")

    if report.capabilities:
        caps = report.capabilities
        emit("Equipment Used:")
        # NOTE(review): a missing 'wifi'/'bluetooth' key renders as
        # "unknown" rather than being skipped - only the explicit
        # 'unavailable' marker suppresses the line.
        if caps.get('wifi', {}).get('mode') != 'unavailable':
            emit(f" - WiFi: {caps.get('wifi', {}).get('mode', 'unknown')} mode")
        if caps.get('bluetooth', {}).get('mode') != 'unavailable':
            emit(f" - Bluetooth: {caps.get('bluetooth', {}).get('mode', 'unknown')}")
        if caps.get('rf', {}).get('available'):
            emit(f" - RF/SDR: {caps.get('rf', {}).get('device_type', 'unknown')}")
        emit("")

    if report.limitations:
        emit("Limitations:")
        for note in report.limitations:
            emit(f" - {note}")
        emit("")

    # Mandatory disclaimer and footer.
    emit(light_rule)
    emit(REPORT_DISCLAIMER)
    emit("")
    emit(heavy_rule)
    emit("END OF REPORT")
    emit(heavy_rule)

    return "\n".join(out)
def generate_technical_annex_json(report: TSCMReport) -> dict:
    """
    Generate technical annex as JSON.

    Contains detailed device timelines, all indicators, and raw data
    for audit and further analysis.
    """
    # Build the nested sections up front so the returned structure reads flat.
    sweep_details = {
        'type': report.sweep_type,
        'location': report.location,
        'start_time': report.sweep_start.isoformat() if report.sweep_start else None,
        'end_time': report.sweep_end.isoformat() if report.sweep_end else None,
        'duration_minutes': report.duration_minutes,
        'baseline_id': report.baseline_id,
        'baseline_name': report.baseline_name,
    }

    statistics = {
        'total_devices': report.total_devices_scanned,
        'wifi_devices': report.wifi_devices,
        'bluetooth_devices': report.bluetooth_devices,
        'rf_signals': report.rf_signals,
        'new_devices': report.new_devices,
        'missing_devices': report.missing_devices,
        'high_interest_count': len(report.high_interest_findings),
        'needs_review_count': len(report.needs_review_findings),
        'informational_count': len(report.informational_findings),
    }

    # High-interest entries carry the recommended action; needs-review
    # entries are serialised without it (mirrors the client report).
    findings = {
        'high_interest': [
            {
                'identifier': f.identifier,
                'protocol': f.protocol,
                'name': f.name,
                'risk_score': f.risk_score,
                'description': f.description,
                'indicators': f.indicators,
                'recommended_action': f.recommended_action,
            }
            for f in report.high_interest_findings
        ],
        'needs_review': [
            {
                'identifier': f.identifier,
                'protocol': f.protocol,
                'name': f.name,
                'risk_score': f.risk_score,
                'description': f.description,
                'indicators': f.indicators,
            }
            for f in report.needs_review_findings
        ],
    }

    meeting_windows = [
        {
            'name': m.name,
            'start_time': m.start_time,
            'end_time': m.end_time,
            'duration_minutes': m.duration_minutes,
            'devices_first_seen': m.devices_first_seen,
            'behavior_changes': m.behavior_changes,
            'high_interest_devices': m.high_interest_devices,
        }
        for m in report.meeting_summaries
    ]

    return {
        'annex_type': 'tscm_technical_annex',
        'report_id': report.report_id,
        'generated_at': report.generated_at.isoformat(),
        'sweep_id': report.sweep_id,
        'disclaimer': ANNEX_DISCLAIMER.strip(),

        'sweep_details': sweep_details,

        'capabilities': report.capabilities,
        'limitations': report.limitations,

        'statistics': statistics,

        'findings': findings,

        'meeting_windows': meeting_windows,

        'device_timelines': report.device_timelines,
        'all_indicators': report.all_indicators,
        'baseline_diff': report.baseline_diff,
        'correlations': report.correlation_data,
    }


def generate_technical_annex_csv(report: TSCMReport) -> str:
    """
    Generate device timeline data as CSV.

    Provides spreadsheet-compatible format for further analysis.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    # Column header for the timeline table.
    writer.writerow([
        'identifier',
        'protocol',
        'name',
        'risk_level',
        'risk_score',
        'first_seen',
        'last_seen',
        'observation_count',
        'rssi_min',
        'rssi_max',
        'rssi_mean',
        'rssi_stability',
        'movement_pattern',
        'meeting_correlated',
        'indicators',
    ])

    # One row per tracked device timeline; missing sub-dicts default to {}
    # so absent measurements render as empty cells.
    for timeline in report.device_timelines:
        indicator_cell = '; '.join(
            f"{ind.get('type', '')}({ind.get('score', 0)})"
            for ind in timeline.get('indicators', [])
        )

        signal = timeline.get('signal', {})
        metrics = timeline.get('metrics', {})
        movement = timeline.get('movement', {})
        meeting = timeline.get('meeting_correlation', {})

        writer.writerow([
            timeline.get('identifier', ''),
            timeline.get('protocol', ''),
            timeline.get('name', ''),
            timeline.get('risk_level', 'informational'),
            timeline.get('risk_score', 0),
            metrics.get('first_seen', ''),
            metrics.get('last_seen', ''),
            metrics.get('total_observations', 0),
            signal.get('rssi_min', ''),
            signal.get('rssi_max', ''),
            signal.get('rssi_mean', ''),
            signal.get('stability', ''),
            movement.get('pattern', ''),
            meeting.get('correlated', False),
            indicator_cell,
        ])

    # Trailing findings-summary table (high interest + needs review only).
    writer.writerow([])
    writer.writerow(['--- FINDINGS SUMMARY ---'])
    writer.writerow(['identifier', 'protocol', 'risk_level', 'risk_score', 'description', 'recommended_action'])

    for finding in report.high_interest_findings + report.needs_review_findings:
        writer.writerow([
            finding.identifier,
            finding.protocol,
            finding.risk_level,
            finding.risk_score,
            finding.description,
            finding.recommended_action,
        ])

    return buffer.getvalue()


# =============================================================================
# Report Builder
# =============================================================================

class TSCMReportBuilder:
    """
    Builder for constructing TSCM reports from sweep data.

    Usage:
        builder = TSCMReportBuilder(sweep_id=123)
        builder.set_location("Conference Room A")
        builder.add_capabilities(capabilities_dict)
        builder.add_finding(finding)
        report = builder.build()
    """

    def __init__(self, sweep_id: int):
        self.sweep_id = sweep_id
        # Report id embeds the sweep id plus a generation timestamp.
        self.report = TSCMReport(
            report_id=f"TSCM-{sweep_id}-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            generated_at=datetime.now(),
            sweep_id=sweep_id,
            sweep_type='standard',
        )

    def set_sweep_type(self, sweep_type: str) -> 'TSCMReportBuilder':
        """Record the sweep type (default 'standard')."""
        self.report.sweep_type = sweep_type
        return self

    def set_location(self, location: str) -> 'TSCMReportBuilder':
        """Record the swept location for the report header."""
        self.report.location = location
        return self

    def set_baseline(self, baseline_id: int, baseline_name: str) -> 'TSCMReportBuilder':
        """Record which baseline the sweep was compared against."""
        self.report.baseline_id = baseline_id
        self.report.baseline_name = baseline_name
        return self

    def set_sweep_times(
        self,
        start: datetime,
        end: Optional[datetime] = None
    ) -> 'TSCMReportBuilder':
        """Record sweep start/end; *end* defaults to now when omitted."""
        self.report.sweep_start = start
        self.report.sweep_end = end or datetime.now()
        self.report.duration_minutes = (
            (self.report.sweep_end - self.report.sweep_start).total_seconds() / 60
        )
        return self
= ( + (self.report.sweep_end - self.report.sweep_start).total_seconds() / 60 + ) + return self + + def add_capabilities(self, capabilities: dict) -> 'TSCMReportBuilder': + self.report.capabilities = capabilities + self.report.limitations = capabilities.get('all_limitations', []) + return self + + def add_finding(self, finding: ReportFinding) -> 'TSCMReportBuilder': + if finding.risk_level == 'high_interest': + self.report.high_interest_findings.append(finding) + elif finding.risk_level in ['review', 'needs_review']: + self.report.needs_review_findings.append(finding) + else: + self.report.informational_findings.append(finding) + return self + + def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder': + """Add findings from correlation engine device profiles.""" + for profile in profiles: + finding = ReportFinding( + identifier=profile.get('identifier', ''), + protocol=profile.get('protocol', ''), + name=profile.get('name'), + risk_level=profile.get('risk_level', 'informational'), + risk_score=profile.get('total_score', 0), + description=self._generate_finding_description(profile), + indicators=profile.get('indicators', []), + recommended_action=profile.get('recommended_action', 'monitor'), + playbook_reference=self._get_playbook_reference(profile), + ) + self.add_finding(finding) + + return self + + def _generate_finding_description(self, profile: dict) -> str: + """Generate description from profile indicators.""" + indicators = profile.get('indicators', []) + if not indicators: + return f"{profile.get('protocol', 'Unknown').upper()} device detected" + + # Use first indicator as primary description + primary = indicators[0] + desc = primary.get('description', 'Pattern detected') + + if len(indicators) > 1: + desc += f" (+{len(indicators) - 1} additional indicators)" + + return desc + + def _get_playbook_reference(self, profile: dict) -> str: + """Get playbook reference based on profile.""" + risk_level = profile.get('risk_level', 
'informational') + indicators = profile.get('indicators', []) + + # Check for tracker + tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker'] + if any(i.get('type') in tracker_types for i in indicators): + if risk_level == 'high_interest': + return 'PB-001 (Tracker Detection)' + + if risk_level == 'high_interest': + return 'PB-002 (Suspicious Device)' + elif risk_level in ['review', 'needs_review']: + return 'PB-003 (Unknown Device)' + + return '' + + def add_meeting_summary(self, summary: dict) -> 'TSCMReportBuilder': + """Add meeting window summary.""" + meeting = ReportMeetingSummary( + name=summary.get('name'), + start_time=summary.get('start_time', ''), + end_time=summary.get('end_time'), + duration_minutes=summary.get('duration_minutes', 0), + devices_first_seen=summary.get('devices_first_seen', 0), + behavior_changes=summary.get('behavior_changes', 0), + high_interest_devices=summary.get('high_interest_devices', 0), + ) + self.report.meeting_summaries.append(meeting) + return self + + def add_statistics( + self, + wifi: int = 0, + bluetooth: int = 0, + rf: int = 0, + new: int = 0, + missing: int = 0 + ) -> 'TSCMReportBuilder': + self.report.wifi_devices = wifi + self.report.bluetooth_devices = bluetooth + self.report.rf_signals = rf + self.report.total_devices_scanned = wifi + bluetooth + rf + self.report.new_devices = new + self.report.missing_devices = missing + return self + + def add_device_timelines(self, timelines: list[dict]) -> 'TSCMReportBuilder': + self.report.device_timelines = timelines + return self + + def add_all_indicators(self, indicators: list[dict]) -> 'TSCMReportBuilder': + self.report.all_indicators = indicators + return self + + def add_baseline_diff(self, diff: dict) -> 'TSCMReportBuilder': + self.report.baseline_diff = diff + return self + + def add_correlations(self, correlations: list[dict]) -> 'TSCMReportBuilder': + self.report.correlation_data = correlations + return self + + def build(self) -> 
TSCMReport: + """Build and return the complete report.""" + # Calculate overall risk assessment + if self.report.high_interest_findings: + if len(self.report.high_interest_findings) >= 3: + self.report.overall_risk_assessment = 'high' + else: + self.report.overall_risk_assessment = 'elevated' + elif self.report.needs_review_findings: + self.report.overall_risk_assessment = 'moderate' + else: + self.report.overall_risk_assessment = 'low' + + self.report.key_findings_count = ( + len(self.report.high_interest_findings) + + len(self.report.needs_review_findings) + ) + + # Generate executive summary + self.report.executive_summary = generate_executive_summary(self.report) + + return self.report + + +# ============================================================================= +# Report Generation API Functions +# ============================================================================= + +def generate_report( + sweep_id: int, + sweep_data: dict, + device_profiles: list[dict], + capabilities: dict, + timelines: list[dict], + baseline_diff: Optional[dict] = None, + meeting_summaries: Optional[list[dict]] = None, + correlations: Optional[list[dict]] = None, +) -> TSCMReport: + """ + Generate a complete TSCM report from sweep data. 
def generate_report(
    sweep_id: int,
    sweep_data: dict,
    device_profiles: list[dict],
    capabilities: dict,
    timelines: list[dict],
    baseline_diff: Optional[dict] = None,
    meeting_summaries: Optional[list[dict]] = None,
    correlations: Optional[list[dict]] = None,
) -> TSCMReport:
    """
    Generate a complete TSCM report from sweep data.

    Args:
        sweep_id: Sweep ID
        sweep_data: Sweep dict from database
        device_profiles: List of DeviceProfile dicts from correlation engine
        capabilities: Capabilities dict
        timelines: Device timeline dicts
        baseline_diff: Optional baseline diff dict
        meeting_summaries: Optional meeting summaries
        correlations: Optional correlation data

    Returns:
        Complete TSCMReport
    """
    builder = TSCMReportBuilder(sweep_id)

    # Basic info
    builder.set_sweep_type(sweep_data.get('sweep_type', 'standard'))

    # Parse times: DB rows may store timestamps as ISO strings with a
    # trailing 'Z'; normalize those to naive datetimes.
    started_at = sweep_data.get('started_at')
    completed_at = sweep_data.get('completed_at')
    if started_at and isinstance(started_at, str):
        started_at = datetime.fromisoformat(started_at.replace('Z', '+00:00')).replace(tzinfo=None)
    if completed_at and isinstance(completed_at, str):
        completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')).replace(tzinfo=None)
    # FIX: a sweep record with no start time previously passed None into
    # set_sweep_times, which crashed on datetime-minus-None; skip instead.
    if started_at:
        builder.set_sweep_times(started_at, completed_at)

    # Capabilities
    builder.add_capabilities(capabilities)

    # Add findings from profiles
    builder.add_findings_from_profiles(device_profiles)

    # Statistics.
    # FIX: results may be persisted as a JSON string; normalize it the same
    # way the route handlers in this module do before calling .get().
    results = sweep_data.get('results', {})
    if isinstance(results, str):
        import json
        results = json.loads(results)
    builder.add_statistics(
        wifi=len(results.get('wifi') or []),
        bluetooth=len(results.get('bluetooth') or []),
        rf=len(results.get('rf') or []),
        new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
        missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
    )

    # Technical data
    builder.add_device_timelines(timelines)

    if baseline_diff:
        builder.add_baseline_diff(baseline_diff)

    if meeting_summaries:
        for summary in meeting_summaries:
            builder.add_meeting_summary(summary)

    if correlations:
        builder.add_correlations(correlations)

    # Flatten per-profile indicators into one list, tagging each entry with
    # the device identifier/protocol it came from (for the technical annex).
    all_indicators = []
    for profile in device_profiles:
        for ind in profile.get('indicators', []):
            all_indicators.append({
                'device': profile.get('identifier'),
                'protocol': profile.get('protocol'),
                **ind
            })
    builder.add_all_indicators(all_indicators)

    return builder.build()


def get_pdf_report(report: TSCMReport) -> str:
    """Get PDF-ready report content."""
    return generate_pdf_content(report)


def get_json_annex(report: TSCMReport) -> dict:
    """Get JSON technical annex."""
    return generate_technical_annex_json(report)


def get_csv_annex(report: TSCMReport) -> str:
    """Get CSV technical annex."""
    return generate_technical_annex_csv(report)