diff --git a/routes/adsb.py b/routes/adsb.py index 9a09168..95baec5 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -379,10 +379,62 @@ def parse_sbs_stream(service_addr): adsb_bytes_received = 0 adsb_lines_received = 0 + def flush_pending_updates(force: bool = False) -> None: + nonlocal last_update + if not pending_updates: + return + + now = time.time() + if not force and now - last_update < ADSB_UPDATE_INTERVAL: + return + + captured_at = datetime.now(timezone.utc) + for update_icao in tuple(pending_updates): + if update_icao in app_module.adsb_aircraft: + snapshot = app_module.adsb_aircraft[update_icao] + _broadcast_adsb_update({ + 'type': 'aircraft', + **snapshot + }) + adsb_snapshot_writer.enqueue({ + 'captured_at': captured_at, + 'icao': update_icao, + 'callsign': snapshot.get('callsign'), + 'registration': snapshot.get('registration'), + 'type_code': snapshot.get('type_code'), + 'type_desc': snapshot.get('type_desc'), + 'altitude': snapshot.get('altitude'), + 'speed': snapshot.get('speed'), + 'heading': snapshot.get('heading'), + 'vertical_rate': snapshot.get('vertical_rate'), + 'lat': snapshot.get('lat'), + 'lon': snapshot.get('lon'), + 'squawk': snapshot.get('squawk'), + 'source_host': service_addr, + 'snapshot': snapshot, + }) + # Geofence check + _gf_lat = snapshot.get('lat') + _gf_lon = snapshot.get('lon') + if _gf_lat is not None and _gf_lon is not None: + try: + from utils.geofence import get_geofence_manager + for _gf_evt in get_geofence_manager().check_position( + update_icao, 'aircraft', _gf_lat, _gf_lon, + {'callsign': snapshot.get('callsign'), 'altitude': snapshot.get('altitude')} + ): + process_event('adsb', _gf_evt, 'geofence') + except Exception: + pass + + pending_updates.clear() + last_update = now + while adsb_using_service: try: data = sock.recv(SOCKET_BUFFER_SIZE).decode('utf-8', errors='ignore') if not data: + flush_pending_updates(force=True) logger.warning("SBS connection closed (no data)") break adsb_bytes_received += len(data) @@ -501,56 +553,40 @@ def parse_sbs_stream(service_addr): 'squawk': sq, 'meaning': _EMERGENCY_SQUAWKS[sq], }, 'squawk_emergency') + elif msg_type == '2' and len(parts) > 15: + if parts[11]: + try: + aircraft['altitude'] = int(float(parts[11])) + except (ValueError, TypeError): + pass + if parts[12]: + try: + aircraft['speed'] = int(float(parts[12])) + except (ValueError, TypeError): + pass + if parts[13]: + try: + aircraft['heading'] = int(float(parts[13])) + except (ValueError, TypeError): + pass + if parts[14] and parts[15]: + try: + aircraft['lat'] = float(parts[14]) + aircraft['lon'] = float(parts[15]) + except (ValueError, TypeError): + pass + app_module.adsb_aircraft.set(icao, aircraft) pending_updates.add(icao) adsb_messages_received += 1 adsb_last_message_time = time.time() - - now = time.time() - if now - last_update >= ADSB_UPDATE_INTERVAL: - for update_icao in pending_updates: - if update_icao in app_module.adsb_aircraft: - snapshot = app_module.adsb_aircraft[update_icao] - _broadcast_adsb_update({ - 'type': 'aircraft', - **snapshot - }) - adsb_snapshot_writer.enqueue({ - 'captured_at': datetime.now(timezone.utc), - 'icao': update_icao, - 'callsign': snapshot.get('callsign'), - 'registration': snapshot.get('registration'), - 'type_code': snapshot.get('type_code'), - 'type_desc': snapshot.get('type_desc'), - 'altitude': snapshot.get('altitude'), - 'speed': snapshot.get('speed'), - 'heading': snapshot.get('heading'), - 'vertical_rate': snapshot.get('vertical_rate'), - 'lat': snapshot.get('lat'), - 'lon': snapshot.get('lon'), - 'squawk': snapshot.get('squawk'), - 'source_host': service_addr, - 'snapshot': snapshot, - }) - # Geofence check - _gf_lat = snapshot.get('lat') - _gf_lon = snapshot.get('lon') - if _gf_lat is not None and _gf_lon is not None: - try: - from utils.geofence import get_geofence_manager - for _gf_evt in get_geofence_manager().check_position( - update_icao, 'aircraft', _gf_lat, _gf_lon, - {'callsign': snapshot.get('callsign'), 'altitude': snapshot.get('altitude')} - ): - process_event('adsb', _gf_evt, 'geofence') - except Exception: - pass - pending_updates.clear() - last_update = now + flush_pending_updates() except socket.timeout: + flush_pending_updates() continue + flush_pending_updates(force=True) sock.close() adsb_connected = False except OSError as e: