Add comprehensive TSCM advanced features

Implement 9 major TSCM feature enhancements:

1. Capability & Coverage Reality Panel - Exposes what sweeps can/cannot
   detect based on OS, privileges, adapters, and SDR limits

2. Baseline Diff & Health - Shows changes vs baseline with health scoring
   (healthy/noisy/stale) based on age and device churn

3. Per-Device Timelines - Time-bucketed observations with RSSI stability,
   movement patterns, and meeting correlation

4. Whitelist/Known-Good Registry + Case Grouping - Global and per-location
   device registry with case management for sweeps/threats/notes

5. Meeting-Window Summary Enhancements - Tracks devices first seen during
   meetings with scoring modifiers

6. Client-Ready PDF Report + Technical Annex - Executive summary, findings
   by risk tier, JSON/CSV annex export

7. WiFi Advanced Indicators - Evil twin detection, probe request tracking,
   deauth burst detection (auto-disables without monitor mode)

8. Bluetooth Risk Explainability - Proximity estimates, tracker brand
   explanations, human-readable risk descriptions

9. Operator Playbooks - Procedural guidance by risk level with steps,
   safety notes, and documentation requirements

All features include mandatory disclaimers, preserve existing architecture,
and follow TSCM best practices (no packet capture, no surveillance claims).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-16 16:06:18 +00:00
parent 3210fc0d20
commit 234f254f4f
4 changed files with 4560 additions and 0 deletions

View File

