From fc48ff7d9f8bfaa75e128bc1760345f95cfa822a Mon Sep 17 00:00:00 2001 From: Smittix Date: Fri, 16 Jan 2026 13:50:32 +0000 Subject: [PATCH] Add comprehensive APRS packet parsing support - Add Mic-E position decoding (destination field encoding) - Add compressed position format parsing (Base-91) - Add complete telemetry parsing with analog/digital values - Add telemetry definition messages (PARM, UNIT, EQNS, BITS) - Add message ACK/REJ parsing and sequence numbers - Add weather data parsing in position packets - Add PHG (Power/Height/Gain/Directivity) parsing - Add Direction Finding (DF) report parsing - Add timestamp extraction from position packets - Add third-party traffic, NMEA, user-defined format parsing - Add bulletin, NWS alert, query, and capabilities parsing - Expand weather fields (luminosity, snow, radiation, etc.) Co-Authored-By: Claude Opus 4.5 --- routes/aprs.py | 900 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 879 insertions(+), 21 deletions(-) diff --git a/routes/aprs.py b/routes/aprs.py index c8f554f..41bb886 100644 --- a/routes/aprs.py +++ b/routes/aprs.py @@ -97,7 +97,20 @@ MODEM 1200 def parse_aprs_packet(raw_packet: str) -> Optional[dict]: - """Parse APRS packet into structured data.""" + """Parse APRS packet into structured data. + + Supports all major APRS packet types: + - Position reports (standard, compressed, Mic-E) + - Weather reports (standalone and in position packets) + - Objects and Items + - Messages (including ACK/REJ and telemetry definitions) + - Telemetry data + - Status reports + - Queries and capabilities + - Third-party traffic + - Raw GPS/NMEA data + - User-defined formats + """ try: # Basic APRS packet format: CALLSIGN>PATH:DATA # Example: N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077 @@ -118,35 +131,165 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]: 'timestamp': datetime.utcnow().isoformat() + 'Z', } + # Extract destination from path (first element before any comma) + dest_parts = path.split(',') + dest = dest_parts[0] if dest_parts else '' + + # Check for Mic-E format first (data starts with ` or ' and dest is 6+ chars) + if len(data) >= 9 and data[0] in '`\'' and len(dest) >= 6: + mic_e_data = parse_mic_e(dest, data) + if mic_e_data: + packet['packet_type'] = 'position' + packet['position_format'] = 'mic_e' + packet.update(mic_e_data) + return packet + # Determine packet type and parse accordingly if data.startswith('!') or data.startswith('='): - # Position without timestamp + # Position without timestamp (! = no messaging, = = with messaging) packet['packet_type'] = 'position' - pos = parse_position(data[1:]) - if pos: - packet.update(pos) + packet['messaging_capable'] = data.startswith('=') + pos_data = data[1:] + + # Check for compressed format (starts with /\[A-Za-z]) + if len(pos_data) >= 13 and pos_data[0] in '/\\': + pos = parse_compressed_position(pos_data) + if pos: + packet['position_format'] = 'compressed' + packet.update(pos) + else: + pos = parse_position(pos_data) + if pos: + packet['position_format'] = 'uncompressed' + packet.update(pos) + + # Check for weather data in position packet (after position) + if '_' in pos_data or 'g' in pos_data or 't' in pos_data[15:] if len(pos_data) > 15 else False: + weather = parse_weather(pos_data) + if weather: + packet['weather'] = weather + + # Check for PHG data + phg = parse_phg(pos_data) + if phg: + packet.update(phg) + + # Check for RNG data + rng = parse_rng(pos_data) + if rng: + packet.update(rng) + + # Check for DF report data + df = parse_df_report(pos_data) + if df: + packet.update(df) elif data.startswith('/') or data.startswith('@'): - # Position with timestamp + # Position with timestamp (/ = no messaging, @ = with messaging) packet['packet_type'] = 'position' - # Skip timestamp (7 chars) and parse position + packet['messaging_capable'] = data.startswith('@') + + # Parse timestamp (first 7 chars after type indicator) if len(data) > 8: - pos = parse_position(data[8:]) - if pos: - packet.update(pos) + ts_data = parse_timestamp(data[1:8]) + if ts_data: + packet.update(ts_data) + + pos_data = data[8:] + + # Check for compressed format + if len(pos_data) >= 13 and pos_data[0] in '/\\': + pos = parse_compressed_position(pos_data) + if pos: + packet['position_format'] = 'compressed' + packet.update(pos) + else: + pos = parse_position(pos_data) + if pos: + packet['position_format'] = 'uncompressed' + packet.update(pos) + + # Check for weather data in position packet + weather = parse_weather(pos_data) + if weather: + packet['weather'] = weather + + # Check for PHG data + phg = parse_phg(pos_data) + if phg: + packet.update(phg) + + # Check for RNG data + rng = parse_rng(pos_data) + if rng: + packet.update(rng) elif data.startswith('>'): # Status message packet['packet_type'] = 'status' - packet['status'] = data[1:] + status_data = data[1:] + packet['status'] = status_data + + # Check for Maidenhead grid locator in status (common pattern) + grid_match = re.match(r'^([A-R]{2}[0-9]{2}[A-X]{0,2})\s*', status_data, re.IGNORECASE) + if grid_match: + packet['grid'] = grid_match.group(1).upper() elif data.startswith(':'): - # Message + # Message format - check for various subtypes packet['packet_type'] = 'message' + + # Standard message: :ADDRESSEE:MESSAGE msg_match = re.match(r'^:([A-Z0-9 -]{9}):(.*)$', data, re.IGNORECASE) if msg_match: - packet['addressee'] = msg_match.group(1).strip() - packet['message'] = msg_match.group(2) + addressee = msg_match.group(1).strip() + message = msg_match.group(2) + packet['addressee'] = addressee + + # Check for telemetry definition messages + telem_def_match = re.match(r'^(PARM|UNIT|EQNS|BITS)\.(.*)$', message) + if telem_def_match: + packet['packet_type'] = 'telemetry_definition' + telem_def = parse_telemetry_definition( + addressee, telem_def_match.group(1), telem_def_match.group(2) + ) + if telem_def: + packet.update(telem_def) + else: + packet['message'] = message + + # Check for ACK/REJ + ack_match = re.match(r'^ack(\w+)$', message, re.IGNORECASE) + if ack_match: + packet['message_type'] = 'ack' + packet['ack_id'] = ack_match.group(1) + + rej_match = re.match(r'^rej(\w+)$', message, re.IGNORECASE) + if rej_match: + packet['message_type'] = 'rej' + packet['rej_id'] = rej_match.group(1) + + # Check for message ID (for acknowledgment) + msgid_match = re.search(r'\{(\w{1,5})$', message) + if msgid_match: + packet['message_id'] = msgid_match.group(1) + packet['message'] = message[:message.rfind('{')] + + # Bulletin format: :BLNn :message + elif data[1:4] == 'BLN': + packet['packet_type'] = 'bulletin' + bln_match = re.match(r'^:BLN([0-9A-Z])[ ]*:(.*)$', data, re.IGNORECASE) + if bln_match: + packet['bulletin_id'] = bln_match.group(1) + packet['bulletin'] = bln_match.group(2) + + # NWS weather alert: :NWS-xxxxx:message + elif data[1:5] == 'NWS-': + packet['packet_type'] = 'nws_alert' + nws_match = re.match(r'^:NWS-([A-Z]+)[ ]*:(.*)$', data, re.IGNORECASE) + if nws_match: + packet['nws_id'] = nws_match.group(1) + packet['alert'] = nws_match.group(2) elif data.startswith('_'): # Weather report (Positionless) @@ -155,15 +298,19 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]: elif data.startswith(';'): # Object format: ;OBJECTNAME*DDHHMMzPOSITION or ;OBJECTNAME_DDHHMMzPOSITION - # Name is exactly 9 chars, then * (live) or _ (killed), then 7-char timestamp, then position packet['packet_type'] = 'object' obj_data = parse_object(data) if obj_data: packet.update(obj_data) + # Check for weather data in object + remaining = data[18:] if len(data) > 18 else '' + weather = parse_weather(remaining) + if weather: + packet['weather'] = weather + elif data.startswith(')'): # Item format: )ITEMNAME!POSITION or )ITEMNAME_POSITION - # Name is 3-9 chars, terminated by ! (live) or _ (killed), then position packet['packet_type'] = 'item' item_data = parse_item(data) if item_data: @@ -172,11 +319,60 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]: elif data.startswith('T'): # Telemetry packet['packet_type'] = 'telemetry' + telem = parse_telemetry(data) + if telem: + packet.update(telem) + + elif data.startswith('}'): + # Third-party traffic + packet['packet_type'] = 'third_party' + third = parse_third_party(data) + if third: + packet.update(third) + + elif data.startswith('$'): + # Raw GPS NMEA data + packet['packet_type'] = 'nmea' + nmea = parse_nmea(data) + if nmea: + packet.update(nmea) + + elif data.startswith('{'): + # User-defined format + packet['packet_type'] = 'user_defined' + user = parse_user_defined(data) + if user: + packet.update(user) + + elif data.startswith('<'): + # Station capabilities + packet['packet_type'] = 'capabilities' + caps = parse_capabilities(data) + if caps: + packet.update(caps) + + elif data.startswith('?'): + # Query + packet['packet_type'] = 'query' + query = parse_capabilities(data) + if query: + packet.update(query) else: packet['packet_type'] = 'other' packet['data'] = data + # Extract comment if present (after standard data) + # Many APRS packets have freeform comments at the end + if 'data' not in packet and packet['packet_type'] in ('position', 'object', 'item'): + # Look for common comment patterns + comment_match = re.search(r'/([^/]+)$', data) + if comment_match and not re.match(r'^A=[-\d]+', comment_match.group(1)): + potential_comment = comment_match.group(1) + # Exclude things that look like data fields + if len(potential_comment) > 3 and not re.match(r'^\d{3}/', potential_comment): + packet['comment'] = potential_comment + return packet except Exception as e: @@ -350,13 +546,22 @@ def parse_item(data: str) -> Optional[dict]: def parse_weather(data: str) -> dict: - """Parse APRS weather data.""" + """Parse APRS weather data. + + Weather data can appear in positionless weather reports (starting with _) + or as an extension after position data. Supports all standard APRS weather fields. + """ weather = {} - # Wind direction: cCCC + # Wind direction: cCCC (degrees) or _CCC at start of positionless match = re.search(r'c(\d{3})', data) if match: weather['wind_direction'] = int(match.group(1)) + elif data.startswith('_') and len(data) > 4: + # Positionless format starts with _MMDDhhmm then wind dir + wind_match = re.match(r'_\d{8}(\d{3})', data) + if wind_match: + weather['wind_direction'] = int(wind_match.group(1)) # Wind speed: sSSS (mph) match = re.search(r's(\d{3})', data) @@ -368,7 +573,7 @@ def parse_weather(data: str) -> dict: if match: weather['wind_gust'] = int(match.group(1)) - # Temperature: tTTT (Fahrenheit) + # Temperature: tTTT (Fahrenheit, can be negative) match = re.search(r't(-?\d{2,3})', data) if match: weather['temperature'] = int(match.group(1)) @@ -378,12 +583,17 @@ def parse_weather(data: str) -> dict: if match: weather['rain_1h'] = int(match.group(1)) / 100.0 - # Rain last 24h: pPPP + # Rain last 24h: pPPP (hundredths of inch) match = re.search(r'p(\d{3})', data) if match: weather['rain_24h'] = int(match.group(1)) / 100.0 - # Humidity: hHH (%) + # Rain since midnight: PPPP (hundredths of inch) + match = re.search(r'P(\d{3})', data) + if match: + weather['rain_midnight'] = int(match.group(1)) / 100.0 + + # Humidity: hHH (%, 00 = 100%) match = re.search(r'h(\d{2})', data) if match: h = int(match.group(1)) @@ -394,9 +604,657 @@ def parse_weather(data: str) -> dict: if match: weather['pressure'] = int(match.group(1)) / 10.0 + # Luminosity: LLLL (watts per square meter) + # L = 0-999 W/m², l = 1000-1999 W/m² (subtract 1000) + match = re.search(r'L(\d{3})', data) + if match: + weather['luminosity'] = int(match.group(1)) + else: + match = re.search(r'l(\d{3})', data) + if match: + weather['luminosity'] = int(match.group(1)) + 1000 + + # Snow (last 24h): #SSS (inches) + match = re.search(r'#(\d{3})', data) + if match: + weather['snow_24h'] = int(match.group(1)) + + # Raw rain counter: !RRR (for Peet Bros stations) + match = re.search(r'!(\d{3})', data) + if match: + weather['rain_raw'] = int(match.group(1)) + + # Radiation: X### (nanosieverts/hour) - some weather stations + match = re.search(r'X(\d{3})', data) + if match: + weather['radiation'] = int(match.group(1)) + + # Flooding/water level: F### (feet above/below flood stage) + match = re.search(r'F(-?\d{3})', data) + if match: + weather['flood_level'] = int(match.group(1)) + + # Voltage: V### (volts, for battery monitoring) + match = re.search(r'V(\d{3})', data) + if match: + weather['voltage'] = int(match.group(1)) / 10.0 + + # Software type often at end (e.g., "Davis" or "Arduino") + # Extract as weather station type + wx_type_match = re.search(r'([A-Za-z]{4,})$', data) + if wx_type_match: + weather['wx_station_type'] = wx_type_match.group(1) + return weather +# Mic-E encoding tables +MIC_E_DEST_TABLE = { + '0': (0, 'S', 0), '1': (1, 'S', 0), '2': (2, 'S', 0), '3': (3, 'S', 0), + '4': (4, 'S', 0), '5': (5, 'S', 0), '6': (6, 'S', 0), '7': (7, 'S', 0), + '8': (8, 'S', 0), '9': (9, 'S', 0), + 'A': (0, 'S', 1), 'B': (1, 'S', 1), 'C': (2, 'S', 1), 'D': (3, 'S', 1), + 'E': (4, 'S', 1), 'F': (5, 'S', 1), 'G': (6, 'S', 1), 'H': (7, 'S', 1), + 'I': (8, 'S', 1), 'J': (9, 'S', 1), + 'K': (0, 'S', 1), 'L': (0, 'S', 0), + 'P': (0, 'N', 1), 'Q': (1, 'N', 1), 'R': (2, 'N', 1), 'S': (3, 'N', 1), + 'T': (4, 'N', 1), 'U': (5, 'N', 1), 'V': (6, 'N', 1), 'W': (7, 'N', 1), + 'X': (8, 'N', 1), 'Y': (9, 'N', 1), + 'Z': (0, 'N', 1), +} + +# Mic-E message types encoded in destination +MIC_E_MESSAGE_TYPES = { + (1, 1, 1): ('off_duty', 'Off Duty'), + (1, 1, 0): ('en_route', 'En Route'), + (1, 0, 1): ('in_service', 'In Service'), + (1, 0, 0): ('returning', 'Returning'), + (0, 1, 1): ('committed', 'Committed'), + (0, 1, 0): ('special', 'Special'), + (0, 0, 1): ('priority', 'Priority'), + (0, 0, 0): ('emergency', 'Emergency'), +} + + +def parse_mic_e(dest: str, data: str) -> Optional[dict]: + """Parse Mic-E encoded position from destination and data fields. + + Mic-E is a highly compressed format that encodes: + - Latitude in the destination address (6 chars) + - Longitude, speed, course in the information field + - Status message type in destination address bits + + Data field format: starts with ` or ' then: + - byte 0: longitude degrees + 28 + - byte 1: longitude minutes + 28 + - byte 2: longitude hundredths + 28 + - byte 3: speed (tens) + course (hundreds) + 28 + - byte 4: speed (units) + course (tens) + 28 + - byte 5: course (units) + 28 + - byte 6: symbol code + - byte 7: symbol table + - remaining: optional altitude, telemetry, status text + """ + try: + if len(dest) < 6 or len(data) < 9: + return None + + # First char indicates Mic-E type: ` = current, ' = old + mic_e_type = 'current' if data[0] == '`' else 'old' + + # Parse latitude from destination (first 6 chars) + lat_digits = [] + lat_dir = 'N' + lon_offset = 0 + msg_bits = [] + + for i, char in enumerate(dest[:6]): + if char not in MIC_E_DEST_TABLE: + # Try uppercase + char = char.upper() + if char not in MIC_E_DEST_TABLE: + return None + + digit, ns, msg_bit = MIC_E_DEST_TABLE[char] + lat_digits.append(digit) + + # First 3 chars determine N/S and message type + if i < 3: + msg_bits.append(msg_bit) + # Char 4 determines latitude N/S + if i == 3: + lat_dir = ns + # Char 5 determines longitude offset (100 degrees) + if i == 4: + lon_offset = 100 if ns == 'N' else 0 + # Char 6 determines longitude W/E + if i == 5: + lon_dir = 'W' if ns == 'N' else 'E' + + # Calculate latitude + lat_deg = lat_digits[0] * 10 + lat_digits[1] + lat_min = lat_digits[2] * 10 + lat_digits[3] + (lat_digits[4] * 10 + lat_digits[5]) / 100.0 + lat = lat_deg + lat_min / 60.0 + if lat_dir == 'S': + lat = -lat + + # Parse longitude from data (bytes 1-3 after type char) + d = data[1:] # Skip type char + + # Longitude degrees (adjusted for offset) + lon_deg = ord(d[0]) - 28 + if lon_offset == 100: + lon_deg += 100 + if lon_deg >= 180 and lon_deg <= 189: + lon_deg -= 80 + elif lon_deg >= 190 and lon_deg <= 199: + lon_deg -= 190 + + # Longitude minutes + lon_min = ord(d[1]) - 28 + if lon_min >= 60: + lon_min -= 60 + + # Longitude hundredths of minutes + lon_hun = ord(d[2]) - 28 + + lon = lon_deg + (lon_min + lon_hun / 100.0) / 60.0 + if lon_dir == 'W': + lon = -lon + + # Parse speed and course (bytes 4-6) + sp = ord(d[3]) - 28 + dc = ord(d[4]) - 28 + se = ord(d[5]) - 28 + + speed = (sp * 10) + (dc // 10) + if speed >= 800: + speed -= 800 + + course = ((dc % 10) * 100) + se + if course >= 400: + course -= 400 + + # Get symbol (bytes 7-8) + symbol_code = d[6] + symbol_table = d[7] + + result = { + 'lat': round(lat, 6), + 'lon': round(lon, 6), + 'symbol': symbol_table + symbol_code, + 'speed': speed, # knots + 'course': course, + 'mic_e_type': mic_e_type, + } + + # Decode message type from first 3 destination chars + msg_tuple = tuple(msg_bits) + if msg_tuple in MIC_E_MESSAGE_TYPES: + result['mic_e_status'] = MIC_E_MESSAGE_TYPES[msg_tuple][0] + result['mic_e_status_text'] = MIC_E_MESSAGE_TYPES[msg_tuple][1] + + # Parse optional fields after symbol (byte 9 onwards) + if len(d) > 8: + extra = d[8:] + + # Altitude: `XXX} where XXX is base-91 encoded + alt_match = re.search(r'([\x21-\x7b]{3})\}', extra) + if alt_match: + alt_chars = alt_match.group(1) + alt = ((ord(alt_chars[0]) - 33) * 91 * 91 + + (ord(alt_chars[1]) - 33) * 91 + + (ord(alt_chars[2]) - 33) - 10000) + result['altitude'] = alt # meters + + # Status text (after altitude or at end) + status_text = re.sub(r'[\x21-\x7b]{3}\}', '', extra).strip() + if status_text: + result['status'] = status_text + + return result + + except Exception as e: + logger.debug(f"Failed to parse Mic-E: {e}") + return None + + +def parse_compressed_position(data: str) -> Optional[dict]: + r"""Parse compressed position format (Base-91 encoding). + + Compressed format: /YYYYXXXX$csT + - / or \\ = symbol table + - YYYY = 4-char base-91 latitude + - XXXX = 4-char base-91 longitude + - $ = symbol code + - cs = compressed course/speed or altitude + - T = compression type byte + """ + try: + # Compressed positions start with symbol table char followed by 4+4+1+2+1 chars + if len(data) < 13: + return None + + symbol_table = data[0] + + # Decode base-91 latitude (chars 1-4) + lat_chars = data[1:5] + lat_val = 0 + for c in lat_chars: + lat_val = lat_val * 91 + (ord(c) - 33) + lat = 90.0 - (lat_val / 380926.0) + + # Decode base-91 longitude (chars 5-8) + lon_chars = data[5:9] + lon_val = 0 + for c in lon_chars: + lon_val = lon_val * 91 + (ord(c) - 33) + lon = -180.0 + (lon_val / 190463.0) + + # Symbol code + symbol_code = data[9] + + result = { + 'lat': round(lat, 6), + 'lon': round(lon, 6), + 'symbol': symbol_table + symbol_code, + 'compressed': True, + } + + # Course/speed or altitude (chars 10-11) and type byte (char 12) + if len(data) >= 13: + c = ord(data[10]) - 33 + s = ord(data[11]) - 33 + t = ord(data[12]) - 33 + + # Type byte bits: + # bit 5 (0x20): GPS fix - current (1) or old (0) + # bit 4 (0x10): NMEA source - GGA (1) or other (0) + # bit 3 (0x08): Origin - compressed (1) or software (0) + # bits 0-2: compression type + + comp_type = t & 0x07 + + if comp_type == 0: + # c/s are course/speed + if c != 0 or s != 0: + result['course'] = c * 4 + result['speed'] = round(1.08 ** s - 1, 1) # knots + elif comp_type == 1: + # c/s are altitude + if c != 0 or s != 0: + alt = 1.002 ** (c * 91 + s) + result['altitude'] = round(alt) # feet + elif comp_type == 2: + # Radio range + if s != 0: + result['range'] = round(2 * 1.08 ** s, 1) # miles + + # GPS fix quality from type byte + if t & 0x20: + result['gps_fix'] = 'current' + else: + result['gps_fix'] = 'old' + + return result + + except Exception as e: + logger.debug(f"Failed to parse compressed position: {e}") + return None + + +def parse_telemetry(data: str) -> Optional[dict]: + """Parse APRS telemetry data. + + Format: T#sss,aaa,aaa,aaa,aaa,aaa,bbbbbbbb + - T#sss = sequence number (001-999 or MIC) + - aaa = analog values (0-255, up to 5 channels) + - bbbbbbbb = 8 digital bits + """ + try: + if not data.startswith('T'): + return None + + result = {'packet_type': 'telemetry'} + + # Match telemetry format + match = re.match( + r'^T#(\d{3}|MIC),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),([01]{8})', + data + ) + + if match: + result['sequence'] = match.group(1) + result['analog'] = [ + int(match.group(2)), + int(match.group(3)), + int(match.group(4)), + int(match.group(5)), + int(match.group(6)), + ] + result['digital'] = match.group(7) + result['digital_bits'] = [int(b) for b in match.group(7)] + return result + + # Try simpler format without digital bits + match = re.match( + r'^T#(\d{3}|MIC),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3})', + data + ) + + if match: + result['sequence'] = match.group(1) + result['analog'] = [ + int(match.group(2)), + int(match.group(3)), + int(match.group(4)), + int(match.group(5)), + int(match.group(6)), + ] + return result + + # Even simpler - just sequence and some analog + match = re.match(r'^T#(\d{3}|MIC),(.+)$', data) + if match: + result['sequence'] = match.group(1) + values = match.group(2).split(',') + result['analog'] = [int(v) for v in values if v.isdigit()] + return result + + return None + + except Exception as e: + logger.debug(f"Failed to parse telemetry: {e}") + return None + + +def parse_telemetry_definition(callsign: str, msg_type: str, content: str) -> Optional[dict]: + """Parse telemetry definition messages (PARM, UNIT, EQNS, BITS). + + These messages define the meaning of telemetry values for a station. + Format: :CALLSIGN :PARM.p1,p2,p3,p4,p5,b1,b2,b3,b4,b5,b6,b7,b8 + """ + try: + result = { + 'telemetry_definition': True, + 'definition_type': msg_type, + 'for_station': callsign.strip(), + } + + values = [v.strip() for v in content.split(',')] + + if msg_type == 'PARM': + # Parameter names + result['param_names'] = values[:5] # Analog names + result['bit_names'] = values[5:13] # Digital bit names + + elif msg_type == 'UNIT': + # Units for parameters + result['param_units'] = values[:5] + result['bit_labels'] = values[5:13] + + elif msg_type == 'EQNS': + # Equations: a*x^2 + b*x + c for each analog channel + # Format: a1,b1,c1,a2,b2,c2,a3,b3,c3,a4,b4,c4,a5,b5,c5 + result['equations'] = [] + for i in range(0, min(15, len(values)), 3): + if i + 2 < len(values): + result['equations'].append({ + 'a': float(values[i]) if values[i] else 0, + 'b': float(values[i + 1]) if values[i + 1] else 1, + 'c': float(values[i + 2]) if values[i + 2] else 0, + }) + + elif msg_type == 'BITS': + # Bit sense and project name + # Format: bbbbbbbb,Project Name + if values: + result['bit_sense'] = values[0][:8] + if len(values) > 1: + result['project_name'] = ','.join(values[1:]) + + return result + + except Exception as e: + logger.debug(f"Failed to parse telemetry definition: {e}") + return None + + +def parse_phg(data: str) -> Optional[dict]: + """Parse PHG (Power/Height/Gain/Directivity) data. + + Format: PHGphgd + - p = power code (0-9) + - h = height code (0-9) + - g = gain code (0-9) + - d = directivity code (0-9) + """ + try: + match = re.search(r'PHG(\d)(\d)(\d)(\d)', data) + if not match: + return None + + p, h, g, d = [int(x) for x in match.groups()] + + # Power in watts: p^2 + power_watts = p * p + + # Height in feet: 10 * 2^h + height_feet = 10 * (2 ** h) + + # Gain in dB + gain_db = g + + # Directivity (0=omni, 1-8 = 45° sectors starting from N) + directions = ['omni', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N'] + directivity = directions[d] if d < len(directions) else 'omni' + + return { + 'phg': True, + 'power_watts': power_watts, + 'height_feet': height_feet, + 'gain_db': gain_db, + 'directivity': directivity, + 'directivity_code': d, + } + + except Exception as e: + logger.debug(f"Failed to parse PHG: {e}") + return None + + +def parse_rng(data: str) -> Optional[dict]: + """Parse RNG (radio range) data. + + Format: RNGrrrr where rrrr is range in miles. + """ + try: + match = re.search(r'RNG(\d{4})', data) + if match: + return {'range_miles': int(match.group(1))} + return None + except Exception: + return None + + +def parse_df_report(data: str) -> Optional[dict]: + """Parse Direction Finding (DF) report. + + Format: CSE/SPD/BRG/NRQ or similar patterns. + - BRG = bearing to signal + - NRQ = Number/Range/Quality + """ + try: + result = {} + + # DF bearing format: /BRG (3 digits) + brg_match = re.search(r'/(\d{3})/', data) + if brg_match: + result['df_bearing'] = int(brg_match.group(1)) + + # NRQ format + nrq_match = re.search(r'/(\d)(\d)(\d)$', data) + if nrq_match: + n, r, q = [int(x) for x in nrq_match.groups()] + result['df_hits'] = n # Number of signal hits + result['df_range'] = r # Range: 0=useless, 8=exact + result['df_quality'] = q # Quality: 0=useless, 8=excellent + + return result if result else None + + except Exception: + return None + + +def parse_timestamp(data: str) -> Optional[dict]: + """Parse APRS timestamp from position data. + + Formats: + - DDHHMMz = day/hour/minute zulu + - HHMMSSh = hour/minute/second local + - DDHHMMl = day/hour/minute local (with /or not followed by position) + """ + try: + result = {} + + # Zulu time: DDHHMMz + match = re.match(r'^(\d{2})(\d{2})(\d{2})z', data) + if match: + result['time_day'] = int(match.group(1)) + result['time_hour'] = int(match.group(2)) + result['time_minute'] = int(match.group(3)) + result['time_format'] = 'zulu' + return result + + # Local time: HHMMSSh + match = re.match(r'^(\d{2})(\d{2})(\d{2})h', data) + if match: + result['time_hour'] = int(match.group(1)) + result['time_minute'] = int(match.group(2)) + result['time_second'] = int(match.group(3)) + result['time_format'] = 'local' + return result + + # Local with day: DDHHMMl (less common) + match = re.match(r'^(\d{2})(\d{2})(\d{2})/', data) + if match: + result['time_day'] = int(match.group(1)) + result['time_hour'] = int(match.group(2)) + result['time_minute'] = int(match.group(3)) + result['time_format'] = 'local_day' + return result + + return None + + except Exception: + return None + + +def parse_third_party(data: str) -> Optional[dict]: + """Parse third-party traffic (packets relayed from another network). + + Format: }CALL>PATH:DATA (the } indicates third-party) + """ + try: + if not data.startswith('}'): + return None + + # The rest is a standard APRS packet + inner_packet = data[1:] + + # Parse the inner packet + inner = parse_aprs_packet(inner_packet) + if inner: + return { + 'third_party': True, + 'inner_packet': inner, + } + + return {'third_party': True, 'inner_raw': inner_packet} + + except Exception: + return None + + +def parse_user_defined(data: str) -> Optional[dict]: + """Parse user-defined data format. + + Format: {UUXXXX... + - { = user-defined marker + - UU = 2-char user ID (experimental use) + - XXXX = user-defined data + """ + try: + if not data.startswith('{') or len(data) < 3: + return None + + return { + 'user_defined': True, + 'user_id': data[1:3], + 'user_data': data[3:], + } + + except Exception: + return None + + +def parse_capabilities(data: str) -> Optional[dict]: + """Parse station capabilities response. + + Format: Optional[dict]: + """Parse raw GPS NMEA sentences. + + APRS can include raw NMEA data starting with $. + """ + try: + if not data.startswith('$'): + return None + + result = { + 'nmea': True, + 'nmea_sentence': data, + } + + # Try to identify sentence type + if data.startswith('$GPGGA') or data.startswith('$GNGGA'): + result['nmea_type'] = 'GGA' + elif data.startswith('$GPRMC') or data.startswith('$GNRMC'): + result['nmea_type'] = 'RMC' + elif data.startswith('$GPGLL') or data.startswith('$GNGLL'): + result['nmea_type'] = 'GLL' + + return result + + except Exception: + return None + + def parse_audio_level(line: str) -> Optional[int]: """Parse direwolf audio level line and return normalized level (0-100).