Add files via upload

This commit is contained in:
Colonel Panic
2025-08-29 20:37:40 -04:00
committed by GitHub
parent 04d76e1274
commit 661def11eb
2 changed files with 245 additions and 36 deletions
+213 -34
View File
@@ -22,6 +22,9 @@ detections = []
cumulative_detections = []
session_start_time = datetime.now()
gps_data = None
gps_history = [] # Buffer of recent GPS readings for temporal matching
MAX_GPS_HISTORY = 100 # Keep last 100 GPS readings
GPS_MATCH_THRESHOLD = 30 # Max seconds between detection and GPS reading
serial_connection = None
gps_enabled = False
flock_device_connected = False
@@ -217,6 +220,17 @@ def gps_reader():
parsed = parse_nmea_sentence(line)
if parsed:
gps_data = parsed
# Add to GPS history with timestamp for temporal matching
if parsed.get('fix_quality') > 0:
gps_entry = parsed.copy()
gps_entry['system_timestamp'] = time.time()
gps_history.append(gps_entry)
# Keep only recent GPS readings
if len(gps_history) > MAX_GPS_HISTORY:
gps_history.pop(0)
safe_socket_emit('gps_update', parsed)
# Also send parsed GPS data to terminal
@@ -273,28 +287,145 @@ def flock_reader():
break
time.sleep(0.1)
def find_best_gps_match(detection_timestamp):
"""Find the GPS reading closest in time to the detection timestamp"""
global gps_history
if not gps_history:
return None
try:
# Convert detection timestamp to epoch time
if isinstance(detection_timestamp, str):
# Try parsing ISO format first
try:
dt = datetime.fromisoformat(detection_timestamp.replace('Z', '+00:00'))
except:
# Try parsing the display format
dt = datetime.strptime(detection_timestamp, '%Y-%m-%d %H:%M:%S')
detection_time = dt.timestamp()
else:
detection_time = detection_timestamp
best_match = None
min_time_diff = float('inf')
for gps_entry in gps_history:
gps_time = gps_entry['system_timestamp']
time_diff = abs(detection_time - gps_time)
if time_diff < min_time_diff and time_diff <= GPS_MATCH_THRESHOLD:
min_time_diff = time_diff
best_match = gps_entry
return best_match
except Exception as e:
print(f"Error finding GPS match: {e}")
return None
def validate_gps_data(gps_data):
"""Validate GPS data integrity"""
if not gps_data:
return False, "No GPS data"
lat = gps_data.get('latitude')
lon = gps_data.get('longitude')
if lat is None or lon is None:
return False, "Missing coordinates"
# Basic coordinate validation
if not (-90 <= lat <= 90):
return False, f"Invalid latitude: {lat}"
if not (-180 <= lon <= 180):
return False, f"Invalid longitude: {lon}"
fix_quality = gps_data.get('fix_quality', 0)
if fix_quality < 1:
return False, f"Poor GPS fix quality: {fix_quality}"
return True, "Valid GPS data"
def add_detection_from_serial(data):
"""Add detection from serial data - counts detections per MAC address"""
global detections, cumulative_detections, gps_data, next_detection_id
# Add GPS data if available
if gps_data and gps_data.get('fix_quality') > 0:
data['gps'] = {
'latitude': gps_data.get('latitude'),
'longitude': gps_data.get('longitude'),
'altitude': gps_data.get('altitude'),
'timestamp': gps_data.get('timestamp'),
'satellites': gps_data.get('satellites'),
'fix_quality': gps_data.get('fix_quality')
}
# Add server timestamp first (system time when detection was processed)
system_time = time.time()
data['server_timestamp'] = datetime.fromtimestamp(system_time).isoformat()
# Try to find the best GPS match for this detection's timestamp
best_gps = find_best_gps_match(system_time)
preferred_timestamp = None
if best_gps:
# Validate GPS data before using it
is_valid, validation_msg = validate_gps_data(best_gps)
if is_valid:
time_diff = abs(system_time - best_gps['system_timestamp'])
data['gps'] = {
'latitude': best_gps.get('latitude'),
'longitude': best_gps.get('longitude'),
'altitude': best_gps.get('altitude'),
'timestamp': best_gps.get('timestamp'),
'satellites': best_gps.get('satellites'),
'fix_quality': best_gps.get('fix_quality'),
'time_diff': time_diff,
'match_quality': 'temporal'
}
# Prefer GPS timestamp when available and accurate
if time_diff < 5: # Very close temporal match
preferred_timestamp = best_gps.get('timestamp')
print(f"✓ Using GPS timestamp for MAC {data.get('mac_address', 'unknown')}: {time_diff:.2f}s difference")
else:
print(f"✓ GPS temporal match for MAC {data.get('mac_address', 'unknown')}: {time_diff:.2f}s difference")
else:
print(f"⚠ Invalid GPS data for temporal match: {validation_msg}")
best_gps = None
# Fallback to current GPS if no good temporal match
if not best_gps and gps_data and gps_data.get('fix_quality') > 0:
is_valid, validation_msg = validate_gps_data(gps_data)
if is_valid:
data['gps'] = {
'latitude': gps_data.get('latitude'),
'longitude': gps_data.get('longitude'),
'altitude': gps_data.get('altitude'),
'timestamp': gps_data.get('timestamp'),
'satellites': gps_data.get('satellites'),
'fix_quality': gps_data.get('fix_quality'),
'time_diff': None, # Unknown time difference
'match_quality': 'current'
}
# Use current GPS timestamp if available
preferred_timestamp = gps_data.get('timestamp')
print(f"○ Using current GPS timestamp for MAC {data.get('mac_address', 'unknown')} (no temporal match)")
else:
print(f"⚠ Current GPS data invalid: {validation_msg}")
# Set timestamps - prefer GPS timestamp when available
if preferred_timestamp:
data['timestamp'] = preferred_timestamp
data['detection_time'] = preferred_timestamp
data['timestamp_source'] = 'gps'
print(f"📍 Using GPS timestamp as primary timestamp for {data.get('mac_address', 'unknown')}")
else:
# Fallback to system timestamps
system_dt = datetime.fromtimestamp(system_time)
data['timestamp'] = system_dt.isoformat()
data['detection_time'] = system_dt.strftime('%Y-%m-%d %H:%M:%S')
data['timestamp_source'] = 'system'
print(f"🕐 Using system timestamp for {data.get('mac_address', 'unknown')} (no GPS available)")
# Log if no GPS could be assigned
if not data.get('gps'):
print(f"✗ No valid GPS data available for MAC {data.get('mac_address', 'unknown')}")
# Add manufacturer information
if 'mac_address' in data:
data['manufacturer'] = lookup_manufacturer(data['mac_address'])
# Add server timestamp
data['server_timestamp'] = datetime.now().isoformat()
# Check if we already have a detection for this MAC address
mac_address = data.get('mac_address')
existing_detection = None
@@ -685,32 +816,42 @@ def export_csv():
with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = [
'timestamp', 'detection_time', 'protocol', 'detection_method',
'ssid', 'mac_address', 'manufacturer', 'alias', 'rssi', 'signal_strength', 'channel',
'latitude', 'longitude', 'altitude', 'gps_timestamp', 'satellites', 'detection_count'
'timestamp', 'detection_time', 'server_timestamp', 'protocol', 'detection_method',
'ssid', 'device_name', 'mac_address', 'manufacturer', 'alias', 'rssi', 'last_rssi',
'signal_strength', 'channel', 'last_channel', 'detection_count',
'latitude', 'longitude', 'altitude', 'gps_timestamp', 'satellites', 'fix_quality', 'gps_time_diff', 'gps_match_quality', 'timestamp_source'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for detection in data_to_export:
gps_data = detection.get('gps', {})
row = {
'timestamp': detection.get('timestamp'),
'detection_time': detection.get('detection_time'),
'server_timestamp': detection.get('server_timestamp'),
'protocol': detection.get('protocol'),
'detection_method': detection.get('detection_method'),
'ssid': detection.get('ssid', ''),
'device_name': detection.get('device_name', ''),
'mac_address': detection.get('mac_address'),
'manufacturer': detection.get('manufacturer', 'Unknown'),
'alias': detection.get('alias', ''),
'rssi': detection.get('last_rssi') or detection.get('rssi'),
'rssi': detection.get('rssi'),
'last_rssi': detection.get('last_rssi'),
'signal_strength': detection.get('signal_strength'),
'channel': detection.get('last_channel') or detection.get('channel'),
'latitude': detection.get('gps', {}).get('latitude'),
'longitude': detection.get('gps', {}).get('longitude'),
'altitude': detection.get('gps', {}).get('altitude'),
'gps_timestamp': detection.get('gps', {}).get('timestamp'),
'satellites': detection.get('gps', {}).get('satellites'),
'detection_count': detection.get('detection_count', 1)
'channel': detection.get('channel'),
'last_channel': detection.get('last_channel'),
'detection_count': detection.get('detection_count', 1),
'latitude': gps_data.get('latitude'),
'longitude': gps_data.get('longitude'),
'altitude': gps_data.get('altitude'),
'gps_timestamp': gps_data.get('timestamp'),
'satellites': gps_data.get('satellites'),
'fix_quality': gps_data.get('fix_quality'),
'gps_time_diff': gps_data.get('time_diff'),
'gps_match_quality': gps_data.get('match_quality'),
'timestamp_source': detection.get('timestamp_source', 'unknown')
}
writer.writerow(row)
@@ -748,22 +889,60 @@ def export_kml():
for i, detection in enumerate(data_to_export):
gps = detection.get('gps', {})
if gps.get('latitude') and gps.get('longitude'):
# Use alias if available, otherwise use detection number
placemark_name = detection.get('alias') or f"Detection {i+1}"
# GPS accuracy indicator
gps_accuracy = ""
if gps.get('time_diff') is not None:
time_diff = gps.get('time_diff')
if time_diff < 5:
gps_accuracy = f" (✓ Precise: {time_diff:.1f}s)"
elif time_diff < 15:
gps_accuracy = f" (~ Good: {time_diff:.1f}s)"
else:
gps_accuracy = f" (⚠ Approximate: {time_diff:.1f}s)"
else:
gps_accuracy = " (? Unknown accuracy)"
# Build device info
device_info = ""
if detection.get('ssid'):
device_info += f"<b>SSID:</b> {detection.get('ssid')}<br/>"
if detection.get('device_name'):
device_info += f"<b>Device Name:</b> {detection.get('device_name')}<br/>"
# RSSI info
rssi_info = detection.get('last_rssi') or detection.get('rssi', 'N/A')
# Channel info
channel_info = detection.get('last_channel') or detection.get('channel', 'N/A')
kml_content += f"""
<Placemark>
<name>Detection {i+1}</name>
<name>{placemark_name}</name>
<description>
<![CDATA[
<b>Protocol:</b> {detection.get('protocol')}<br/>
<b>Method:</b> {detection.get('detection_method')}<br/>
<b>SSID:</b> {detection.get('ssid', 'N/A')}<br/>
<b>MAC:</b> {detection.get('mac_address')}<br/>
<b>Detection Method:</b> {detection.get('detection_method')}<br/>
{device_info}
<b>MAC Address:</b> {detection.get('mac_address')}<br/>
<b>Manufacturer:</b> {detection.get('manufacturer', 'Unknown')}<br/>
<b>Alias:</b> {detection.get('alias', 'N/A')}<br/>
<b>RSSI:</b> {detection.get('rssi')} dBm<br/>
<b>Signal:</b> {detection.get('signal_strength')}<br/>
<b>Channel:</b> {detection.get('channel')}<br/>
<b>Time:</b> {detection.get('detection_time')}<br/>
<b>GPS Satellites:</b> {gps.get('satellites', 'N/A')}
<b>Alias:</b> {detection.get('alias', 'None')}<br/>
<b>RSSI:</b> {rssi_info} dBm<br/>
<b>Signal Strength:</b> {detection.get('signal_strength', 'N/A')}<br/>
<b>Channel:</b> {channel_info}<br/>
<b>Detection Count:</b> {detection.get('detection_count', 1)}<br/>
<b>Detection Time:</b> {detection.get('detection_time', 'N/A')}<br/>
<b>Server Timestamp:</b> {detection.get('server_timestamp', 'N/A')}<br/>
<hr/>
<b>GPS Coordinates:</b> {gps.get('latitude'):.6f}, {gps.get('longitude'):.6f}{gps_accuracy}<br/>
<b>GPS Altitude:</b> {gps.get('altitude', 'N/A')} m<br/>
<b>GPS Satellites:</b> {gps.get('satellites', 'N/A')}<br/>
<b>GPS Fix Quality:</b> {gps.get('fix_quality', 'N/A')}<br/>
<b>GPS Match Quality:</b> {gps.get('match_quality', 'N/A')}<br/>
<b>GPS Timestamp:</b> {gps.get('timestamp', 'N/A')}<br/>
<b>Timestamp Source:</b> {detection.get('timestamp_source', 'Unknown').upper()}
]]>
</description>
<Point>
+32 -2
View File
@@ -862,6 +862,17 @@
letter-spacing: 0.5px;
}
.gps-tag {
background: #22c55e;
color: white;
padding: 0.15rem 0.4rem;
border-radius: 3px;
font-size: 0.7rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.3px;
}
.detection-count {
background: #059669;
color: white;
@@ -1979,6 +1990,7 @@
<div class="detection-type-badge">
<span class="detection-type">${detection.detection_method ? detection.detection_method.toUpperCase() : 'UNKNOWN'}</span>
<span class="detection-count">${count}×</span>
${detection.gps && detection.gps.latitude !== undefined ? '<span class="gps-tag">GPS</span>' : ''}
</div>
${gpsLink}
</div>
@@ -2677,15 +2689,33 @@
// Create popup content with data source indicator
const dataSource = isSessionData ? 'Session' : 'Cumulative';
const aliasText = detection.alias ? `<strong>Alias:</strong> ${detection.alias}<br>` : '';
// GPS accuracy indicator
let gpsAccuracy = '';
if (detection.gps.time_diff !== undefined && detection.gps.time_diff !== null) {
const timeDiff = detection.gps.time_diff;
if (timeDiff < 5) {
gpsAccuracy = ` <span style="color: #22c55e;">✓ Precise (${timeDiff.toFixed(1)}s)</span>`;
} else if (timeDiff < 15) {
gpsAccuracy = ` <span style="color: #f59e0b;">~ Good (${timeDiff.toFixed(1)}s)</span>`;
} else {
gpsAccuracy = ` <span style="color: #ef4444;">⚠ Approximate (${timeDiff.toFixed(1)}s)</span>`;
}
} else {
gpsAccuracy = ` <span style="color: #6b7280;">? Unknown accuracy</span>`;
}
const popupContent = `
<h3>Detection #${detection.id} (${dataSource})</h3>
<h3>${detection.alias || `Detection #${detection.id}`} (${dataSource})</h3>
${aliasText}
<strong>Protocol:</strong> ${detection.protocol}<br>
<strong>Method:</strong> ${detection.detection_method}<br>
<strong>MAC:</strong> ${detection.mac_address}<br>
${detection.ssid ? `<strong>SSID:</strong> ${detection.ssid}<br>` : ''}
${detection.manufacturer ? `<strong>Manufacturer:</strong> ${detection.manufacturer}<br>` : ''}
<strong>RSSI:</strong> ${detection.last_rssi || detection.rssi} dBm<br>
<strong>GPS:</strong> ${lat.toFixed(6)}, ${lng.toFixed(6)}<br>
<strong>GPS:</strong> ${lat.toFixed(6)}, ${lng.toFixed(6)}${gpsAccuracy}<br>
<strong>Satellites:</strong> ${detection.gps.satellites}<br>
<strong>Count:</strong> ${detection.detection_count || 1}<br>
<strong>Source:</strong> ${dataSource} Data