@@ -2274,3 +2274,962 @@ def get_cluster_detail(cluster_id: str):
except Exception as e:
logger.error(f"Get cluster detail error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# Capabilities & Coverage Endpoints
# =============================================================================
@tscm_bp.route('/capabilities')
def get_capabilities():
"""
Get current system capabilities for TSCM sweeping.
Returns what the system CAN and CANNOT detect based on OS,
privileges, adapters, and SDR hardware.
"""
try:
from utils.tscm.advanced import detect_sweep_capabilities
wifi_interface = request.args.get('wifi_interface', '')
bt_adapter = request.args.get('bt_adapter', '')
caps = detect_sweep_capabilities(
wifi_interface=wifi_interface,
bt_adapter=bt_adapter
)
return jsonify({
'status': 'success',
'capabilities': caps.to_dict()
})
except Exception as e:
logger.error(f"Get capabilities error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/sweep/<int:sweep_id>/capabilities')
def get_sweep_stored_capabilities(sweep_id: int):
"""Get stored capabilities for a specific sweep."""
from utils.database import get_sweep_capabilities
caps = get_sweep_capabilities(sweep_id)
if not caps:
return jsonify({'status': 'error', 'message': 'No capabilities stored for this sweep'}), 404
return jsonify({
'status': 'success',
'capabilities': caps
})
# =============================================================================
# Baseline Diff & Health Endpoints
# =============================================================================
@tscm_bp.route('/baseline/diff/<int:baseline_id>/<int:sweep_id>')
def get_baseline_diff(baseline_id: int, sweep_id: int):
"""
Get comprehensive diff between a baseline and a sweep.
Shows new devices, missing devices, changed characteristics,
and baseline health assessment.
"""
try:
from utils.tscm.advanced import calculate_baseline_diff
baseline = get_tscm_baseline(baseline_id)
if not baseline:
return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404
sweep = get_tscm_sweep(sweep_id)
if not sweep:
return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404
# Get current devices from sweep results
results = sweep.get('results', {})
if isinstance(results, str):
import json
results = json.loads(results)
current_wifi = results.get('wifi', [])
current_bt = results.get('bluetooth', [])
current_rf = results.get('rf', [])
diff = calculate_baseline_diff(
baseline=baseline,
current_wifi=current_wifi,
current_bt=current_bt,
current_rf=current_rf,
sweep_id=sweep_id
)
return jsonify({
'status': 'success',
'diff': diff.to_dict()
})
except Exception as e:
logger.error(f"Get baseline diff error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/baseline/<int:baseline_id>/health')
def get_baseline_health(baseline_id: int):
"""Get health assessment for a baseline."""
try:
from utils.tscm.advanced import BaselineHealth
from datetime import datetime
baseline = get_tscm_baseline(baseline_id)
if not baseline:
return jsonify({'status': 'error', 'message': 'Baseline not found'}), 404
# Calculate age
created_at = baseline.get('created_at')
age_hours = 0
if created_at:
if isinstance(created_at, str):
created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600
elif isinstance(created_at, datetime):
age_hours = (datetime.now() - created_at).total_seconds() / 3600
# Count devices
total_devices = (
len(baseline.get('wifi_networks', [])) +
len(baseline.get('bt_devices', [])) +
len(baseline.get('rf_frequencies', []))
)
# Determine health
health = 'healthy'
score = 1.0
reasons = []
if age_hours > 168:
health = 'stale'
score = 0.3
reasons.append(f'Baseline is {age_hours:.0f} hours old (over 1 week)')
elif age_hours > 72:
health = 'noisy'
score = 0.6
reasons.append(f'Baseline is {age_hours:.0f} hours old (over 3 days)')
if total_devices < 3:
score -= 0.2
reasons.append(f'Baseline has few devices ({total_devices})')
if health == 'healthy':
health = 'noisy'
return jsonify({
'status': 'success',
'health': {
'status': health,
'score': round(max(0, score), 2),
'age_hours': round(age_hours, 1),
'total_devices': total_devices,
'reasons': reasons,
}
})
except Exception as e:
logger.error(f"Get baseline health error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# Device Timeline Endpoints
# =============================================================================
@tscm_bp.route('/device/<identifier>/timeline')
def get_device_timeline_endpoint(identifier: str):
"""
Get timeline of observations for a device.
Shows behavior over time including RSSI stability, presence,
and meeting window correlation.
"""
try:
from utils.tscm.advanced import get_timeline_manager
from utils.database import get_device_timeline
protocol = request.args.get('protocol', 'bluetooth')
since_hours = request.args.get('since_hours', 24, type=int)
# Try in-memory timeline first
manager = get_timeline_manager()
timeline = manager.get_timeline(identifier, protocol)
# Also get stored timeline from database
stored = get_device_timeline(identifier, since_hours=since_hours)
result = {
'identifier': identifier,
'protocol': protocol,
'observations': stored,
}
if timeline:
result['metrics'] = {
'first_seen': timeline.first_seen.isoformat() if timeline.first_seen else None,
'last_seen': timeline.last_seen.isoformat() if timeline.last_seen else None,
'total_observations': timeline.total_observations,
'presence_ratio': round(timeline.presence_ratio, 2),
}
result['signal'] = {
'rssi_min': timeline.rssi_min,
'rssi_max': timeline.rssi_max,
'rssi_mean': round(timeline.rssi_mean, 1) if timeline.rssi_mean else None,
'stability': round(timeline.rssi_stability, 2),
}
result['movement'] = {
'appears_stationary': timeline.appears_stationary,
'pattern': timeline.movement_pattern,
}
result['meeting_correlation'] = {
'correlated': timeline.meeting_correlated,
'observations_during_meeting': timeline.meeting_observations,
}
return jsonify({
'status': 'success',
'timeline': result
})
except Exception as e:
logger.error(f"Get device timeline error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/timelines')
def get_all_device_timelines():
"""Get all device timelines."""
try:
from utils.tscm.advanced import get_timeline_manager
manager = get_timeline_manager()
timelines = manager.get_all_timelines()
return jsonify({
'status': 'success',
'count': len(timelines),
'timelines': [t.to_dict() for t in timelines]
})
except Exception as e:
logger.error(f"Get all timelines error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# Known-Good Registry (Whitelist) Endpoints
# =============================================================================
@tscm_bp.route('/known-devices', methods=['GET'])
def list_known_devices():
"""List all known-good devices."""
from utils.database import get_all_known_devices
location = request.args.get('location')
scope = request.args.get('scope')
devices = get_all_known_devices(location=location, scope=scope)
return jsonify({
'status': 'success',
'count': len(devices),
'devices': devices
})
@tscm_bp.route('/known-devices', methods=['POST'])
def add_known_device_endpoint():
"""
Add a device to the known-good registry.
Known devices remain visible but receive reduced risk scores.
They are NOT suppressed from reports (preserves audit trail).
"""
from utils.database import add_known_device
data = request.get_json() or {}
identifier = data.get('identifier')
protocol = data.get('protocol')
if not identifier or not protocol:
return jsonify({
'status': 'error',
'message': 'identifier and protocol are required'
}), 400
device_id = add_known_device(
identifier=identifier,
protocol=protocol,
name=data.get('name'),
description=data.get('description'),
location=data.get('location'),
scope=data.get('scope', 'global'),
added_by=data.get('added_by'),
score_modifier=data.get('score_modifier', -2),
metadata=data.get('metadata')
)
return jsonify({
'status': 'success',
'message': 'Device added to known-good registry',
'device_id': device_id
})
@tscm_bp.route('/known-devices/<identifier>', methods=['GET'])
def get_known_device_endpoint(identifier: str):
"""Get a known device by identifier."""
from utils.database import get_known_device
device = get_known_device(identifier)
if not device:
return jsonify({'status': 'error', 'message': 'Device not found'}), 404
return jsonify({
'status': 'success',
'device': device
})
@tscm_bp.route('/known-devices/<identifier>', methods=['DELETE'])
def delete_known_device_endpoint(identifier: str):
"""Remove a device from the known-good registry."""
from utils.database import delete_known_device
success = delete_known_device(identifier)
if not success:
return jsonify({'status': 'error', 'message': 'Device not found'}), 404
return jsonify({
'status': 'success',
'message': 'Device removed from known-good registry'
})
@tscm_bp.route('/known-devices/check/<identifier>')
def check_known_device(identifier: str):
"""Check if a device is in the known-good registry."""
from utils.database import is_known_good_device
location = request.args.get('location')
result = is_known_good_device(identifier, location=location)
return jsonify({
'status': 'success',
'is_known': result is not None,
'details': result
})
# =============================================================================
# Case Management Endpoints
# =============================================================================
@tscm_bp.route('/cases', methods=['GET'])
def list_cases():
"""List all TSCM cases."""
from utils.database import get_all_tscm_cases
status = request.args.get('status')
limit = request.args.get('limit', 50, type=int)
cases = get_all_tscm_cases(status=status, limit=limit)
return jsonify({
'status': 'success',
'count': len(cases),
'cases': cases
})
@tscm_bp.route('/cases', methods=['POST'])
def create_case():
"""Create a new TSCM case."""
from utils.database import create_tscm_case
data = request.get_json() or {}
name = data.get('name')
if not name:
return jsonify({'status': 'error', 'message': 'name is required'}), 400
case_id = create_tscm_case(
name=name,
description=data.get('description'),
location=data.get('location'),
priority=data.get('priority', 'normal'),
created_by=data.get('created_by'),
metadata=data.get('metadata')
)
return jsonify({
'status': 'success',
'message': 'Case created',
'case_id': case_id
})
@tscm_bp.route('/cases/<int:case_id>', methods=['GET'])
def get_case(case_id: int):
"""Get a TSCM case with all linked sweeps, threats, and notes."""
from utils.database import get_tscm_case
case = get_tscm_case(case_id)
if not case:
return jsonify({'status': 'error', 'message': 'Case not found'}), 404
return jsonify({
'status': 'success',
'case': case
})
@tscm_bp.route('/cases/<int:case_id>', methods=['PUT'])
def update_case(case_id: int):
"""Update a TSCM case."""
from utils.database import update_tscm_case
data = request.get_json() or {}
success = update_tscm_case(
case_id=case_id,
status=data.get('status'),
priority=data.get('priority'),
assigned_to=data.get('assigned_to'),
notes=data.get('notes')
)
if not success:
return jsonify({'status': 'error', 'message': 'Case not found'}), 404
return jsonify({
'status': 'success',
'message': 'Case updated'
})
@tscm_bp.route('/cases/<int:case_id>/sweeps/<int:sweep_id>', methods=['POST'])
def link_sweep_to_case(case_id: int, sweep_id: int):
"""Link a sweep to a case."""
from utils.database import add_sweep_to_case
success = add_sweep_to_case(case_id, sweep_id)
return jsonify({
'status': 'success' if success else 'error',
'message': 'Sweep linked to case' if success else 'Already linked or not found'
})
@tscm_bp.route('/cases/<int:case_id>/threats/<int:threat_id>', methods=['POST'])
def link_threat_to_case(case_id: int, threat_id: int):
"""Link a threat to a case."""
from utils.database import add_threat_to_case
success = add_threat_to_case(case_id, threat_id)
return jsonify({
'status': 'success' if success else 'error',
'message': 'Threat linked to case' if success else 'Already linked or not found'
})
@tscm_bp.route('/cases/<int:case_id>/notes', methods=['POST'])
def add_note_to_case(case_id: int):
"""Add a note to a case."""
from utils.database import add_case_note
data = request.get_json() or {}
content = data.get('content')
if not content:
return jsonify({'status': 'error', 'message': 'content is required'}), 400
note_id = add_case_note(
case_id=case_id,
content=content,
note_type=data.get('note_type', 'general'),
created_by=data.get('created_by')
)
return jsonify({
'status': 'success',
'message': 'Note added',
'note_id': note_id
})
# =============================================================================
# Meeting Window Enhanced Endpoints
# =============================================================================
@tscm_bp.route('/meeting/start-tracked', methods=['POST'])
def start_tracked_meeting():
"""
Start a tracked meeting window with database persistence.
Tracks devices first seen during meeting and behavior changes.
"""
from utils.database import start_meeting_window
from utils.tscm.advanced import get_timeline_manager
data = request.get_json() or {}
meeting_id = start_meeting_window(
sweep_id=_current_sweep_id,
name=data.get('name'),
location=data.get('location'),
notes=data.get('notes')
)
# Start meeting in correlation engine
correlation = get_correlation_engine()
correlation.start_meeting_window()
# Start in timeline manager
manager = get_timeline_manager()
manager.start_meeting_window()
_emit_event('meeting_started', {
'meeting_id': meeting_id,
'timestamp': datetime.now().isoformat(),
'name': data.get('name'),
})
return jsonify({
'status': 'success',
'message': 'Tracked meeting window started',
'meeting_id': meeting_id
})
@tscm_bp.route('/meeting/<int:meeting_id>/end', methods=['POST'])
def end_tracked_meeting(meeting_id: int):
"""End a tracked meeting window."""
from utils.database import end_meeting_window
from utils.tscm.advanced import get_timeline_manager
success = end_meeting_window(meeting_id)
if not success:
return jsonify({'status': 'error', 'message': 'Meeting not found or already ended'}), 404
# End in correlation engine
correlation = get_correlation_engine()
correlation.end_meeting_window()
# End in timeline manager
manager = get_timeline_manager()
manager.end_meeting_window()
_emit_event('meeting_ended', {
'meeting_id': meeting_id,
'timestamp': datetime.now().isoformat()
})
return jsonify({
'status': 'success',
'message': 'Meeting window ended'
})
@tscm_bp.route('/meeting/<int:meeting_id>/summary')
def get_meeting_summary_endpoint(meeting_id: int):
"""Get detailed summary of device activity during a meeting."""
try:
from utils.database import get_meeting_windows
from utils.tscm.advanced import generate_meeting_summary, get_timeline_manager
# Get meeting window
windows = get_meeting_windows(_current_sweep_id or 0)
meeting = None
for w in windows:
if w.get('id') == meeting_id:
meeting = w
break
if not meeting:
return jsonify({'status': 'error', 'message': 'Meeting not found'}), 404
# Get timelines and profiles
manager = get_timeline_manager()
timelines = manager.get_all_timelines()
correlation = get_correlation_engine()
profiles = [p.to_dict() for p in correlation.device_profiles.values()]
summary = generate_meeting_summary(meeting, timelines, profiles)
return jsonify({
'status': 'success',
'summary': summary.to_dict()
})
except Exception as e:
logger.error(f"Get meeting summary error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/meeting/active')
def get_active_meeting():
"""Get currently active meeting window."""
from utils.database import get_active_meeting_window
meeting = get_active_meeting_window(_current_sweep_id)
return jsonify({
'status': 'success',
'meeting': meeting,
'is_active': meeting is not None
})
# =============================================================================
# PDF Report & Technical Annex Endpoints
# =============================================================================
@tscm_bp.route('/report/pdf')
def get_pdf_report():
"""
Generate client-safe PDF report.
Contains executive summary, findings by risk tier, meeting window
summary, and mandatory disclaimers.
"""
try:
from utils.tscm.reports import generate_report, get_pdf_report
from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int)
if not sweep_id:
return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400
sweep = get_tscm_sweep(sweep_id)
if not sweep:
return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404
# Get data for report
correlation = get_correlation_engine()
profiles = [p.to_dict() for p in correlation.device_profiles.values()]
caps = detect_sweep_capabilities().to_dict()
manager = get_timeline_manager()
timelines = [t.to_dict() for t in manager.get_all_timelines()]
# Generate report
report = generate_report(
sweep_id=sweep_id,
sweep_data=sweep,
device_profiles=profiles,
capabilities=caps,
timelines=timelines
)
pdf_content = get_pdf_report(report)
return Response(
pdf_content,
mimetype='text/plain',
headers={
'Content-Disposition': f'attachment; filename=tscm_report_{sweep_id}.txt'
}
)
except Exception as e:
logger.error(f"Generate PDF report error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/report/annex')
def get_technical_annex():
"""
Generate technical annex (JSON + CSV).
Contains device timelines, all indicators, and detailed data
for audit purposes. No packet data included.
"""
try:
from utils.tscm.reports import generate_report, get_json_annex, get_csv_annex
from utils.tscm.advanced import detect_sweep_capabilities, get_timeline_manager
sweep_id = request.args.get('sweep_id', _current_sweep_id, type=int)
format_type = request.args.get('format', 'json')
if not sweep_id:
return jsonify({'status': 'error', 'message': 'No sweep specified'}), 400
sweep = get_tscm_sweep(sweep_id)
if not sweep:
return jsonify({'status': 'error', 'message': 'Sweep not found'}), 404
# Get data for report
correlation = get_correlation_engine()
profiles = [p.to_dict() for p in correlation.device_profiles.values()]
caps = detect_sweep_capabilities().to_dict()
manager = get_timeline_manager()
timelines = [t.to_dict() for t in manager.get_all_timelines()]
# Generate report
report = generate_report(
sweep_id=sweep_id,
sweep_data=sweep,
device_profiles=profiles,
capabilities=caps,
timelines=timelines
)
if format_type == 'csv':
csv_content = get_csv_annex(report)
return Response(
csv_content,
mimetype='text/csv',
headers={
'Content-Disposition': f'attachment; filename=tscm_annex_{sweep_id}.csv'
}
)
else:
annex = get_json_annex(report)
return jsonify({
'status': 'success',
'annex': annex
})
except Exception as e:
logger.error(f"Generate technical annex error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# WiFi Advanced Indicators Endpoints
# =============================================================================
@tscm_bp.route('/wifi/advanced-indicators')
def get_wifi_advanced_indicators():
"""
Get advanced WiFi indicators (Evil Twin, Probes, Deauth).
These indicators require analysis of WiFi patterns.
Some features require monitor mode.
"""
try:
from utils.tscm.advanced import get_wifi_detector
detector = get_wifi_detector()
return jsonify({
'status': 'success',
'indicators': detector.get_all_indicators(),
'unavailable_features': detector.get_unavailable_features(),
'disclaimer': (
"All indicators represent pattern detections, NOT confirmed attacks. "
"Further investigation is required."
)
})
except Exception as e:
logger.error(f"Get WiFi indicators error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/wifi/analyze-network', methods=['POST'])
def analyze_wifi_network():
"""
Analyze a WiFi network for evil twin patterns.
Compares against known networks to detect SSID spoofing.
"""
try:
from utils.tscm.advanced import get_wifi_detector
data = request.get_json() or {}
detector = get_wifi_detector()
# Set known networks from baseline if available
baseline = get_active_tscm_baseline()
if baseline:
detector.set_known_networks(baseline.get('wifi_networks', []))
indicators = detector.analyze_network(data)
return jsonify({
'status': 'success',
'indicators': [i.to_dict() for i in indicators]
})
except Exception as e:
logger.error(f"Analyze WiFi network error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# Bluetooth Risk Explainability Endpoints
# =============================================================================
@tscm_bp.route('/bluetooth/<identifier>/explain')
def explain_bluetooth_risk(identifier: str):
"""
Get human-readable risk explanation for a BLE device.
Includes proximity estimate, tracker explanation, and
recommended actions.
"""
try:
from utils.tscm.advanced import generate_ble_risk_explanation
# Get device from correlation engine
correlation = get_correlation_engine()
profile = None
key = f"bluetooth:{identifier.upper()}"
if key in correlation.device_profiles:
profile = correlation.device_profiles[key].to_dict()
# Try to find device info
device = {'mac': identifier}
if profile:
device['name'] = profile.get('name')
device['rssi'] = profile.get('rssi_samples', [None])[-1] if profile.get('rssi_samples') else None
# Check meeting status
is_meeting = correlation.is_during_meeting()
explanation = generate_ble_risk_explanation(device, profile, is_meeting)
return jsonify({
'status': 'success',
'explanation': explanation.to_dict()
})
except Exception as e:
logger.error(f"Explain BLE risk error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/bluetooth/<identifier>/proximity')
def get_bluetooth_proximity(identifier: str):
"""Get proximity estimate for a BLE device."""
try:
from utils.tscm.advanced import estimate_ble_proximity
rssi = request.args.get('rssi', type=int)
if rssi is None:
# Try to get from correlation engine
correlation = get_correlation_engine()
key = f"bluetooth:{identifier.upper()}"
if key in correlation.device_profiles:
profile = correlation.device_profiles[key]
if profile.rssi_samples:
rssi = profile.rssi_samples[-1]
if rssi is None:
return jsonify({
'status': 'error',
'message': 'RSSI value required'
}), 400
proximity, explanation, distance = estimate_ble_proximity(rssi)
return jsonify({
'status': 'success',
'proximity': {
'estimate': proximity.value,
'explanation': explanation,
'estimated_distance': distance,
'rssi_used': rssi,
},
'disclaimer': (
"Proximity estimates are approximate and affected by "
"environment, obstacles, and device characteristics."
)
})
except Exception as e:
logger.error(f"Get BLE proximity error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# =============================================================================
# Operator Playbook Endpoints
# =============================================================================
@tscm_bp.route('/playbooks')
def list_playbooks():
"""List all available operator playbooks."""
try:
from utils.tscm.advanced import PLAYBOOKS
return jsonify({
'status': 'success',
'playbooks': {
pid: pb.to_dict()
for pid, pb in PLAYBOOKS.items()
}
})
except Exception as e:
logger.error(f"List playbooks error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/playbooks/<playbook_id>')
def get_playbook(playbook_id: str):
"""Get a specific playbook."""
try:
from utils.tscm.advanced import PLAYBOOKS
if playbook_id not in PLAYBOOKS:
return jsonify({'status': 'error', 'message': 'Playbook not found'}), 404
return jsonify({
'status': 'success',
'playbook': PLAYBOOKS[playbook_id].to_dict()
})
except Exception as e:
logger.error(f"Get playbook error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/findings/<identifier>/playbook')
def get_finding_playbook(identifier: str):
"""Get recommended playbook for a specific finding."""
try:
from utils.tscm.advanced import get_playbook_for_finding
# Get profile
correlation = get_correlation_engine()
profile = None
for protocol in ['bluetooth', 'wifi', 'rf']:
key = f"{protocol}:{identifier.upper()}"
if key in correlation.device_profiles:
profile = correlation.device_profiles[key].to_dict()
break
if not profile:
return jsonify({'status': 'error', 'message': 'Finding not found'}), 404
playbook = get_playbook_for_finding(
risk_level=profile.get('risk_level', 'informational'),
indicators=profile.get('indicators', [])
)
return jsonify({
'status': 'success',
'playbook': playbook.to_dict(),
'suggested_next_steps': [
f"Step {s.step_number}: {s.action}"
for s in playbook.steps[:3]
]
})
except Exception as e:
logger.error(f"Get finding playbook error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500