From a397271553e5a64d5bb89a74286a77a2c85e9436 Mon Sep 17 00:00:00 2001 From: Smittix Date: Thu, 19 Feb 2026 10:43:16 +0000 Subject: [PATCH] fix: Slant correction via post-processing shear, not in-decoder sync fixup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous attempts to correct slant by altering R-channel placement and buffer consumption caused cascading failures: a false positive in B pixel data would misplace R, then the wrong consumed value misaligned the next line's G, and the error compounded across all 256 lines. New approach (safe by design): - Sync search is measurement-only: never touches pos or consumed, so a noisy or wrong measurement cannot corrupt the current or future lines. - Per-line deviation (measured sync position minus expected) is recorded in self._sync_deviations throughout the decode. - get_image() fits a line through the deviations (linear regression) to estimate the per-line SDR clock drift rate, then applies a horizontal shear to the assembled PIL image: each row is shifted by -round(row × drift_rate × width / channel_samples) pixels. - Worst case (all measurements fail): no correction applied, image quality identical to the pre-change baseline. Co-Authored-By: Claude Sonnet 4.6 --- utils/sstv/image_decoder.py | 111 ++++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 43 deletions(-) diff --git a/utils/sstv/image_decoder.py b/utils/sstv/image_decoder.py index e16985f..f4ddd88 100644 --- a/utils/sstv/image_decoder.py +++ b/utils/sstv/image_decoder.py @@ -102,6 +102,12 @@ class SSTVImageDecoder: self._expected_line_start = 0 # Sample offset within buffer self._synced = False + # Per-line mid-sync deviation measurements (Scottie modes only). + # Each entry is the measured offset (in samples) of the sync pulse + # relative to its expected position: negative = early, positive = late. + # Used by get_image() to apply post-processing slant correction. + self._sync_deviations: list[float | None] = [] + @property def is_complete(self) -> bool: return self._complete @@ -194,9 +200,6 @@ class SSTVImageDecoder: search_margin = max(100, self._line_samples // 10) line_start = 0 - # Set True when the Scottie mid-line sync is found precisely so - # that we can use consumed=pos to stop timing errors accumulating. - _sync_corrected = False if self._mode.sync_position in (SyncPosition.FRONT, SyncPosition.FRONT_PD): # Sync is at the beginning of each line @@ -239,22 +242,16 @@ class SSTVImageDecoder: pos += self._porch_samples else: # Scottie: sync + porch between B and R. - # Use a vectorised fine scan (step=1) so the sync - # pulse is located with single-sample accuracy. - # The coarse _find_sync (step=49) introduced up to - # ±24 sample errors that made colour registration - # worse than no correction at all. - # - # Backward margin: 50 samples (≈ 4.5 ms), enough to - # cover >130 ppm SDR clock error over a full image. - # Forward margin: bounded by available buffer so the - # R channel never overflows. + # Measure the actual sync position for post-processing + # slant correction without touching pos or consumed — + # so a noisy/false measurement never corrupts the decode. r_samples = self._channel_samples[-1] bwd = min(50, pos) fwd = max(0, len(self._buffer) - pos - self._sync_samples - self._porch_samples - r_samples) fwd = min(fwd, self._sync_samples) + deviation: float | None = None if bwd + fwd > 0: region_start = pos - bwd sync_region = self._buffer[ @@ -270,24 +267,13 @@ class SSTVImageDecoder: self._sample_rate) sync_e = energies[:, 0] black_e = energies[:, 1] - valid = sync_e > black_e * 2 - if valid.any(): + valid_mask = sync_e > black_e * 2 + if valid_mask.any(): fine_best = int( - np.argmax(np.where(valid, sync_e, 0.0))) - candidate = (region_start + fine_best - + self._sync_samples - + self._porch_samples) - if candidate + r_samples <= len(self._buffer): - pos = candidate - _sync_corrected = True - else: - pos += self._sync_samples + self._porch_samples - else: - pos += self._sync_samples + self._porch_samples - else: - pos += self._sync_samples + self._porch_samples - else: - pos += self._sync_samples + self._porch_samples + np.argmax(np.where(valid_mask, sync_e, 0.0))) + deviation = float(fine_best - bwd) + self._sync_deviations.append(deviation) + pos += self._sync_samples + self._porch_samples elif self._separator_samples > 0: # Robot: separator + porch between channels pos += self._separator_samples @@ -296,15 +282,8 @@ class SSTVImageDecoder: # Martin: porch between channels pos += self._porch_samples - # Advance buffer past this line. - # For Scottie modes where the mid-line sync was precisely located, - # consume exactly to the end of R so the next line starts at its - # correct separator — this stops per-line timing errors accumulating - # into a slant without overcorrecting into the next line's data. - if _sync_corrected: - consumed = pos - else: - consumed = max(pos, self._line_samples) + # Advance buffer past this line + consumed = max(pos, self._line_samples) self._buffer = self._buffer[consumed:] self._current_line += 1 @@ -390,13 +369,59 @@ class SSTVImageDecoder: mode = self._mode if mode.color_model == ColorModel.RGB: - return self._assemble_rgb() + img = self._assemble_rgb() elif mode.color_model == ColorModel.YCRCB: - return self._assemble_ycrcb() + img = self._assemble_ycrcb() elif mode.color_model == ColorModel.YCRCB_DUAL: - return self._assemble_ycrcb_dual() + img = self._assemble_ycrcb_dual() + else: + return None - return None + return self._apply_slant_correction(img) + + def _apply_slant_correction(self, img: Image.Image) -> Image.Image: + """Apply per-row horizontal correction based on measured sync drift. + + Uses the sync deviation measurements collected during decoding to + estimate the per-line SDR clock drift rate via linear regression, + then shears the image to compensate. Noisy individual measurements + are averaged out; if fewer than 10 valid measurements exist the + image is returned unchanged. + """ + valid = [(i, d) for i, d in enumerate(self._sync_deviations) + if d is not None] + if len(valid) < 10: + return img + + lines = np.array([x[0] for x in valid], dtype=np.float64) + devs = np.array([x[1] for x in valid], dtype=np.float64) + + # Linear fit: deviation[i] ≈ slope × i + intercept. + # slope < 0 → fast SDR (sync arrives early, image leans left). + # slope > 0 → slow SDR (sync arrives late, image leans right). + slope, _ = np.polyfit(lines, devs, 1) + + # Convert samples/line drift to pixels/line for the channel width. + pixels_per_line = slope * self._mode.width / self._channel_samples[0] + + # Skip correction if drift is negligible (< 1 px over 20 lines). + if abs(pixels_per_line) < 0.05: + return img + + arr = np.array(img) + height, width = arr.shape[:2] + corrected = np.zeros_like(arr) + + for row in range(height): + shift = -int(round(row * pixels_per_line)) + if shift == 0: + corrected[row] = arr[row] + elif shift > 0: + corrected[row, shift:] = arr[row, :width - shift] + else: + corrected[row, :width + shift] = arr[row, -shift:] + + return Image.fromarray(corrected, 'RGB') def _assemble_rgb(self) -> Image.Image: """Assemble RGB image from sequential R, G, B channel data.