Fix APRS parser for direwolf bracket-prefixed frames

This commit is contained in:
Smittix
2026-02-24 22:52:34 +00:00
parent f6b0edaf5a
commit 4bf452d462
3 changed files with 87 additions and 30 deletions

View File

@@ -2860,11 +2860,22 @@ class ModeManager:
pass
logger.info("APRS reader stopped")
def _parse_aprs_packet(self, line: str) -> dict | None:
"""Parse APRS packet from direwolf or multimon-ng."""
match = re.match(r'([A-Z0-9-]+)>([^:]+):(.+)', line)
if not match:
return None
def _parse_aprs_packet(self, line: str) -> dict | None:
"""Parse APRS packet from direwolf or multimon-ng."""
if not line:
return None
# Normalize common decoder prefixes before parsing.
# multimon-ng: "AFSK1200: ..."
# direwolf: "[0.4] ...", "[0L] ..."
line = line.strip()
if line.startswith('AFSK1200:'):
line = line[9:].strip()
line = re.sub(r'^(?:\[[^\]]+\]\s*)+', '', line)
match = re.match(r'([A-Z0-9-]+)>([^:]+):(.+)', line)
if not match:
return None
callsign = match.group(1)
path = match.group(2)

View File

@@ -94,7 +94,7 @@ def find_rtl_power() -> Optional[str]:
DIREWOLF_CONFIG_PATH = os.path.join(tempfile.gettempdir(), 'intercept_direwolf.conf')
def create_direwolf_config() -> str:
def create_direwolf_config() -> str:
"""Create a minimal direwolf config for receive-only operation."""
config = """# Minimal direwolf config for INTERCEPT (receive-only)
# Audio input is handled via stdin
@@ -104,12 +104,32 @@ CHANNEL 0
MYCALL N0CALL
MODEM 1200
"""
with open(DIREWOLF_CONFIG_PATH, 'w') as f:
f.write(config)
return DIREWOLF_CONFIG_PATH
def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
with open(DIREWOLF_CONFIG_PATH, 'w') as f:
f.write(config)
return DIREWOLF_CONFIG_PATH
def normalize_aprs_output_line(line: str) -> str:
"""Normalize a decoder output line to raw APRS packet format.
Handles common decoder prefixes:
- multimon-ng: ``AFSK1200: ...``
- direwolf tags: ``[0.4] ...``, ``[0L] ...``, etc.
"""
if not line:
return ''
normalized = line.strip()
if normalized.startswith('AFSK1200:'):
normalized = normalized[9:].strip()
# Strip one or more leading bracket tags emitted by decoders.
# Examples: [0.4], [0L], [NONE]
normalized = re.sub(r'^(?:\[[^\]]+\]\s*)+', '', normalized)
return normalized
def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
"""Parse APRS packet into structured data.
Supports all major APRS packet types:
@@ -123,13 +143,17 @@ def parse_aprs_packet(raw_packet: str) -> Optional[dict]:
- Third-party traffic
- Raw GPS/NMEA data
- User-defined formats
"""
try:
# Basic APRS packet format: CALLSIGN>PATH:DATA
# Example: N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077
match = re.match(r'^([A-Z0-9-]+)>([^:]+):(.+)$', raw_packet, re.IGNORECASE)
if not match:
"""
try:
raw_packet = normalize_aprs_output_line(raw_packet)
if not raw_packet:
return None
# Basic APRS packet format: CALLSIGN>PATH:DATA
# Example: N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077
match = re.match(r'^([A-Z0-9-]+)>([^:]+):(.+)$', raw_packet, re.IGNORECASE)
if not match:
return None
callsign = match.group(1).upper()
@@ -1348,17 +1372,12 @@ def stream_aprs_output(rtl_process: subprocess.Popen, decoder_process: subproces
app_module.aprs_queue.put(meter_msg)
continue # Audio level lines are not packets
# multimon-ng prefixes decoded packets with "AFSK1200: "
if line.startswith('AFSK1200:'):
line = line[9:].strip()
# direwolf often prefixes packets with "[0.4] " or similar audio level indicator
# Strip any leading bracket prefix like "[0.4] " before parsing
line = re.sub(r'^\[\d+\.\d+\]\s*', '', line)
# Skip non-packet lines (APRS format: CALL>PATH:DATA)
if '>' not in line or ':' not in line:
continue
# Normalize decoder prefixes (multimon/direwolf) before parsing.
line = normalize_aprs_output_line(line)
# Skip non-packet lines (APRS format: CALL>PATH:DATA)
if '>' not in line or ':' not in line:
continue
packet = parse_aprs_packet(line)
if packet:

27
tests/test_aprs_parser.py Normal file
View File

@@ -0,0 +1,27 @@
"""APRS packet parser regression tests."""
from __future__ import annotations
import pytest
from routes.aprs import parse_aprs_packet
_BASE_PACKET = "N0CALL-9>APRS,TCPIP*:@092345z4903.50N/07201.75W_090/000g005t077"
@pytest.mark.parametrize(
"line",
[
_BASE_PACKET,
f"[0.4] {_BASE_PACKET}",
f"[0L] {_BASE_PACKET}",
f"AFSK1200: {_BASE_PACKET}",
f"AFSK1200: [0L] {_BASE_PACKET}",
],
)
def test_parse_aprs_packet_accepts_decoder_prefix_variants(line: str) -> None:
packet = parse_aprs_packet(line)
assert packet is not None
assert packet["callsign"] == "N0CALL-9"
assert packet["type"] == "aprs"