diff --git a/tests/test_dsc.py b/tests/test_dsc.py index 9b83663..276a156 100644 --- a/tests/test_dsc.py +++ b/tests/test_dsc.py @@ -515,18 +515,16 @@ class TestDSCDecoder: assert result == '002320001' def test_decode_mmsi_short_symbols(self, decoder): - """Test MMSI decoding handles short symbol list.""" + """Test MMSI decoding returns None for short symbol list.""" result = decoder._decode_mmsi([1, 2, 3]) - assert result == '000000000' + assert result is None def test_decode_mmsi_invalid_symbols(self, decoder): - """Test MMSI decoding handles invalid symbol values.""" - # Symbols > 99 should be treated as 0 + """Test MMSI decoding returns None for out-of-range symbols.""" + # Symbols > 99 should cause decode to fail symbols = [100, 32, 12, 34, 56] result = decoder._decode_mmsi(symbols) - # First symbol (100) becomes 00, padded result "0032123456", - # trim leading pad digit -> "032123456" - assert result == '032123456' + assert result is None def test_decode_position_northeast(self, decoder): """Test position decoding for NE quadrant.""" @@ -577,8 +575,9 @@ class TestDSCDecoder: def test_bits_to_symbol(self, decoder): """Test bit to symbol conversion.""" # Symbol value is first 7 bits (LSB first) - # Value 100 = 0b1100100 -> bits [0,0,1,0,0,1,1, x,x,x] - bits = [0, 0, 1, 0, 0, 1, 1, 0, 0, 0] + # Value 100 = 0b1100100 -> bits [0,0,1,0,0,1,1] -> 3 ones + # Check bits must make total even -> need 1 more one -> [1,0,0] + bits = [0, 0, 1, 0, 0, 1, 1, 1, 0, 0] result = decoder._bits_to_symbol(bits) assert result == 100 @@ -588,14 +587,14 @@ class TestDSCDecoder: assert result == -1 def test_detect_dot_pattern(self, decoder): - """Test dot pattern detection.""" - # Dot pattern is alternating 1010101... - decoder.bit_buffer = [1, 0] * 25 # 50 alternating bits + """Test dot pattern detection with 200+ alternating bits.""" + # Dot pattern requires at least 200 bits / 100 alternations + decoder.bit_buffer = [1, 0] * 110 # 220 alternating bits assert decoder._detect_dot_pattern() is True def test_detect_dot_pattern_insufficient(self, decoder): """Test dot pattern not detected with insufficient alternations.""" - decoder.bit_buffer = [1, 0] * 5 # Only 10 bits + decoder.bit_buffer = [1, 0] * 40 # Only 80 bits, below 200 threshold assert decoder._detect_dot_pattern() is False def test_detect_dot_pattern_not_alternating(self, decoder): @@ -603,6 +602,84 @@ class TestDSCDecoder: decoder.bit_buffer = [1, 1, 1, 1, 0, 0, 0, 0] * 5 assert decoder._detect_dot_pattern() is False + def test_bounded_phasing_strip(self, decoder): + """Test that >7 phasing symbols causes decode to return None.""" + # Build message bits: 10 phasing symbols (120) + format + data + # Each symbol is 10 bits. Phasing symbol 120 = 0b1111000 LSB first + # 120 in 7 bits LSB-first: 0,0,0,1,1,1,1 + 3 check bits + # 120 = 0b1111000 -> LSB first: 0,0,0,1,1,1,1 -> ones=4 (even) -> check [0,0,0] + phasing_bits = [0, 0, 0, 1, 1, 1, 1, 0, 0, 0] # symbol 120 + # 10 phasing symbols (>7 max) + decoder.message_bits = phasing_bits * 10 + # Add some non-phasing symbols after (enough for a message) + # Symbol 112 (INDIVIDUAL) = 0b1110000 LSB-first: 0,0,0,0,1,1,1 -> ones=3 (odd) -> need odd check + # For simplicity, just add enough bits for the decoder to attempt + for _ in range(20): + decoder.message_bits.extend([0, 0, 0, 0, 1, 1, 1, 1, 0, 0]) + result = decoder._try_decode_message() + assert result is None + + def test_eos_minimum_length(self, decoder): + """Test that EOS found too early in the symbol stream is skipped.""" + # Build a message where EOS appears at position 5 (< MIN_SYMBOLS_FOR_FORMAT=12) + # This should not be accepted as a valid message end + # Symbol 127 (EOS) = 0b1111111 LSB-first: 1,1,1,1,1,1,1 -> ones=7 (odd) -> check needs 1 one + # Use a simple approach: create symbols directly via _try_decode_message + # Create 5 normal symbols + EOS at position 5 — should be skipped + # Followed by more symbols and a real EOS at position 15 + from utils.dsc.decoder import DSCDecoder + d = DSCDecoder() + + # Build symbols manually: we need _try_decode_message to find EOS too early + # Symbol 112 = format code. We'll build 10 bits per symbol. + # Since check bit validation is now active, we need valid check bits. + # Symbol value 10 = 0b0001010 LSB-first: 0,1,0,1,0,0,0, ones=2 (even) -> check [0,0,0] + sym_10 = [0, 1, 0, 1, 0, 0, 0, 0, 0, 0] + # Symbol 127 (EOS) = 0b1111111, ones=7 (odd) -> check needs odd total -> [1,0,0] + sym_eos = [1, 1, 1, 1, 1, 1, 1, 1, 0, 0] + + # 5 normal symbols + early EOS (should be skipped) + 8 more normal + real EOS + d.message_bits = sym_10 * 5 + sym_eos + sym_10 * 8 + sym_eos + result = d._try_decode_message() + # The early EOS at index 5 should be skipped; the one at index 14 + # is past MIN_SYMBOLS_FOR_FORMAT so it can be accepted. + # But the message content is garbage, so _decode_symbols will likely + # return None for other reasons. The key test: it doesn't return a + # message truncated at position 5. + # Just verify no crash and either None or a valid longer message + # (not truncated at the early EOS) + assert result is None or len(result.get('raw', '')) > 18 + + def test_bits_to_symbol_check_bit_validation(self, decoder): + """Test that _bits_to_symbol rejects symbols with invalid check bits.""" + # Symbol 100 = 0b1100100 LSB-first: 0,0,1,0,0,1,1 + # ones in data = 3, need total even -> check bits need 1 one + # Valid: [0,0,1,0,0,1,1, 1,0,0] -> total ones = 4 (even) -> valid + valid_bits = [0, 0, 1, 0, 0, 1, 1, 1, 0, 0] + assert decoder._bits_to_symbol(valid_bits) == 100 + + # Invalid: flip one check bit -> total ones = 5 (odd) -> invalid + invalid_bits = [0, 0, 1, 0, 0, 1, 1, 0, 0, 0] + assert decoder._bits_to_symbol(invalid_bits) == -1 + + def test_safety_is_critical(self): + """Test that SAFETY category is marked as critical.""" + import json + + from utils.dsc.parser import parse_dsc_message + + raw = json.dumps({ + 'type': 'dsc', + 'format': 123, + 'source_mmsi': '232123456', + 'category': 'SAFETY', + 'timestamp': '2025-01-15T12:00:00Z', + 'raw': '123232123456100122', + }) + msg = parse_dsc_message(raw) + assert msg is not None + assert msg['is_critical'] is True + class TestDSCConstants: """Tests for DSC constants.""" @@ -670,3 +747,27 @@ class TestDSCConstants: assert DSC_BAUD_RATE == 1200 assert DSC_MARK_FREQ == 2100 assert DSC_SPACE_FREQ == 1300 + + def test_telecommand_codes_full(self): + """Test TELECOMMAND_CODES_FULL covers 0-127 range.""" + from utils.dsc.constants import TELECOMMAND_CODES_FULL + + assert len(TELECOMMAND_CODES_FULL) == 128 + # Known codes map correctly + assert TELECOMMAND_CODES_FULL[100] == 'F3E_G3E_ALL' + assert TELECOMMAND_CODES_FULL[107] == 'DISTRESS_ACK' + # Unknown codes map to "UNKNOWN" + assert TELECOMMAND_CODES_FULL[0] == 'UNKNOWN' + assert TELECOMMAND_CODES_FULL[99] == 'UNKNOWN' + + def test_telecommand_formats(self): + """Test TELECOMMAND_FORMATS contains correct format codes.""" + from utils.dsc.constants import TELECOMMAND_FORMATS + + assert {112, 114, 116, 120, 123} == TELECOMMAND_FORMATS + + def test_min_symbols_for_format(self): + """Test MIN_SYMBOLS_FOR_FORMAT constant.""" + from utils.dsc.constants import MIN_SYMBOLS_FOR_FORMAT + + assert MIN_SYMBOLS_FOR_FORMAT == 12 diff --git a/utils/dsc/constants.py b/utils/dsc/constants.py index d8372f9..727f596 100644 --- a/utils/dsc/constants.py +++ b/utils/dsc/constants.py @@ -89,6 +89,15 @@ TELECOMMAND_CODES = { 201: 'POLL_RESPONSE', # Poll response } +# Full 0-127 telecommand lookup (maps unknown codes to "UNKNOWN") +TELECOMMAND_CODES_FULL = {i: TELECOMMAND_CODES.get(i, "UNKNOWN") for i in range(128)} + +# Format codes that carry telecommand fields +TELECOMMAND_FORMATS = {112, 114, 116, 120, 123} + +# Minimum symbols (after phasing strip) before an EOS can be accepted +MIN_SYMBOLS_FOR_FORMAT = 12 + # ============================================================================= # DSC Symbol Definitions diff --git a/utils/dsc/decoder.py b/utils/dsc/decoder.py index 3f6399e..bab7720 100644 --- a/utils/dsc/decoder.py +++ b/utils/dsc/decoder.py @@ -43,6 +43,8 @@ from .constants import ( FORMAT_CODES, DISTRESS_NATURE_CODES, VALID_EOS, + TELECOMMAND_FORMATS, + MIN_SYMBOLS_FOR_FORMAT, ) # Configure logging @@ -222,13 +224,14 @@ class DSCDecoder: Detect DSC dot pattern for synchronization. The dot pattern is at least 200 alternating bits (1010101...). - We look for at least 20 consecutive alternations. + We require at least 100 consecutive alternations to avoid + false sync triggers from noise. """ - if len(self.bit_buffer) < 40: + if len(self.bit_buffer) < 200: return False - # Check last 40 bits for alternating pattern - last_bits = self.bit_buffer[-40:] + # Check last 200 bits for alternating pattern + last_bits = self.bit_buffer[-200:] alternations = 0 for i in range(1, len(last_bits)): @@ -237,7 +240,7 @@ class DSCDecoder: else: alternations = 0 - if alternations >= 20: + if alternations >= 100: return True return False @@ -263,27 +266,37 @@ class DSCDecoder: if end <= len(self.message_bits): symbol_bits = self.message_bits[start:end] symbol_value = self._bits_to_symbol(symbol_bits) + if symbol_value == -1: + logger.debug("DSC symbol check bit failure, aborting decode") + return None symbols.append(symbol_value) # Strip phasing sequence (RX/DX symbols 120-126) from the # start of the message. Per ITU-R M.493, after the dot pattern # there are 7 phasing symbols before the format specifier. + # Bound to max 7 — if more are present, this is a bad sync. msg_start = 0 for i, sym in enumerate(symbols): if 120 <= sym <= 126: msg_start = i + 1 else: break + if msg_start > 7: + logger.debug("DSC bad sync: >7 phasing symbols stripped") + return None symbols = symbols[msg_start:] if len(symbols) < 5: return None # Look for EOS (End of Sequence) - symbols 117, 122, or 127 + # EOS must appear after at least MIN_SYMBOLS_FOR_FORMAT symbols eos_found = False eos_index = -1 for i, sym in enumerate(symbols): if sym in VALID_EOS: + if i < MIN_SYMBOLS_FOR_FORMAT: + continue # Too early — not a real EOS eos_found = True eos_index = i break @@ -300,7 +313,9 @@ class DSCDecoder: Convert 10 bits to symbol value. DSC uses 10-bit symbols: 7 information bits + 3 error bits. - We extract the 7-bit value. + The 3 check bits provide parity such that the total number of + '1' bits across all 10 bits should be even (even parity). + Returns -1 if the check bits are invalid. """ if len(bits) != 10: return -1 @@ -311,6 +326,11 @@ class DSCDecoder: if bits[i]: value |= (1 << i) + # Validate check bits: total number of 1s should be even + ones = sum(bits) + if ones % 2 != 0: + return -1 + return value def _decode_symbols(self, symbols: list[int]) -> dict | None: @@ -356,9 +376,13 @@ class DSCDecoder: # Decode MMSI from symbols 1-5 (destination/address) dest_mmsi = self._decode_mmsi(symbols[1:6]) + if dest_mmsi is None: + return None # Decode self-ID from symbols 6-10 (source) source_mmsi = self._decode_mmsi(symbols[6:11]) + if source_mmsi is None: + return None message = { 'type': 'dsc', @@ -387,8 +411,9 @@ class DSCDecoder: if position: message['position'] = position - # Telecommand fields (usually last two before EOS) - if len(remaining) >= 2: + # Telecommand fields (last two before EOS) — only for formats + # that carry telecommand fields per ITU-R M.493 + if format_code in TELECOMMAND_FORMATS and len(remaining) >= 2: message['telecommand1'] = remaining[-2] message['telecommand2'] = remaining[-1] @@ -402,20 +427,21 @@ class DSCDecoder: logger.warning(f"DSC decode error: {e}") return None - def _decode_mmsi(self, symbols: list[int]) -> str: + def _decode_mmsi(self, symbols: list[int]) -> str | None: """ Decode MMSI from 5 DSC symbols. Each symbol represents 2 BCD digits (00-99). 5 symbols = 10 digits, but MMSI is 9 digits (first symbol has leading 0). + Returns None if any symbol is out of valid BCD range. """ if len(symbols) < 5: - return '000000000' + return None digits = [] for sym in symbols: if sym < 0 or sym > 99: - sym = 0 + return None # Each symbol is 2 BCD digits digits.append(f'{sym:02d}') diff --git a/utils/dsc/parser.py b/utils/dsc/parser.py index 30f7dd3..0e1f87e 100644 --- a/utils/dsc/parser.py +++ b/utils/dsc/parser.py @@ -248,7 +248,10 @@ def parse_dsc_message(raw_line: str) -> dict[str, Any] | None: msg['priority'] = get_category_priority(msg['category']) # Mark if this is a critical alert - msg['is_critical'] = msg['category'] in ('DISTRESS', 'ALL_SHIPS_URGENCY_SAFETY') + msg['is_critical'] = msg['category'] in ( + 'DISTRESS', 'DISTRESS_ACK', 'DISTRESS_RELAY', + 'URGENCY', 'SAFETY', 'ALL_SHIPS_URGENCY_SAFETY', + ) return msg