Add comprehensive APRS packet parsing support

- Add Mic-E position decoding (destination field encoding)
- Add compressed position format parsing (Base-91)
- Add complete telemetry parsing with analog/digital values
- Add telemetry definition messages (PARM, UNIT, EQNS, BITS)
- Add message ACK/REJ parsing and sequence numbers
- Add weather data parsing in position packets
- Add PHG (Power/Height/Gain/Directivity) parsing
- Add Direction Finding (DF) report parsing
- Add timestamp extraction from position packets
- Add third-party traffic, NMEA, user-defined format parsing
- Add bulletin, NWS alert, query, and capabilities parsing
- Expand weather fields (luminosity, snow, radiation, etc.)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-16 13:50:32 +00:00
parent af39d40847
commit fc48ff7d9f

View File

@@ -97,7 +97,20 @@ MODEM 1200
def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
"""Parse APRS packet into structured data."""
"""Parse APRS packet into structured data.
Supports all major APRS packet types:
- Position reports (standard, compressed, Mic-E)
- Weather reports (standalone and in position packets)
- Objects and Items
- Messages (including ACK/REJ and telemetry definitions)
- Telemetry data
- Status reports
- Queries and capabilities
- Third-party traffic
- Raw GPS/NMEA data
- User-defined formats
"""
try:
# Basic APRS packet format: CALLSIGN>PATH:DATA
# Example: N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077
@@ -118,35 +131,165 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
'timestamp': datetime.utcnow().isoformat() + 'Z',
}
# Extract destination from path (first element before any comma)
dest_parts = path.split(',')
dest = dest_parts[0] if dest_parts else ''
# Check for Mic-E format first (data starts with ` or ' and dest is 6+ chars)
if len(data) >= 9 and data[0] in '`\'' and len(dest) >= 6:
mic_e_data = parse_mic_e(dest, data)
if mic_e_data:
packet['packet_type'] = 'position'
packet['position_format'] = 'mic_e'
packet.update(mic_e_data)
return packet
# Determine packet type and parse accordingly
if data.startswith('!') or data.startswith('='):
# Position without timestamp
# Position without timestamp (! = no messaging, = = with messaging)
packet['packet_type'] = 'position'
pos = parse_position(data[1:])
if pos:
packet.update(pos)
packet['messaging_capable'] = data.startswith('=')
pos_data = data[1:]
# Check for compressed format (starts with /\[A-Za-z])
if len(pos_data) >= 13 and pos_data[0] in '/\\':
pos = parse_compressed_position(pos_data)
if pos:
packet['position_format'] = 'compressed'
packet.update(pos)
else:
pos = parse_position(pos_data)
if pos:
packet['position_format'] = 'uncompressed'
packet.update(pos)
# Check for weather data in position packet (after position)
if '_' in pos_data or 'g' in pos_data or 't' in pos_data[15:] if len(pos_data) > 15 else False:
weather = parse_weather(pos_data)
if weather:
packet['weather'] = weather
# Check for PHG data
phg = parse_phg(pos_data)
if phg:
packet.update(phg)
# Check for RNG data
rng = parse_rng(pos_data)
if rng:
packet.update(rng)
# Check for DF report data
df = parse_df_report(pos_data)
if df:
packet.update(df)
elif data.startswith('/') or data.startswith('@'):
# Position with timestamp
# Position with timestamp (/ = no messaging, @ = with messaging)
packet['packet_type'] = 'position'
# Skip timestamp (7 chars) and parse position
packet['messaging_capable'] = data.startswith('@')
# Parse timestamp (first 7 chars after type indicator)
if len(data) > 8:
pos = parse_position(data[8:])
if pos:
packet.update(pos)
ts_data = parse_timestamp(data[1:8])
if ts_data:
packet.update(ts_data)
pos_data = data[8:]
# Check for compressed format
if len(pos_data) >= 13 and pos_data[0] in '/\\':
pos = parse_compressed_position(pos_data)
if pos:
packet['position_format'] = 'compressed'
packet.update(pos)
else:
pos = parse_position(pos_data)
if pos:
packet['position_format'] = 'uncompressed'
packet.update(pos)
# Check for weather data in position packet
weather = parse_weather(pos_data)
if weather:
packet['weather'] = weather
# Check for PHG data
phg = parse_phg(pos_data)
if phg:
packet.update(phg)
# Check for RNG data
rng = parse_rng(pos_data)
if rng:
packet.update(rng)
elif data.startswith('>'):
# Status message
packet['packet_type'] = 'status'
packet['status'] = data[1:]
status_data = data[1:]
packet['status'] = status_data
# Check for Maidenhead grid locator in status (common pattern)
grid_match = re.match(r'^([A-R]{2}[0-9]{2}[A-X]{0,2})\s*', status_data, re.IGNORECASE)
if grid_match:
packet['grid'] = grid_match.group(1).upper()
elif data.startswith(':'):
# Message
# Message format - check for various subtypes
packet['packet_type'] = 'message'
# Standard message: :ADDRESSEE:MESSAGE
msg_match = re.match(r'^:([A-Z0-9 -]{9}):(.*)$', data, re.IGNORECASE)
if msg_match:
packet['addressee'] = msg_match.group(1).strip()
packet['message'] = msg_match.group(2)
addressee = msg_match.group(1).strip()
message = msg_match.group(2)
packet['addressee'] = addressee
# Check for telemetry definition messages
telem_def_match = re.match(r'^(PARM|UNIT|EQNS|BITS)\.(.*)$', message)
if telem_def_match:
packet['packet_type'] = 'telemetry_definition'
telem_def = parse_telemetry_definition(
addressee, telem_def_match.group(1), telem_def_match.group(2)
)
if telem_def:
packet.update(telem_def)
else:
packet['message'] = message
# Check for ACK/REJ
ack_match = re.match(r'^ack(\w+)$', message, re.IGNORECASE)
if ack_match:
packet['message_type'] = 'ack'
packet['ack_id'] = ack_match.group(1)
rej_match = re.match(r'^rej(\w+)$', message, re.IGNORECASE)
if rej_match:
packet['message_type'] = 'rej'
packet['rej_id'] = rej_match.group(1)
# Check for message ID (for acknowledgment)
msgid_match = re.search(r'\{(\w{1,5})$', message)
if msgid_match:
packet['message_id'] = msgid_match.group(1)
packet['message'] = message[:message.rfind('{')]
# Bulletin format: :BLNn :message
elif data[1:4] == 'BLN':
packet['packet_type'] = 'bulletin'
bln_match = re.match(r'^:BLN([0-9A-Z])[ ]*:(.*)$', data, re.IGNORECASE)
if bln_match:
packet['bulletin_id'] = bln_match.group(1)
packet['bulletin'] = bln_match.group(2)
# NWS weather alert: :NWS-xxxxx:message
elif data[1:5] == 'NWS-':
packet['packet_type'] = 'nws_alert'
nws_match = re.match(r'^:NWS-([A-Z]+)[ ]*:(.*)$', data, re.IGNORECASE)
if nws_match:
packet['nws_id'] = nws_match.group(1)
packet['alert'] = nws_match.group(2)
elif data.startswith('_'):
# Weather report (Positionless)
@@ -155,15 +298,19 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
elif data.startswith(';'):
# Object format: ;OBJECTNAME*DDHHMMzPOSITION or ;OBJECTNAME_DDHHMMzPOSITION
# Name is exactly 9 chars, then * (live) or _ (killed), then 7-char timestamp, then position
packet['packet_type'] = 'object'
obj_data = parse_object(data)
if obj_data:
packet.update(obj_data)
# Check for weather data in object
remaining = data[18:] if len(data) > 18 else ''
weather = parse_weather(remaining)
if weather:
packet['weather'] = weather
elif data.startswith(')'):
# Item format: )ITEMNAME!POSITION or )ITEMNAME_POSITION
# Name is 3-9 chars, terminated by ! (live) or _ (killed), then position
packet['packet_type'] = 'item'
item_data = parse_item(data)
if item_data:
@@ -172,11 +319,60 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
elif data.startswith('T'):
# Telemetry
packet['packet_type'] = 'telemetry'
telem = parse_telemetry(data)
if telem:
packet.update(telem)
elif data.startswith('}'):
# Third-party traffic
packet['packet_type'] = 'third_party'
third = parse_third_party(data)
if third:
packet.update(third)
elif data.startswith('$'):
# Raw GPS NMEA data
packet['packet_type'] = 'nmea'
nmea = parse_nmea(data)
if nmea:
packet.update(nmea)
elif data.startswith('{'):
# User-defined format
packet['packet_type'] = 'user_defined'
user = parse_user_defined(data)
if user:
packet.update(user)
elif data.startswith('<'):
# Station capabilities
packet['packet_type'] = 'capabilities'
caps = parse_capabilities(data)
if caps:
packet.update(caps)
elif data.startswith('?'):
# Query
packet['packet_type'] = 'query'
query = parse_capabilities(data)
if query:
packet.update(query)
else:
packet['packet_type'] = 'other'
packet['data'] = data
# Extract comment if present (after standard data)
# Many APRS packets have freeform comments at the end
if 'data' not in packet and packet['packet_type'] in ('position', 'object', 'item'):
# Look for common comment patterns
comment_match = re.search(r'/([^/]+)$', data)
if comment_match and not re.match(r'^A=[-\d]+', comment_match.group(1)):
potential_comment = comment_match.group(1)
# Exclude things that look like data fields
if len(potential_comment) > 3 and not re.match(r'^\d{3}/', potential_comment):
packet['comment'] = potential_comment
return packet
except Exception as e:
@@ -350,13 +546,22 @@ def parse_item(data: str) -> Optional[dict]:
def parse_weather(data: str) -> dict:
"""Parse APRS weather data."""
"""Parse APRS weather data.
Weather data can appear in positionless weather reports (starting with _)
or as an extension after position data. Supports all standard APRS weather fields.
"""
weather = {}
# Wind direction: cCCC
# Wind direction: cCCC (degrees) or _CCC at start of positionless
match = re.search(r'c(\d{3})', data)
if match:
weather['wind_direction'] = int(match.group(1))
elif data.startswith('_') and len(data) > 4:
# Positionless format starts with _MMDDhhmm then wind dir
wind_match = re.match(r'_\d{8}(\d{3})', data)
if wind_match:
weather['wind_direction'] = int(wind_match.group(1))
# Wind speed: sSSS (mph)
match = re.search(r's(\d{3})', data)
@@ -368,7 +573,7 @@ def parse_weather(data: str) -> dict:
if match:
weather['wind_gust'] = int(match.group(1))
# Temperature: tTTT (Fahrenheit)
# Temperature: tTTT (Fahrenheit, can be negative)
match = re.search(r't(-?\d{2,3})', data)
if match:
weather['temperature'] = int(match.group(1))
@@ -378,12 +583,17 @@ def parse_weather(data: str) -> dict:
if match:
weather['rain_1h'] = int(match.group(1)) / 100.0
# Rain last 24h: pPPP
# Rain last 24h: pPPP (hundredths of inch)
match = re.search(r'p(\d{3})', data)
if match:
weather['rain_24h'] = int(match.group(1)) / 100.0
# Humidity: hHH (%)
# Rain since midnight: PPPP (hundredths of inch)
match = re.search(r'P(\d{3})', data)
if match:
weather['rain_midnight'] = int(match.group(1)) / 100.0
# Humidity: hHH (%, 00 = 100%)
match = re.search(r'h(\d{2})', data)
if match:
h = int(match.group(1))
@@ -394,9 +604,657 @@ def parse_weather(data: str) -> dict:
if match:
weather['pressure'] = int(match.group(1)) / 10.0
# Luminosity: LLLL (watts per square meter)
# L = 0-999 W/m², l = 1000-1999 W/m² (subtract 1000)
match = re.search(r'L(\d{3})', data)
if match:
weather['luminosity'] = int(match.group(1))
else:
match = re.search(r'l(\d{3})', data)
if match:
weather['luminosity'] = int(match.group(1)) + 1000
# Snow (last 24h): #SSS (inches)
match = re.search(r'#(\d{3})', data)
if match:
weather['snow_24h'] = int(match.group(1))
# Raw rain counter: !RRR (for Peet Bros stations)
match = re.search(r'!(\d{3})', data)
if match:
weather['rain_raw'] = int(match.group(1))
# Radiation: X### (nanosieverts/hour) - some weather stations
match = re.search(r'X(\d{3})', data)
if match:
weather['radiation'] = int(match.group(1))
# Flooding/water level: F### (feet above/below flood stage)
match = re.search(r'F(-?\d{3})', data)
if match:
weather['flood_level'] = int(match.group(1))
# Voltage: V### (volts, for battery monitoring)
match = re.search(r'V(\d{3})', data)
if match:
weather['voltage'] = int(match.group(1)) / 10.0
# Software type often at end (e.g., "Davis" or "Arduino")
# Extract as weather station type
wx_type_match = re.search(r'([A-Za-z]{4,})$', data)
if wx_type_match:
weather['wx_station_type'] = wx_type_match.group(1)
return weather
# Mic-E encoding tables
MIC_E_DEST_TABLE = {
'0': (0, 'S', 0), '1': (1, 'S', 0), '2': (2, 'S', 0), '3': (3, 'S', 0),
'4': (4, 'S', 0), '5': (5, 'S', 0), '6': (6, 'S', 0), '7': (7, 'S', 0),
'8': (8, 'S', 0), '9': (9, 'S', 0),
'A': (0, 'S', 1), 'B': (1, 'S', 1), 'C': (2, 'S', 1), 'D': (3, 'S', 1),
'E': (4, 'S', 1), 'F': (5, 'S', 1), 'G': (6, 'S', 1), 'H': (7, 'S', 1),
'I': (8, 'S', 1), 'J': (9, 'S', 1),
'K': (0, 'S', 1), 'L': (0, 'S', 0),
'P': (0, 'N', 1), 'Q': (1, 'N', 1), 'R': (2, 'N', 1), 'S': (3, 'N', 1),
'T': (4, 'N', 1), 'U': (5, 'N', 1), 'V': (6, 'N', 1), 'W': (7, 'N', 1),
'X': (8, 'N', 1), 'Y': (9, 'N', 1),
'Z': (0, 'N', 1),
}
# Mic-E message types encoded in destination
MIC_E_MESSAGE_TYPES = {
(1, 1, 1): ('off_duty', 'Off Duty'),
(1, 1, 0): ('en_route', 'En Route'),
(1, 0, 1): ('in_service', 'In Service'),
(1, 0, 0): ('returning', 'Returning'),
(0, 1, 1): ('committed', 'Committed'),
(0, 1, 0): ('special', 'Special'),
(0, 0, 1): ('priority', 'Priority'),
(0, 0, 0): ('emergency', 'Emergency'),
}
def parse_mic_e(dest: str, data: str) -> Optional[dict]:
"""Parse Mic-E encoded position from destination and data fields.
Mic-E is a highly compressed format that encodes:
- Latitude in the destination address (6 chars)
- Longitude, speed, course in the information field
- Status message type in destination address bits
Data field format: starts with ` or ' then:
- byte 0: longitude degrees + 28
- byte 1: longitude minutes + 28
- byte 2: longitude hundredths + 28
- byte 3: speed (tens) + course (hundreds) + 28
- byte 4: speed (units) + course (tens) + 28
- byte 5: course (units) + 28
- byte 6: symbol code
- byte 7: symbol table
- remaining: optional altitude, telemetry, status text
"""
try:
if len(dest) < 6 or len(data) < 9:
return None
# First char indicates Mic-E type: ` = current, ' = old
mic_e_type = 'current' if data[0] == '`' else 'old'
# Parse latitude from destination (first 6 chars)
lat_digits = []
lat_dir = 'N'
lon_offset = 0
msg_bits = []
for i, char in enumerate(dest[:6]):
if char not in MIC_E_DEST_TABLE:
# Try uppercase
char = char.upper()
if char not in MIC_E_DEST_TABLE:
return None
digit, ns, msg_bit = MIC_E_DEST_TABLE[char]
lat_digits.append(digit)
# First 3 chars determine N/S and message type
if i < 3:
msg_bits.append(msg_bit)
# Char 4 determines latitude N/S
if i == 3:
lat_dir = ns
# Char 5 determines longitude offset (100 degrees)
if i == 4:
lon_offset = 100 if ns == 'N' else 0
# Char 6 determines longitude W/E
if i == 5:
lon_dir = 'W' if ns == 'N' else 'E'
# Calculate latitude
lat_deg = lat_digits[0] * 10 + lat_digits[1]
lat_min = lat_digits[2] * 10 + lat_digits[3] + (lat_digits[4] * 10 + lat_digits[5]) / 100.0
lat = lat_deg + lat_min / 60.0
if lat_dir == 'S':
lat = -lat
# Parse longitude from data (bytes 1-3 after type char)
d = data[1:] # Skip type char
# Longitude degrees (adjusted for offset)
lon_deg = ord(d[0]) - 28
if lon_offset == 100:
lon_deg += 100
if lon_deg >= 180 and lon_deg <= 189:
lon_deg -= 80
elif lon_deg >= 190 and lon_deg <= 199:
lon_deg -= 190
# Longitude minutes
lon_min = ord(d[1]) - 28
if lon_min >= 60:
lon_min -= 60
# Longitude hundredths of minutes
lon_hun = ord(d[2]) - 28
lon = lon_deg + (lon_min + lon_hun / 100.0) / 60.0
if lon_dir == 'W':
lon = -lon
# Parse speed and course (bytes 4-6)
sp = ord(d[3]) - 28
dc = ord(d[4]) - 28
se = ord(d[5]) - 28
speed = (sp * 10) + (dc // 10)
if speed >= 800:
speed -= 800
course = ((dc % 10) * 100) + se
if course >= 400:
course -= 400
# Get symbol (bytes 7-8)
symbol_code = d[6]
symbol_table = d[7]
result = {
'lat': round(lat, 6),
'lon': round(lon, 6),
'symbol': symbol_table + symbol_code,
'speed': speed, # knots
'course': course,
'mic_e_type': mic_e_type,
}
# Decode message type from first 3 destination chars
msg_tuple = tuple(msg_bits)
if msg_tuple in MIC_E_MESSAGE_TYPES:
result['mic_e_status'] = MIC_E_MESSAGE_TYPES[msg_tuple][0]
result['mic_e_status_text'] = MIC_E_MESSAGE_TYPES[msg_tuple][1]
# Parse optional fields after symbol (byte 9 onwards)
if len(d) > 8:
extra = d[8:]
# Altitude: `XXX} where XXX is base-91 encoded
alt_match = re.search(r'([\x21-\x7b]{3})\}', extra)
if alt_match:
alt_chars = alt_match.group(1)
alt = ((ord(alt_chars[0]) - 33) * 91 * 91 +
(ord(alt_chars[1]) - 33) * 91 +
(ord(alt_chars[2]) - 33) - 10000)
result['altitude'] = alt # meters
# Status text (after altitude or at end)
status_text = re.sub(r'[\x21-\x7b]{3}\}', '', extra).strip()
if status_text:
result['status'] = status_text
return result
except Exception as e:
logger.debug(f"Failed to parse Mic-E: {e}")
return None
def parse_compressed_position(data: str) -> Optional[dict]:
r"""Parse compressed position format (Base-91 encoding).
Compressed format: /YYYYXXXX$csT
- / or \\ = symbol table
- YYYY = 4-char base-91 latitude
- XXXX = 4-char base-91 longitude
- $ = symbol code
- cs = compressed course/speed or altitude
- T = compression type byte
"""
try:
# Compressed positions start with symbol table char followed by 4+4+1+2+1 chars
if len(data) < 13:
return None
symbol_table = data[0]
# Decode base-91 latitude (chars 1-4)
lat_chars = data[1:5]
lat_val = 0
for c in lat_chars:
lat_val = lat_val * 91 + (ord(c) - 33)
lat = 90.0 - (lat_val / 380926.0)
# Decode base-91 longitude (chars 5-8)
lon_chars = data[5:9]
lon_val = 0
for c in lon_chars:
lon_val = lon_val * 91 + (ord(c) - 33)
lon = -180.0 + (lon_val / 190463.0)
# Symbol code
symbol_code = data[9]
result = {
'lat': round(lat, 6),
'lon': round(lon, 6),
'symbol': symbol_table + symbol_code,
'compressed': True,
}
# Course/speed or altitude (chars 10-11) and type byte (char 12)
if len(data) >= 13:
c = ord(data[10]) - 33
s = ord(data[11]) - 33
t = ord(data[12]) - 33
# Type byte bits:
# bit 5 (0x20): GPS fix - current (1) or old (0)
# bit 4 (0x10): NMEA source - GGA (1) or other (0)
# bit 3 (0x08): Origin - compressed (1) or software (0)
# bits 0-2: compression type
comp_type = t & 0x07
if comp_type == 0:
# c/s are course/speed
if c != 0 or s != 0:
result['course'] = c * 4
result['speed'] = round(1.08 ** s - 1, 1) # knots
elif comp_type == 1:
# c/s are altitude
if c != 0 or s != 0:
alt = 1.002 ** (c * 91 + s)
result['altitude'] = round(alt) # feet
elif comp_type == 2:
# Radio range
if s != 0:
result['range'] = round(2 * 1.08 ** s, 1) # miles
# GPS fix quality from type byte
if t & 0x20:
result['gps_fix'] = 'current'
else:
result['gps_fix'] = 'old'
return result
except Exception as e:
logger.debug(f"Failed to parse compressed position: {e}")
return None
def parse_telemetry(data: str) -> Optional[dict]:
"""Parse APRS telemetry data.
Format: T#sss,aaa,aaa,aaa,aaa,aaa,bbbbbbbb
- T#sss = sequence number (001-999 or MIC)
- aaa = analog values (0-255, up to 5 channels)
- bbbbbbbb = 8 digital bits
"""
try:
if not data.startswith('T'):
return None
result = {'packet_type': 'telemetry'}
# Match telemetry format
match = re.match(
r'^T#(\d{3}|MIC),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),([01]{8})',
data
)
if match:
result['sequence'] = match.group(1)
result['analog'] = [
int(match.group(2)),
int(match.group(3)),
int(match.group(4)),
int(match.group(5)),
int(match.group(6)),
]
result['digital'] = match.group(7)
result['digital_bits'] = [int(b) for b in match.group(7)]
return result
# Try simpler format without digital bits
match = re.match(
r'^T#(\d{3}|MIC),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3}),(\d{1,3})',
data
)
if match:
result['sequence'] = match.group(1)
result['analog'] = [
int(match.group(2)),
int(match.group(3)),
int(match.group(4)),
int(match.group(5)),
int(match.group(6)),
]
return result
# Even simpler - just sequence and some analog
match = re.match(r'^T#(\d{3}|MIC),(.+)$', data)
if match:
result['sequence'] = match.group(1)
values = match.group(2).split(',')
result['analog'] = [int(v) for v in values if v.isdigit()]
return result
return None
except Exception as e:
logger.debug(f"Failed to parse telemetry: {e}")
return None
def parse_telemetry_definition(callsign: str, msg_type: str, content: str) -> Optional[dict]:
"""Parse telemetry definition messages (PARM, UNIT, EQNS, BITS).
These messages define the meaning of telemetry values for a station.
Format: :CALLSIGN :PARM.p1,p2,p3,p4,p5,b1,b2,b3,b4,b5,b6,b7,b8
"""
try:
result = {
'telemetry_definition': True,
'definition_type': msg_type,
'for_station': callsign.strip(),
}
values = [v.strip() for v in content.split(',')]
if msg_type == 'PARM':
# Parameter names
result['param_names'] = values[:5] # Analog names
result['bit_names'] = values[5:13] # Digital bit names
elif msg_type == 'UNIT':
# Units for parameters
result['param_units'] = values[:5]
result['bit_labels'] = values[5:13]
elif msg_type == 'EQNS':
# Equations: a*x^2 + b*x + c for each analog channel
# Format: a1,b1,c1,a2,b2,c2,a3,b3,c3,a4,b4,c4,a5,b5,c5
result['equations'] = []
for i in range(0, min(15, len(values)), 3):
if i + 2 < len(values):
result['equations'].append({
'a': float(values[i]) if values[i] else 0,
'b': float(values[i + 1]) if values[i + 1] else 1,
'c': float(values[i + 2]) if values[i + 2] else 0,
})
elif msg_type == 'BITS':
# Bit sense and project name
# Format: bbbbbbbb,Project Name
if values:
result['bit_sense'] = values[0][:8]
if len(values) > 1:
result['project_name'] = ','.join(values[1:])
return result
except Exception as e:
logger.debug(f"Failed to parse telemetry definition: {e}")
return None
def parse_phg(data: str) -> Optional[dict]:
"""Parse PHG (Power/Height/Gain/Directivity) data.
Format: PHGphgd
- p = power code (0-9)
- h = height code (0-9)
- g = gain code (0-9)
- d = directivity code (0-9)
"""
try:
match = re.search(r'PHG(\d)(\d)(\d)(\d)', data)
if not match:
return None
p, h, g, d = [int(x) for x in match.groups()]
# Power in watts: p^2
power_watts = p * p
# Height in feet: 10 * 2^h
height_feet = 10 * (2 ** h)
# Gain in dB
gain_db = g
# Directivity (0=omni, 1-8 = 45° sectors starting from N)
directions = ['omni', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N']
directivity = directions[d] if d < len(directions) else 'omni'
return {
'phg': True,
'power_watts': power_watts,
'height_feet': height_feet,
'gain_db': gain_db,
'directivity': directivity,
'directivity_code': d,
}
except Exception as e:
logger.debug(f"Failed to parse PHG: {e}")
return None
def parse_rng(data: str) -> Optional[dict]:
"""Parse RNG (radio range) data.
Format: RNGrrrr where rrrr is range in miles.
"""
try:
match = re.search(r'RNG(\d{4})', data)
if match:
return {'range_miles': int(match.group(1))}
return None
except Exception:
return None
def parse_df_report(data: str) -> Optional[dict]:
"""Parse Direction Finding (DF) report.
Format: CSE/SPD/BRG/NRQ or similar patterns.
- BRG = bearing to signal
- NRQ = Number/Range/Quality
"""
try:
result = {}
# DF bearing format: /BRG (3 digits)
brg_match = re.search(r'/(\d{3})/', data)
if brg_match:
result['df_bearing'] = int(brg_match.group(1))
# NRQ format
nrq_match = re.search(r'/(\d)(\d)(\d)$', data)
if nrq_match:
n, r, q = [int(x) for x in nrq_match.groups()]
result['df_hits'] = n # Number of signal hits
result['df_range'] = r # Range: 0=useless, 8=exact
result['df_quality'] = q # Quality: 0=useless, 8=excellent
return result if result else None
except Exception:
return None
def parse_timestamp(data: str) -> Optional[dict]:
"""Parse APRS timestamp from position data.
Formats:
- DDHHMMz = day/hour/minute zulu
- HHMMSSh = hour/minute/second local
- DDHHMMl = day/hour/minute local (with /or not followed by position)
"""
try:
result = {}
# Zulu time: DDHHMMz
match = re.match(r'^(\d{2})(\d{2})(\d{2})z', data)
if match:
result['time_day'] = int(match.group(1))
result['time_hour'] = int(match.group(2))
result['time_minute'] = int(match.group(3))
result['time_format'] = 'zulu'
return result
# Local time: HHMMSSh
match = re.match(r'^(\d{2})(\d{2})(\d{2})h', data)
if match:
result['time_hour'] = int(match.group(1))
result['time_minute'] = int(match.group(2))
result['time_second'] = int(match.group(3))
result['time_format'] = 'local'
return result
# Local with day: DDHHMMl (less common)
match = re.match(r'^(\d{2})(\d{2})(\d{2})/', data)
if match:
result['time_day'] = int(match.group(1))
result['time_hour'] = int(match.group(2))
result['time_minute'] = int(match.group(3))
result['time_format'] = 'local_day'
return result
return None
except Exception:
return None
def parse_third_party(data: str) -> Optional[dict]:
"""Parse third-party traffic (packets relayed from another network).
Format: }CALL>PATH:DATA (the } indicates third-party)
"""
try:
if not data.startswith('}'):
return None
# The rest is a standard APRS packet
inner_packet = data[1:]
# Parse the inner packet
inner = parse_aprs_packet(inner_packet)
if inner:
return {
'third_party': True,
'inner_packet': inner,
}
return {'third_party': True, 'inner_raw': inner_packet}
except Exception:
return None
def parse_user_defined(data: str) -> Optional[dict]:
"""Parse user-defined data format.
Format: {UUXXXX...
- { = user-defined marker
- UU = 2-char user ID (experimental use)
- XXXX = user-defined data
"""
try:
if not data.startswith('{') or len(data) < 3:
return None
return {
'user_defined': True,
'user_id': data[1:3],
'user_data': data[3:],
}
except Exception:
return None
def parse_capabilities(data: str) -> Optional[dict]:
"""Parse station capabilities response.
Format: <capability1,capability2,...
or query format: ?APRS? or ?WX? etc.
"""
try:
if data.startswith('<'):
# Capabilities response
caps = data[1:].split(',')
return {
'capabilities': [c.strip() for c in caps if c.strip()],
}
elif data.startswith('?'):
# Query
query_match = re.match(r'\?([A-Z]+)\?', data)
if query_match:
return {
'query': True,
'query_type': query_match.group(1),
}
return None
except Exception:
return None
def parse_nmea(data: str) -> Optional[dict]:
"""Parse raw GPS NMEA sentences.
APRS can include raw NMEA data starting with $.
"""
try:
if not data.startswith('$'):
return None
result = {
'nmea': True,
'nmea_sentence': data,
}
# Try to identify sentence type
if data.startswith('$GPGGA') or data.startswith('$GNGGA'):
result['nmea_type'] = 'GGA'
elif data.startswith('$GPRMC') or data.startswith('$GNRMC'):
result['nmea_type'] = 'RMC'
elif data.startswith('$GPGLL') or data.startswith('$GNGLL'):
result['nmea_type'] = 'GLL'
return result
except Exception:
return None
def parse_audio_level(line: str) -> Optional[int]:
"""Parse direwolf audio level line and return normalized level (0-100).