fix: harden DSC decoder against noise-induced false decodes

Stricter dot pattern detection (200 bits/100 alternations), bounded
phasing strip (max 7), symbol check bit parity validation, EOS minimum
position check, strict MMSI decode (reject out-of-range symbols),
format-aware telecommand extraction, and expanded critical category
detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-26 22:52:41 +00:00
parent ee81eb44cd
commit 645b3b8632
4 changed files with 164 additions and 25 deletions
+114 -13
View File
@@ -515,18 +515,16 @@ class TestDSCDecoder:
assert result == '002320001'
def test_decode_mmsi_short_symbols(self, decoder):
"""Test MMSI decoding handles short symbol list."""
"""Test MMSI decoding returns None for short symbol list."""
result = decoder._decode_mmsi([1, 2, 3])
assert result == '000000000'
assert result is None
def test_decode_mmsi_invalid_symbols(self, decoder):
"""Test MMSI decoding handles invalid symbol values."""
# Symbols > 99 should be treated as 0
"""Test MMSI decoding returns None for out-of-range symbols."""
# Symbols > 99 should cause decode to fail
symbols = [100, 32, 12, 34, 56]
result = decoder._decode_mmsi(symbols)
# First symbol (100) becomes 00, padded result "0032123456",
# trim leading pad digit -> "032123456"
assert result == '032123456'
assert result is None
def test_decode_position_northeast(self, decoder):
"""Test position decoding for NE quadrant."""
@@ -577,8 +575,9 @@ class TestDSCDecoder:
def test_bits_to_symbol(self, decoder):
"""Test bit to symbol conversion."""
# Symbol value is first 7 bits (LSB first)
# Value 100 = 0b1100100 -> bits [0,0,1,0,0,1,1, x,x,x]
bits = [0, 0, 1, 0, 0, 1, 1, 0, 0, 0]
# Value 100 = 0b1100100 -> bits [0,0,1,0,0,1,1] -> 3 ones
# Check bits must make total even -> need 1 more one -> [1,0,0]
bits = [0, 0, 1, 0, 0, 1, 1, 1, 0, 0]
result = decoder._bits_to_symbol(bits)
assert result == 100
@@ -588,14 +587,14 @@ class TestDSCDecoder:
assert result == -1
def test_detect_dot_pattern(self, decoder):
"""Test dot pattern detection."""
# Dot pattern is alternating 1010101...
decoder.bit_buffer = [1, 0] * 25 # 50 alternating bits
"""Test dot pattern detection with 200+ alternating bits."""
# Dot pattern requires at least 200 bits / 100 alternations
decoder.bit_buffer = [1, 0] * 110 # 220 alternating bits
assert decoder._detect_dot_pattern() is True
def test_detect_dot_pattern_insufficient(self, decoder):
"""Test dot pattern not detected with insufficient alternations."""
decoder.bit_buffer = [1, 0] * 5 # Only 10 bits
decoder.bit_buffer = [1, 0] * 40 # Only 80 bits, below 200 threshold
assert decoder._detect_dot_pattern() is False
def test_detect_dot_pattern_not_alternating(self, decoder):
@@ -603,6 +602,84 @@ class TestDSCDecoder:
decoder.bit_buffer = [1, 1, 1, 1, 0, 0, 0, 0] * 5
assert decoder._detect_dot_pattern() is False
def test_bounded_phasing_strip(self, decoder):
"""Test that >7 phasing symbols causes decode to return None."""
# Build message bits: 10 phasing symbols (120) + format + data
# Each symbol is 10 bits. Phasing symbol 120 = 0b1111000 LSB first
# 120 in 7 bits LSB-first: 0,0,0,1,1,1,1 + 3 check bits
# 120 = 0b1111000 -> LSB first: 0,0,0,1,1,1,1 -> ones=4 (even) -> check [0,0,0]
phasing_bits = [0, 0, 0, 1, 1, 1, 1, 0, 0, 0] # symbol 120
# 10 phasing symbols (>7 max)
decoder.message_bits = phasing_bits * 10
# Add some non-phasing symbols after (enough for a message)
# Symbol 112 (INDIVIDUAL) = 0b1110000 LSB-first: 0,0,0,0,1,1,1 -> ones=3 (odd) -> need odd check
# For simplicity, just add enough bits for the decoder to attempt
for _ in range(20):
decoder.message_bits.extend([0, 0, 0, 0, 1, 1, 1, 1, 0, 0])
result = decoder._try_decode_message()
assert result is None
def test_eos_minimum_length(self, decoder):
"""Test that EOS found too early in the symbol stream is skipped."""
# Build a message where EOS appears at position 5 (< MIN_SYMBOLS_FOR_FORMAT=12)
# This should not be accepted as a valid message end
# Symbol 127 (EOS) = 0b1111111 LSB-first: 1,1,1,1,1,1,1 -> ones=7 (odd) -> check needs 1 one
# Use a simple approach: create symbols directly via _try_decode_message
# Create 5 normal symbols + EOS at position 5 — should be skipped
# Followed by more symbols and a real EOS at position 15
from utils.dsc.decoder import DSCDecoder
d = DSCDecoder()
# Build symbols manually: we need _try_decode_message to find EOS too early
# Symbol 112 = format code. We'll build 10 bits per symbol.
# Since check bit validation is now active, we need valid check bits.
# Symbol value 10 = 0b0001010 LSB-first: 0,1,0,1,0,0,0, ones=2 (even) -> check [0,0,0]
sym_10 = [0, 1, 0, 1, 0, 0, 0, 0, 0, 0]
# Symbol 127 (EOS) = 0b1111111, ones=7 (odd) -> check needs odd total -> [1,0,0]
sym_eos = [1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
# 5 normal symbols + early EOS (should be skipped) + 8 more normal + real EOS
d.message_bits = sym_10 * 5 + sym_eos + sym_10 * 8 + sym_eos
result = d._try_decode_message()
# The early EOS at index 5 should be skipped; the one at index 14
# is past MIN_SYMBOLS_FOR_FORMAT so it can be accepted.
# But the message content is garbage, so _decode_symbols will likely
# return None for other reasons. The key test: it doesn't return a
# message truncated at position 5.
# Just verify no crash and either None or a valid longer message
# (not truncated at the early EOS)
assert result is None or len(result.get('raw', '')) > 18
def test_bits_to_symbol_check_bit_validation(self, decoder):
"""Test that _bits_to_symbol rejects symbols with invalid check bits."""
# Symbol 100 = 0b1100100 LSB-first: 0,0,1,0,0,1,1
# ones in data = 3, need total even -> check bits need 1 one
# Valid: [0,0,1,0,0,1,1, 1,0,0] -> total ones = 4 (even) -> valid
valid_bits = [0, 0, 1, 0, 0, 1, 1, 1, 0, 0]
assert decoder._bits_to_symbol(valid_bits) == 100
# Invalid: flip one check bit -> total ones = 5 (odd) -> invalid
invalid_bits = [0, 0, 1, 0, 0, 1, 1, 0, 0, 0]
assert decoder._bits_to_symbol(invalid_bits) == -1
def test_safety_is_critical(self):
"""Test that SAFETY category is marked as critical."""
import json
from utils.dsc.parser import parse_dsc_message
raw = json.dumps({
'type': 'dsc',
'format': 123,
'source_mmsi': '232123456',
'category': 'SAFETY',
'timestamp': '2025-01-15T12:00:00Z',
'raw': '123232123456100122',
})
msg = parse_dsc_message(raw)
assert msg is not None
assert msg['is_critical'] is True
class TestDSCConstants:
"""Tests for DSC constants."""
@@ -670,3 +747,27 @@ class TestDSCConstants:
assert DSC_BAUD_RATE == 1200
assert DSC_MARK_FREQ == 2100
assert DSC_SPACE_FREQ == 1300
def test_telecommand_codes_full(self):
"""Test TELECOMMAND_CODES_FULL covers 0-127 range."""
from utils.dsc.constants import TELECOMMAND_CODES_FULL
assert len(TELECOMMAND_CODES_FULL) == 128
# Known codes map correctly
assert TELECOMMAND_CODES_FULL[100] == 'F3E_G3E_ALL'
assert TELECOMMAND_CODES_FULL[107] == 'DISTRESS_ACK'
# Unknown codes map to "UNKNOWN"
assert TELECOMMAND_CODES_FULL[0] == 'UNKNOWN'
assert TELECOMMAND_CODES_FULL[99] == 'UNKNOWN'
def test_telecommand_formats(self):
"""Test TELECOMMAND_FORMATS contains correct format codes."""
from utils.dsc.constants import TELECOMMAND_FORMATS
assert {112, 114, 116, 120, 123} == TELECOMMAND_FORMATS
def test_min_symbols_for_format(self):
"""Test MIN_SYMBOLS_FOR_FORMAT constant."""
from utils.dsc.constants import MIN_SYMBOLS_FOR_FORMAT
assert MIN_SYMBOLS_FOR_FORMAT == 12
+9
View File
@@ -89,6 +89,15 @@ TELECOMMAND_CODES = {
201: 'POLL_RESPONSE', # Poll response
}
# Full 0-127 telecommand lookup (maps unknown codes to "UNKNOWN")
TELECOMMAND_CODES_FULL = {i: TELECOMMAND_CODES.get(i, "UNKNOWN") for i in range(128)}
# Format codes that carry telecommand fields
TELECOMMAND_FORMATS = {112, 114, 116, 120, 123}
# Minimum symbols (after phasing strip) before an EOS can be accepted
MIN_SYMBOLS_FOR_FORMAT = 12
# =============================================================================
# DSC Symbol Definitions
+37 -11
View File
@@ -43,6 +43,8 @@ from .constants import (
FORMAT_CODES,
DISTRESS_NATURE_CODES,
VALID_EOS,
TELECOMMAND_FORMATS,
MIN_SYMBOLS_FOR_FORMAT,
)
# Configure logging
@@ -222,13 +224,14 @@ class DSCDecoder:
Detect DSC dot pattern for synchronization.
The dot pattern is at least 200 alternating bits (1010101...).
We look for at least 20 consecutive alternations.
We require at least 100 consecutive alternations to avoid
false sync triggers from noise.
"""
if len(self.bit_buffer) < 40:
if len(self.bit_buffer) < 200:
return False
# Check last 40 bits for alternating pattern
last_bits = self.bit_buffer[-40:]
# Check last 200 bits for alternating pattern
last_bits = self.bit_buffer[-200:]
alternations = 0
for i in range(1, len(last_bits)):
@@ -237,7 +240,7 @@ class DSCDecoder:
else:
alternations = 0
if alternations >= 20:
if alternations >= 100:
return True
return False
@@ -263,27 +266,37 @@ class DSCDecoder:
if end <= len(self.message_bits):
symbol_bits = self.message_bits[start:end]
symbol_value = self._bits_to_symbol(symbol_bits)
if symbol_value == -1:
logger.debug("DSC symbol check bit failure, aborting decode")
return None
symbols.append(symbol_value)
# Strip phasing sequence (RX/DX symbols 120-126) from the
# start of the message. Per ITU-R M.493, after the dot pattern
# there are 7 phasing symbols before the format specifier.
# Bound to max 7 — if more are present, this is a bad sync.
msg_start = 0
for i, sym in enumerate(symbols):
if 120 <= sym <= 126:
msg_start = i + 1
else:
break
if msg_start > 7:
logger.debug("DSC bad sync: >7 phasing symbols stripped")
return None
symbols = symbols[msg_start:]
if len(symbols) < 5:
return None
# Look for EOS (End of Sequence) - symbols 117, 122, or 127
# EOS must appear after at least MIN_SYMBOLS_FOR_FORMAT symbols
eos_found = False
eos_index = -1
for i, sym in enumerate(symbols):
if sym in VALID_EOS:
if i < MIN_SYMBOLS_FOR_FORMAT:
continue # Too early — not a real EOS
eos_found = True
eos_index = i
break
@@ -300,7 +313,9 @@ class DSCDecoder:
Convert 10 bits to symbol value.
DSC uses 10-bit symbols: 7 information bits + 3 error bits.
We extract the 7-bit value.
The 3 check bits provide parity such that the total number of
'1' bits across all 10 bits should be even (even parity).
Returns -1 if the check bits are invalid.
"""
if len(bits) != 10:
return -1
@@ -311,6 +326,11 @@ class DSCDecoder:
if bits[i]:
value |= (1 << i)
# Validate check bits: total number of 1s should be even
ones = sum(bits)
if ones % 2 != 0:
return -1
return value
def _decode_symbols(self, symbols: list[int]) -> dict | None:
@@ -356,9 +376,13 @@ class DSCDecoder:
# Decode MMSI from symbols 1-5 (destination/address)
dest_mmsi = self._decode_mmsi(symbols[1:6])
if dest_mmsi is None:
return None
# Decode self-ID from symbols 6-10 (source)
source_mmsi = self._decode_mmsi(symbols[6:11])
if source_mmsi is None:
return None
message = {
'type': 'dsc',
@@ -387,8 +411,9 @@ class DSCDecoder:
if position:
message['position'] = position
# Telecommand fields (usually last two before EOS)
if len(remaining) >= 2:
# Telecommand fields (last two before EOS) — only for formats
# that carry telecommand fields per ITU-R M.493
if format_code in TELECOMMAND_FORMATS and len(remaining) >= 2:
message['telecommand1'] = remaining[-2]
message['telecommand2'] = remaining[-1]
@@ -402,20 +427,21 @@ class DSCDecoder:
logger.warning(f"DSC decode error: {e}")
return None
def _decode_mmsi(self, symbols: list[int]) -> str:
def _decode_mmsi(self, symbols: list[int]) -> str | None:
"""
Decode MMSI from 5 DSC symbols.
Each symbol represents 2 BCD digits (00-99).
5 symbols = 10 digits, but MMSI is 9 digits (first symbol has leading 0).
Returns None if any symbol is out of valid BCD range.
"""
if len(symbols) < 5:
return '000000000'
return None
digits = []
for sym in symbols:
if sym < 0 or sym > 99:
sym = 0
return None
# Each symbol is 2 BCD digits
digits.append(f'{sym:02d}')
+4 -1
View File
@@ -248,7 +248,10 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None:
msg['priority'] = get_category_priority(msg['category'])
# Mark if this is a critical alert
msg['is_critical'] = msg['category'] in ('DISTRESS', 'ALL_SHIPS_URGENCY_SAFETY')
msg['is_critical'] = msg['category'] in (
'DISTRESS', 'DISTRESS_ACK', 'DISTRESS_RELAY',
'URGENCY', 'SAFETY', 'ALL_SHIPS_URGENCY_SAFETY',
)
return msg