Fix DSC decoder for ITU-R M.493 compliance

Correct modulation parameters (1200 bps, 2100/1300 Hz tones), replace
invented format codes with the six ITU-defined specifiers {102, 112,
114, 116, 120, 123}, accept all valid EOS symbols (117, 122, 127),
add parser validation (format, MMSI, raw field, telecommand range),
and fix truthiness bugs that dropped zero-valued fields.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-25 22:02:08 +00:00
parent 935b7a4d9d
commit 8a46293e5c
4 changed files with 339 additions and 88 deletions
+17 -21
View File
@@ -14,30 +14,26 @@ from __future__ import annotations
# =============================================================================
FORMAT_CODES = {
100: 'DISTRESS', # All ships distress alert
102: 'ALL_SHIPS', # All ships call
104: 'GROUP', # Group call
106: 'DISTRESS_ACK', # Distress acknowledgement
108: 'DISTRESS_RELAY', # Distress relay
110: 'GEOGRAPHIC', # Geographic area call
112: 'INDIVIDUAL', # Individual call
114: 'INDIVIDUAL_ACK', # Individual acknowledgement
116: 'ROUTINE', # Routine call
118: 'SAFETY', # Safety call
120: 'URGENCY', # Urgency call
102: 'ALL_SHIPS', # All ships call
112: 'INDIVIDUAL', # Individual call
114: 'INDIVIDUAL_ACK', # Individual acknowledgement
116: 'GROUP', # Group call (including geographic area)
120: 'DISTRESS', # Distress alert
123: 'ALL_SHIPS_URGENCY_SAFETY', # All ships urgency/safety
}
# Valid ITU-R M.493 format specifiers
VALID_FORMAT_SPECIFIERS = {102, 112, 114, 116, 120, 123}
# Valid EOS (End of Sequence) symbols per ITU-R M.493
VALID_EOS = {117, 122, 127}
# Category priority (lower = higher priority)
CATEGORY_PRIORITY = {
'DISTRESS': 0,
'DISTRESS_ACK': 1,
'DISTRESS_RELAY': 2,
'URGENCY': 3,
'SAFETY': 4,
'ROUTINE': 5,
'ALL_SHIPS_URGENCY_SAFETY': 2,
'ALL_SHIPS': 5,
'GROUP': 5,
'GEOGRAPHIC': 5,
'INDIVIDUAL': 5,
'INDIVIDUAL_ACK': 5,
}
@@ -453,11 +449,11 @@ VHF_CHANNELS = {
# DSC Modulation Parameters
# =============================================================================
DSC_BAUD_RATE = 100 # 100 baud per ITU-R M.493
DSC_BAUD_RATE = 1200 # 1200 bps per ITU-R M.493
# FSK tone frequencies (Hz)
DSC_MARK_FREQ = 1800 # B (mark) - binary 1
DSC_SPACE_FREQ = 1200 # Y (space) - binary 0
# FSK tone frequencies (Hz) on 1700 Hz subcarrier
DSC_MARK_FREQ = 2100 # B (mark) - binary 1
DSC_SPACE_FREQ = 1300 # Y (space) - binary 0
# Audio sample rate for decoding
DSC_AUDIO_SAMPLE_RATE = 48000
+23 -21
View File
@@ -5,9 +5,9 @@ DSC (Digital Selective Calling) decoder.
Decodes VHF DSC signals per ITU-R M.493. Reads 48kHz 16-bit signed
audio from stdin (from rtl_fm) and outputs JSON messages to stdout.
DSC uses 100 baud FSK with:
- Mark (1): 1800 Hz
- Space (0): 1200 Hz
DSC uses 1200 bps FSK on a 1700 Hz subcarrier with:
- Mark (1): 2100 Hz
- Space (0): 1300 Hz
Frame structure:
1. Dot pattern: 200 bits alternating 1/0 for synchronization
@@ -42,6 +42,7 @@ from .constants import (
DSC_AUDIO_SAMPLE_RATE,
FORMAT_CODES,
DISTRESS_NATURE_CODES,
VALID_EOS,
)
# Configure logging
@@ -57,7 +58,7 @@ class DSCDecoder:
"""
DSC FSK decoder.
Demodulates 100 baud FSK audio and decodes DSC protocol.
Demodulates 1200 bps FSK audio and decodes DSC protocol.
"""
def __init__(self, sample_rate: int = DSC_AUDIO_SAMPLE_RATE):
@@ -66,13 +67,13 @@ class DSCDecoder:
self.samples_per_bit = sample_rate // self.baud_rate
# FSK frequencies
self.mark_freq = DSC_MARK_FREQ # 1800 Hz = binary 1
self.space_freq = DSC_SPACE_FREQ # 1200 Hz = binary 0
self.mark_freq = DSC_MARK_FREQ # 2100 Hz = binary 1
self.space_freq = DSC_SPACE_FREQ # 1300 Hz = binary 0
# Bandpass filter for DSC band (1100-1900 Hz)
# Bandpass filter for DSC band (1100-2300 Hz)
nyq = sample_rate / 2
low = 1100 / nyq
high = 1900 / nyq
high = 2300 / nyq
self.bp_b, self.bp_a = scipy_signal.butter(4, [low, high], btype='band')
# Build FSK correlators
@@ -278,11 +279,11 @@ class DSCDecoder:
if len(symbols) < 5:
return None
# Look for EOS (End of Sequence) - symbol 127
# Look for EOS (End of Sequence) - symbols 117, 122, or 127
eos_found = False
eos_index = -1
for i, sym in enumerate(symbols):
if sym == 127: # EOS symbol
if sym in VALID_EOS:
eos_found = True
eos_index = i
break
@@ -337,20 +338,21 @@ class DSCDecoder:
format_code = symbols[0]
format_text = FORMAT_CODES.get(format_code, f'UNKNOWN-{format_code}')
# Determine category from format
category = 'ROUTINE'
if format_code == 100:
# Derive category from format specifier per ITU-R M.493
if format_code == 120:
category = 'DISTRESS'
elif format_code == 106:
category = 'DISTRESS_ACK'
elif format_code == 108:
category = 'DISTRESS_RELAY'
elif format_code == 118:
category = 'SAFETY'
elif format_code == 120:
category = 'URGENCY'
elif format_code == 123:
category = 'ALL_SHIPS_URGENCY_SAFETY'
elif format_code == 102:
category = 'ALL_SHIPS'
elif format_code == 116:
category = 'GROUP'
elif format_code == 112:
category = 'INDIVIDUAL'
elif format_code == 114:
category = 'INDIVIDUAL_ACK'
else:
category = FORMAT_CODES.get(format_code, 'UNKNOWN')
# Decode MMSI from symbols 1-5 (destination/address)
dest_mmsi = self._decode_mmsi(symbols[1:6])
+60 -9
View File
@@ -19,6 +19,8 @@ from .constants import (
TELECOMMAND_CODES,
CATEGORY_PRIORITY,
MID_COUNTRY_MAP,
VALID_FORMAT_SPECIFIERS,
VALID_EOS,
)
logger = logging.getLogger('intercept.dsc.parser')
@@ -139,13 +141,62 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None:
if 'source_mmsi' not in data:
return None
# ITU-R M.493 validation: format specifier must be valid
format_code = data.get('format')
if format_code not in VALID_FORMAT_SPECIFIERS:
logger.debug(f"Rejected DSC: invalid format specifier {format_code}")
return None
# Validate MMSIs
source_mmsi = str(data.get('source_mmsi', ''))
if not validate_mmsi(source_mmsi):
logger.debug(f"Rejected DSC: invalid source MMSI {source_mmsi}")
return None
dest_mmsi_val = data.get('dest_mmsi')
if dest_mmsi_val is not None:
dest_mmsi_str = str(dest_mmsi_val)
if not validate_mmsi(dest_mmsi_str):
logger.debug(f"Rejected DSC: invalid dest MMSI {dest_mmsi_str}")
return None
# Validate raw field structure if present
raw = data.get('raw')
if raw is not None:
raw_str = str(raw)
if not re.match(r'^\d+$', raw_str):
logger.debug("Rejected DSC: raw field contains non-digits")
return None
if len(raw_str) % 3 != 0:
logger.debug("Rejected DSC: raw field length not divisible by 3")
return None
# Last 3-digit token must be a valid EOS symbol
if len(raw_str) >= 3:
last_token = int(raw_str[-3:])
if last_token not in VALID_EOS:
logger.debug(f"Rejected DSC: raw EOS token {last_token} not valid")
return None
# Validate telecommand values if present (must be 100-127)
for tc_field in ('telecommand1', 'telecommand2'):
tc_val = data.get(tc_field)
if tc_val is not None:
try:
tc_int = int(tc_val)
except (ValueError, TypeError):
logger.debug(f"Rejected DSC: invalid {tc_field} value {tc_val}")
return None
if tc_int < 100 or tc_int > 127:
logger.debug(f"Rejected DSC: {tc_field} {tc_int} out of range 100-127")
return None
# Build parsed message
msg = {
'type': 'dsc_message',
'source_mmsi': str(data.get('source_mmsi', '')),
'dest_mmsi': str(data.get('dest_mmsi', '')) if data.get('dest_mmsi') else None,
'format_code': data.get('format'),
'format_text': get_format_text(data.get('format', 0)),
'source_mmsi': source_mmsi,
'dest_mmsi': str(data.get('dest_mmsi', '')) if data.get('dest_mmsi') is not None else None,
'format_code': format_code,
'format_text': get_format_text(format_code),
'category': data.get('category', 'UNKNOWN').upper(),
'timestamp': data.get('timestamp') or datetime.utcnow().isoformat(),
}
@@ -156,7 +207,7 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None:
msg['source_country'] = country
# Add distress nature if present
if 'nature' in data and data['nature']:
if data.get('nature') is not None:
msg['nature_code'] = data['nature']
msg['nature_of_distress'] = get_distress_nature_text(data['nature'])
@@ -173,16 +224,16 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None:
pass
# Add telecommand info
if 'telecommand1' in data and data['telecommand1']:
if data.get('telecommand1') is not None:
msg['telecommand1'] = data['telecommand1']
msg['telecommand1_text'] = get_telecommand_text(data['telecommand1'])
if 'telecommand2' in data and data['telecommand2']:
if data.get('telecommand2') is not None:
msg['telecommand2'] = data['telecommand2']
msg['telecommand2_text'] = get_telecommand_text(data['telecommand2'])
# Add channel if present
if 'channel' in data and data['channel']:
if data.get('channel') is not None:
msg['channel'] = data['channel']
# Add EOS (End of Sequence) info
@@ -197,7 +248,7 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None:
msg['priority'] = get_category_priority(msg['category'])
# Mark if this is a critical alert
msg['is_critical'] = msg['category'] in ('DISTRESS', 'DISTRESS_ACK', 'DISTRESS_RELAY', 'URGENCY')
msg['is_critical'] = msg['category'] in ('DISTRESS', 'ALL_SHIPS_URGENCY_SAFETY')
return msg