Remove waterfall from all modes except listening post

Reverts IQ pipeline and removes syncWaterfallToFrequency calls from
pager, sensor, rtlamr, DMR, SSTV, and SSTV general modes. Waterfall
is now exclusive to listening post mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-07 23:29:56 +00:00
parent f04ba7f143
commit beb38b6b98
13 changed files with 172 additions and 838 deletions

14
app.py
View File

@@ -232,13 +232,6 @@ cleanup_manager.register(ais_vessels)
cleanup_manager.register(dsc_messages)
cleanup_manager.register(deauth_alerts)
# ============================================
# WATERFALL SOURCE TRACKING
# ============================================
# Tracks whether waterfall data is being produced by a decoder's IQ pipeline
# None = no active source, 'rtl_power' = standalone, 'pager'/'sensor' = decoder-driven
waterfall_source: str | None = None
# ============================================
# SDR DEVICE REGISTRY
# ============================================
@@ -666,7 +659,7 @@ def kill_all() -> Response:
"""Kill all decoder, WiFi, and Bluetooth processes."""
global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process
global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process
global dmr_process, dmr_rtl_process, waterfall_source
global dmr_process, dmr_rtl_process
# Import adsb and ais modules to reset their state
from routes import adsb as adsb_module
@@ -675,7 +668,7 @@ def kill_all() -> Response:
killed = []
processes_to_kill = [
'rtl_fm', 'multimon-ng', 'rtl_433', 'rtl_sdr',
'rtl_fm', 'multimon-ng', 'rtl_433',
'airodump-ng', 'aireplay-ng', 'airmon-ng',
'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher',
'hcitool', 'bluetoothctl', 'dsd',
@@ -748,10 +741,9 @@ def kill_all() -> Response:
except Exception:
pass
# Clear SDR device registry and waterfall source
# Clear SDR device registry
with sdr_device_registry_lock:
sdr_device_registry.clear()
waterfall_source = None
return jsonify({'status': 'killed', 'processes': killed})

View File

@@ -1725,16 +1725,6 @@ def start_waterfall() -> Response:
"""Start the waterfall/spectrogram display."""
global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device
# Check if a decoder is already producing FFT data via IQ pipeline
if app_module.waterfall_source in ('pager', 'sensor'):
# Decoder-driven waterfall: data is already flowing into waterfall_queue
waterfall_running = True
return jsonify({
'status': 'started',
'source': 'decoder',
'decoder': app_module.waterfall_source,
})
with waterfall_lock:
if waterfall_running:
return jsonify({'status': 'error', 'message': 'Waterfall already running'}), 409
@@ -1789,13 +1779,6 @@ def start_waterfall() -> Response:
@listening_post_bp.route('/waterfall/stop', methods=['POST'])
def stop_waterfall() -> Response:
"""Stop the waterfall display."""
global waterfall_running
# If waterfall is decoder-driven, just disconnect (don't stop the decoder)
if app_module.waterfall_source in ('pager', 'sensor'):
waterfall_running = False
return jsonify({'status': 'stopped', 'source': 'decoder'})
_stop_waterfall_internal()
return jsonify({'status': 'stopped'})

View File

@@ -22,8 +22,8 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType, SDRValidationError
from utils.dependencies import get_tool_path
@@ -32,8 +32,6 @@ pager_bp = Blueprint('pager', __name__)
# Track which device is being used
pager_active_device: int | None = None
# IQ pipeline stop event (set to signal IQ processor thread to exit)
pager_iq_stop_event: threading.Event | None = None
def parse_multimon_output(line: str) -> dict[str, str] | None:
@@ -149,18 +147,12 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.output_queue.put({'type': 'error', 'text': str(e)})
finally:
global pager_active_device, pager_iq_stop_event
global pager_active_device
try:
os.close(master_fd)
except OSError:
pass
# Stop IQ pipeline if running
if pager_iq_stop_event is not None:
pager_iq_stop_event.set()
pager_iq_stop_event = None
if app_module.waterfall_source == 'pager':
app_module.waterfall_source = None
# Cleanup companion rtl_sdr/rtl_fm process and decoder
# Cleanup companion rtl_fm process and decoder
with app_module.process_lock:
rtl_proc = getattr(app_module.current_process, '_rtl_process', None)
for proc in [rtl_proc, process]:
@@ -183,28 +175,6 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
pager_active_device = None
def _cleanup_failed_start(rtl_process: subprocess.Popen | None) -> None:
"""Clean up after a failed start attempt."""
global pager_active_device, pager_iq_stop_event
if rtl_process:
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
if pager_iq_stop_event is not None:
pager_iq_stop_event.set()
pager_iq_stop_event = None
if app_module.waterfall_source == 'pager':
app_module.waterfall_source = None
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
@pager_bp.route('/start', methods=['POST'])
def start_decoding() -> Response:
global pager_active_device
@@ -302,196 +272,115 @@ def start_decoding() -> Response:
builder = SDRFactory.get_builder(sdr_device.sdr_type)
# Build FM demodulation command
bias_t = data.get('bias_t', False)
rtl_cmd = builder.build_fm_demod_command(
device=sdr_device,
frequency_mhz=freq,
sample_rate=22050,
gain=float(gain) if gain and gain != '0' else None,
ppm=int(ppm) if ppm and ppm != '0' else None,
modulation='fm',
squelch=squelch if squelch and squelch != 0 else None,
bias_t=bias_t
)
multimon_path = get_tool_path('multimon-ng')
if not multimon_path:
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
return jsonify({'status': 'error', 'message': 'multimon-ng not found'}), 400
multimon_cmd = [multimon_path, '-t', 'raw'] + decoders + ['-f', 'alpha', '-']
bias_t = data.get('bias_t', False)
gain_val = float(gain) if gain and gain != '0' else None
ppm_val = int(ppm) if ppm and ppm != '0' else None
full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
logger.info(f"Running: {full_cmd}")
# Determine if we can use IQ pipeline for live waterfall
use_iq_pipeline = (
sdr_type == SDRType.RTL_SDR
and not rtl_tcp_host
and get_tool_path('rtl_sdr') is not None
)
if use_iq_pipeline:
# IQ pipeline: rtl_sdr -> Python IQ processor -> multimon-ng
iq_sample_rate = 220500 # 22050 * 10 for exact decimation
rtl_cmd = builder.build_raw_capture_command(
device=sdr_device,
frequency_mhz=freq,
sample_rate=iq_sample_rate,
gain=gain_val,
ppm=ppm_val,
bias_t=bias_t,
try:
# Create pipe: rtl_fm | multimon-ng
rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(rtl_process)
full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(multimon_cmd)
logger.info(f"Running (IQ pipeline): {full_cmd}")
# Start a thread to monitor rtl_fm stderr for errors
def monitor_rtl_stderr():
for line in rtl_process.stderr:
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
logger.debug(f"[RTL_FM] {err_text}")
app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'})
try:
rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(rtl_process)
rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr)
rtl_stderr_thread.daemon = True
rtl_stderr_thread.start()
# Monitor rtl_sdr stderr
def monitor_rtl_stderr():
for line in rtl_process.stderr:
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
logger.debug(f"[rtl_sdr] {err_text}")
app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_sdr] {err_text}'})
# Create a pseudo-terminal for multimon-ng output
master_fd, slave_fd = pty.openpty()
rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True)
rtl_stderr_thread.start()
# Create PTY for multimon-ng output
master_fd, slave_fd = pty.openpty()
multimon_process = subprocess.Popen(
multimon_cmd,
stdin=subprocess.PIPE,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True,
)
register_process(multimon_process)
os.close(slave_fd)
# Start IQ processor thread
from routes.listening_post import waterfall_queue
from utils.iq_processor import run_fm_iq_pipeline
stop_event = threading.Event()
pager_iq_stop_event = stop_event
app_module.waterfall_source = 'pager'
iq_thread = threading.Thread(
target=run_fm_iq_pipeline,
args=(
rtl_process.stdout,
multimon_process.stdin,
waterfall_queue,
freq,
iq_sample_rate,
stop_event,
),
daemon=True,
)
iq_thread.start()
app_module.current_process = multimon_process
app_module.current_process._rtl_process = rtl_process
app_module.current_process._master_fd = master_fd
# Start decoder output thread
thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True)
thread.start()
app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'pager'})
except FileNotFoundError as e:
_cleanup_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
except Exception as e:
_cleanup_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': str(e)})
else:
# Legacy pipeline: rtl_fm | multimon-ng
rtl_cmd = builder.build_fm_demod_command(
device=sdr_device,
frequency_mhz=freq,
sample_rate=22050,
gain=gain_val,
ppm=ppm_val,
modulation='fm',
squelch=squelch if squelch and squelch != 0 else None,
bias_t=bias_t,
multimon_process = subprocess.Popen(
multimon_cmd,
stdin=rtl_process.stdout,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True
)
register_process(multimon_process)
full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd)
logger.info(f"Running: {full_cmd}")
os.close(slave_fd)
rtl_process.stdout.close()
app_module.current_process = multimon_process
app_module.current_process._rtl_process = rtl_process
app_module.current_process._master_fd = master_fd
# Start output thread with PTY master fd
thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process))
thread.daemon = True
thread.start()
app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError as e:
# Kill orphaned rtl_fm process
try:
rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(rtl_process)
# Start a thread to monitor rtl_fm stderr for errors
def monitor_rtl_stderr():
for line in rtl_process.stderr:
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
logger.debug(f"[RTL_FM] {err_text}")
app_module.output_queue.put({'type': 'raw', 'text': f'[rtl_fm] {err_text}'})
rtl_stderr_thread = threading.Thread(target=monitor_rtl_stderr, daemon=True)
rtl_stderr_thread.start()
# Create a pseudo-terminal for multimon-ng output
master_fd, slave_fd = pty.openpty()
multimon_process = subprocess.Popen(
multimon_cmd,
stdin=rtl_process.stdout,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True,
)
register_process(multimon_process)
os.close(slave_fd)
rtl_process.stdout.close()
app_module.current_process = multimon_process
app_module.current_process._rtl_process = rtl_process
app_module.current_process._master_fd = master_fd
# Start output thread with PTY master fd
thread = threading.Thread(target=stream_decoder, args=(master_fd, multimon_process), daemon=True)
thread.start()
app_module.output_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError as e:
_cleanup_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
except Exception as e:
_cleanup_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': str(e)})
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
except Exception as e:
# Kill orphaned rtl_fm process if it was started
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
return jsonify({'status': 'error', 'message': str(e)})
@pager_bp.route('/stop', methods=['POST'])
def stop_decoding() -> Response:
global pager_active_device, pager_iq_stop_event
global pager_active_device
with app_module.process_lock:
if app_module.current_process:
# Stop IQ pipeline if running
if pager_iq_stop_event is not None:
pager_iq_stop_event.set()
pager_iq_stop_event = None
if app_module.waterfall_source == 'pager':
app_module.waterfall_source = None
# Kill rtl_sdr/rtl_fm process first
# Kill rtl_fm process first
if hasattr(app_module.current_process, '_rtl_process'):
try:
app_module.current_process._rtl_process.terminate()
@@ -580,14 +469,14 @@ def stream() -> Response:
keepalive_interval = 30.0 # Send keepalive every 30 seconds instead of 1 second
while True:
try:
msg = app_module.output_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('pager', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
try:
msg = app_module.output_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('pager', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:

View File

@@ -18,20 +18,15 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm,
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.sse import format_sse
from utils.event_pipeline import process_event
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType
from utils.dependencies import get_tool_path
sensor_bp = Blueprint('sensor', __name__)
# Track which device is being used
sensor_active_device: int | None = None
# IQ pipeline stop event
sensor_iq_stop_event: threading.Event | None = None
# Companion rtl_sdr process when using IQ pipeline
sensor_rtl_process: subprocess.Popen | None = None
def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
@@ -65,26 +60,8 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.sensor_queue.put({'type': 'error', 'text': str(e)})
finally:
global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process
# Stop IQ pipeline if running
if sensor_iq_stop_event is not None:
sensor_iq_stop_event.set()
sensor_iq_stop_event = None
if app_module.waterfall_source == 'sensor':
app_module.waterfall_source = None
# Terminate companion rtl_sdr process
if sensor_rtl_process is not None:
try:
sensor_rtl_process.terminate()
sensor_rtl_process.wait(timeout=2)
except Exception:
try:
sensor_rtl_process.kill()
except Exception:
pass
unregister_process(sensor_rtl_process)
sensor_rtl_process = None
# Ensure decoder process is terminated
global sensor_active_device
# Ensure process is terminated
try:
process.terminate()
process.wait(timeout=2)
@@ -103,32 +80,9 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
sensor_active_device = None
def _cleanup_sensor_failed_start(rtl_process: subprocess.Popen | None) -> None:
"""Clean up after a failed sensor start attempt."""
global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process
if rtl_process:
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
if sensor_iq_stop_event is not None:
sensor_iq_stop_event.set()
sensor_iq_stop_event = None
if app_module.waterfall_source == 'sensor':
app_module.waterfall_source = None
sensor_rtl_process = None
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
@sensor_bp.route('/start_sensor', methods=['POST'])
def start_sensor() -> Response:
global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process
global sensor_active_device
with app_module.sensor_lock:
if app_module.sensor_process:
@@ -190,187 +144,69 @@ def start_sensor() -> Response:
sdr_device = SDRFactory.create_default_device(sdr_type, index=device)
builder = SDRFactory.get_builder(sdr_device.sdr_type)
bias_t = data.get('bias_t', False)
gain_val = float(gain) if gain and gain != 0 else None
ppm_val = int(ppm) if ppm and ppm != 0 else None
# Determine if we can use IQ pipeline for live waterfall
use_iq_pipeline = (
sdr_type == SDRType.RTL_SDR
and not rtl_tcp_host
and get_tool_path('rtl_sdr') is not None
# Build ISM band decoder command
bias_t = data.get('bias_t', False)
cmd = builder.build_ism_command(
device=sdr_device,
frequency_mhz=freq,
gain=float(gain) if gain and gain != 0 else None,
ppm=int(ppm) if ppm and ppm != 0 else None,
bias_t=bias_t
)
if use_iq_pipeline:
# IQ pipeline: rtl_sdr -> Python IQ tee -> rtl_433 -r -
iq_sample_rate = 250000 # rtl_433 default
full_cmd = ' '.join(cmd)
logger.info(f"Running: {full_cmd}")
rtl_cmd = builder.build_raw_capture_command(
device=sdr_device,
frequency_mhz=freq,
sample_rate=iq_sample_rate,
gain=gain_val,
ppm=ppm_val,
bias_t=bias_t,
try:
app_module.sensor_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(app_module.sensor_process)
rtl_433_path = get_tool_path('rtl_433') or 'rtl_433'
decoder_cmd = [rtl_433_path, '-r', '-', '-s', str(iq_sample_rate), '-F', 'json']
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,))
thread.daemon = True
thread.start()
full_cmd = ' '.join(rtl_cmd) + ' | [iq_processor] | ' + ' '.join(decoder_cmd)
logger.info(f"Running (IQ pipeline): {full_cmd}")
# Monitor stderr
def monitor_stderr():
for line in app_module.sensor_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err:
logger.debug(f"[rtl_433] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})
try:
rtl_process = subprocess.Popen(
rtl_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(rtl_process)
sensor_rtl_process = rtl_process
stderr_thread = threading.Thread(target=monitor_stderr)
stderr_thread.daemon = True
stderr_thread.start()
# Monitor rtl_sdr stderr
def monitor_rtl_stderr():
for line in rtl_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err:
logger.debug(f"[rtl_sdr] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_sdr] {err}'})
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
threading.Thread(target=monitor_rtl_stderr, daemon=True).start()
return jsonify({'status': 'started', 'command': full_cmd})
# Start rtl_433 reading from stdin
decoder_process = subprocess.Popen(
decoder_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(decoder_process)
# Start IQ processor thread
from routes.listening_post import waterfall_queue
from utils.iq_processor import run_passthrough_iq_pipeline
stop_event = threading.Event()
sensor_iq_stop_event = stop_event
app_module.waterfall_source = 'sensor'
iq_thread = threading.Thread(
target=run_passthrough_iq_pipeline,
args=(
rtl_process.stdout,
decoder_process.stdin,
waterfall_queue,
freq,
iq_sample_rate,
stop_event,
),
daemon=True,
)
iq_thread.start()
app_module.sensor_process = decoder_process
# Monitor rtl_433 stderr
def monitor_decoder_stderr():
for line in decoder_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err:
logger.debug(f"[rtl_433] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})
threading.Thread(target=monitor_decoder_stderr, daemon=True).start()
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(decoder_process,), daemon=True)
thread.start()
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd, 'waterfall_source': 'sensor'})
except FileNotFoundError:
_cleanup_sensor_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': 'rtl_sdr or rtl_433 not found'})
except Exception as e:
_cleanup_sensor_failed_start(rtl_process)
return jsonify({'status': 'error', 'message': str(e)})
else:
# Legacy pipeline: rtl_433 directly
cmd = builder.build_ism_command(
device=sdr_device,
frequency_mhz=freq,
gain=gain_val,
ppm=ppm_val,
bias_t=bias_t,
)
full_cmd = ' '.join(cmd)
logger.info(f"Running: {full_cmd}")
try:
app_module.sensor_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(app_module.sensor_process)
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,), daemon=True)
thread.start()
# Monitor stderr
def monitor_stderr():
for line in app_module.sensor_process.stderr:
err = line.decode('utf-8', errors='replace').strip()
if err:
logger.debug(f"[rtl_433] {err}")
app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'})
threading.Thread(target=monitor_stderr, daemon=True).start()
app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'})
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError:
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
except Exception as e:
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': str(e)})
except FileNotFoundError:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'})
except Exception as e:
# Release device on failure
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
return jsonify({'status': 'error', 'message': str(e)})
@sensor_bp.route('/stop_sensor', methods=['POST'])
def stop_sensor() -> Response:
global sensor_active_device, sensor_iq_stop_event, sensor_rtl_process
global sensor_active_device
with app_module.sensor_lock:
if app_module.sensor_process:
# Stop IQ pipeline if running
if sensor_iq_stop_event is not None:
sensor_iq_stop_event.set()
sensor_iq_stop_event = None
if app_module.waterfall_source == 'sensor':
app_module.waterfall_source = None
# Kill companion rtl_sdr process
if sensor_rtl_process is not None:
try:
sensor_rtl_process.terminate()
sensor_rtl_process.wait(timeout=2)
except (subprocess.TimeoutExpired, OSError):
try:
sensor_rtl_process.kill()
except OSError:
pass
sensor_rtl_process = None
app_module.sensor_process.terminate()
try:
app_module.sensor_process.wait(timeout=2)
@@ -396,13 +232,13 @@ def stream_sensor() -> Response:
while True:
try:
msg = app_module.sensor_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('sensor', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
msg = app_module.sensor_queue.get(timeout=1)
last_keepalive = time.time()
try:
process_event('sensor', msg, msg.get('type'))
except Exception:
pass
yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:

View File

@@ -88,9 +88,6 @@ function startDmr() {
if (typeof reserveDevice === 'function') {
reserveDevice(parseInt(device), 'dmr');
}
if (typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true });
}
if (typeof showNotification === 'function') {
showNotification('DMR', `Decoding ${frequency} MHz (${protocol.toUpperCase()})`);
}

View File

@@ -3589,8 +3589,7 @@ async function startWaterfall(options = {}) {
lastWaterfallDraw = 0;
initWaterfallCanvas();
connectWaterfallSSE();
// Only reserve device if not decoder-driven (decoder already owns the device)
if (data.source !== 'decoder' && typeof reserveDevice === 'function') {
if (typeof reserveDevice === 'function') {
reserveDevice(parseInt(device), 'waterfall');
}
if (resume || resumeRfWaterfallAfterListening) {
@@ -3619,14 +3618,11 @@ async function stopWaterfall() {
}
try {
const resp = await fetch('/listening/waterfall/stop', { method: 'POST' });
let stopData = {};
try { stopData = await resp.json(); } catch (e) {}
await fetch('/listening/waterfall/stop', { method: 'POST' });
isWaterfallRunning = false;
if (waterfallEventSource) { waterfallEventSource.close(); waterfallEventSource = null; }
setWaterfallControlButtons(false);
// Only release device if it was a standalone waterfall (not decoder-driven)
if (stopData.source !== 'decoder' && typeof releaseDevice === 'function') {
if (typeof releaseDevice === 'function') {
releaseDevice('waterfall');
}
} catch (err) {

View File

@@ -98,9 +98,6 @@ const SSTVGeneral = (function() {
updateStatusUI('listening', `${frequency} MHz ${modulation.toUpperCase()}`);
startStream();
showNotification('SSTV', `Listening on ${frequency} MHz ${modulation.toUpperCase()}`);
if (typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true });
}
// Update strip
const stripFreq = document.getElementById('sstvGeneralStripFreq');

View File

@@ -537,18 +537,15 @@ const SSTV = (function() {
const data = await response.json();
if (data.status === 'started' || data.status === 'already_running') {
isRunning = true;
if (typeof reserveDevice === 'function') {
reserveDevice(device, 'sstv');
}
updateStatusUI('listening', `${frequency} MHz`);
startStream();
if (typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(frequency, { autoStart: true, restartIfRunning: true, silent: true });
}
showNotification('SSTV', `Listening on ${frequency} MHz`);
} else {
if (data.status === 'started' || data.status === 'already_running') {
isRunning = true;
if (typeof reserveDevice === 'function') {
reserveDevice(device, 'sstv');
}
updateStatusUI('listening', `${frequency} MHz`);
startStream();
showNotification('SSTV', `Listening on ${frequency} MHz`);
} else {
updateStatusUI('idle', 'Start failed');
showStatusMessage(data.message || 'Failed to start decoder', 'error');
}

View File

@@ -2859,23 +2859,6 @@
}
}
function getModeWaterfallFrequency(mode) {
const lookup = {
pager: 'frequency',
sensor: 'sensorFrequency',
rtlamr: 'rtlamrFrequency',
dmr: 'dmrFrequency',
sstv: 'sstvFrequency',
sstv_general: 'sstvGeneralFrequency',
listening: 'radioScanStart'
};
const id = lookup[mode];
if (!id) return NaN;
const el = document.getElementById(id);
const value = parseFloat(el?.value);
return Number.isFinite(value) ? value : NaN;
}
// Mode switching
function switchMode(mode, options = {}) {
const { updateUrl = true } = options;
@@ -3079,19 +3062,13 @@
// Show shared waterfall controls for supported modes
const waterfallControlsSection = document.getElementById('waterfallControlsSection');
const waterfallPanel = document.getElementById('waterfallPanel');
const waterfallModes = ['pager', 'sensor', 'rtlamr', 'dmr', 'sstv', 'sstv_general', 'listening'];
const waterfallModes = ['listening'];
const waterfallSupported = waterfallModes.includes(mode);
if (waterfallControlsSection) waterfallControlsSection.style.display = waterfallSupported ? 'block' : 'none';
if (waterfallPanel) {
const running = (typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning);
waterfallPanel.style.display = (waterfallSupported && running) ? 'block' : 'none';
}
if (waterfallSupported && typeof syncWaterfallToFrequency === 'function' && typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning) {
const modeFreq = getModeWaterfallFrequency(mode);
if (Number.isFinite(modeFreq)) {
syncWaterfallToFrequency(modeFreq, { autoStart: true, restartIfRunning: true, silent: true });
}
}
// Toggle mode-specific tool status displays
const toolStatusPager = document.getElementById('toolStatusPager');
@@ -3187,9 +3164,6 @@
// Sensor frequency
function setSensorFreq(freq) {
document.getElementById('sensorFrequency').value = freq;
if (typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning });
}
if (isSensorRunning) {
fetch('/stop_sensor', { method: 'POST' })
.then(() => setTimeout(() => startSensorDecoding(), 500));
@@ -3274,9 +3248,6 @@
reserveDevice(parseInt(device), 'sensor');
setSensorRunning(true);
startSensorStream();
if (!remoteConfig && typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true });
}
// Initialize sensor filter bar
const filterContainer = document.getElementById('filterBarContainer');
@@ -3530,9 +3501,6 @@
function setRtlamrFreq(freq) {
document.getElementById('rtlamrFrequency').value = freq;
if (typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(freq, { autoStart: typeof isWaterfallRunning !== 'undefined' && isWaterfallRunning });
}
}
// RTLAMR mode polling timer for agent mode
@@ -3588,9 +3556,6 @@
}
setRtlamrRunning(true);
startRtlamrStream(isAgentMode);
if (!isAgentMode && typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true });
}
// Initialize meter filter bar (reuse sensor filter bar since same structure)
const filterContainer = document.getElementById('filterBarContainer');
@@ -4362,9 +4327,6 @@
}
setRunning(true);
startStream(isAgentMode);
if (!isAgentMode && !remoteConfig && typeof syncWaterfallToFrequency === 'function') {
syncWaterfallToFrequency(freq, { autoStart: true, restartIfRunning: true, silent: true });
}
// Initialize filter bar
const filterContainer = document.getElementById('filterBarContainer');

View File

@@ -121,15 +121,6 @@ TOOL_DEPENDENCIES = {
'manual': 'https://github.com/EliasOenal/multimon-ng'
}
},
'rtl_sdr': {
'required': False,
'description': 'Raw IQ capture for live waterfall during decoding',
'install': {
'apt': 'sudo apt install rtl-sdr',
'brew': 'brew install librtlsdr',
'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
}
},
'rtl_test': {
'required': False,
'description': 'RTL-SDR device detection',
@@ -152,15 +143,6 @@ TOOL_DEPENDENCIES = {
'brew': 'brew install rtl_433',
'manual': 'https://github.com/merbanan/rtl_433'
}
},
'rtl_sdr': {
'required': False,
'description': 'Raw IQ capture for live waterfall during decoding',
'install': {
'apt': 'sudo apt install rtl-sdr',
'brew': 'brew install librtlsdr',
'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
}
}
}
},

View File

@@ -1,230 +0,0 @@
"""IQ processing pipelines for live waterfall during SDR decoding.
Provides two pipeline functions:
- run_fm_iq_pipeline: FM demodulates IQ for pager decoding + FFT for waterfall
- run_passthrough_iq_pipeline: Passes raw IQ to rtl_433 + FFT for waterfall
"""
from __future__ import annotations
import logging
import struct
import threading
import queue
from datetime import datetime
from typing import IO, Optional
import numpy as np
logger = logging.getLogger('intercept.iq_processor')
# FFT parameters
FFT_SIZE = 2048
FFT_INTERVAL_SECONDS = 0.1 # ~10 updates/sec
def iq_to_complex(buf: bytes) -> np.ndarray:
"""Convert raw uint8 IQ bytes to complex float samples.
RTL-SDR outputs interleaved uint8 I/Q pairs centered at 127.5.
"""
raw = np.frombuffer(buf, dtype=np.uint8).astype(np.float32)
raw = (raw - 127.5) / 127.5
return raw[0::2] + 1j * raw[1::2]
def compute_fft_bins(samples: np.ndarray, fft_size: int = FFT_SIZE) -> list[float]:
"""Compute power spectral density in dB from complex IQ samples.
Returns a list of power values (dB) for each frequency bin.
"""
if len(samples) < fft_size:
# Pad with zeros if not enough samples
padded = np.zeros(fft_size, dtype=np.complex64)
padded[:len(samples)] = samples[:fft_size]
samples = padded
else:
samples = samples[:fft_size]
# Apply Hanning window to reduce spectral leakage
window = np.hanning(fft_size).astype(np.float32)
windowed = samples * window
# FFT and shift DC to center
spectrum = np.fft.fftshift(np.fft.fft(windowed))
# Power in dB (avoid log of zero)
power = np.abs(spectrum) ** 2
power = np.maximum(power, 1e-20)
power_db = 10.0 * np.log10(power)
return power_db.tolist()
def _push_waterfall(waterfall_queue: queue.Queue, bins: list[float],
center_freq_mhz: float, sample_rate: int) -> None:
"""Push a waterfall sweep message to the queue."""
half_span = (sample_rate / 1e6) / 2.0
msg = {
'type': 'waterfall_sweep',
'start_freq': center_freq_mhz - half_span,
'end_freq': center_freq_mhz + half_span,
'bins': bins,
'timestamp': datetime.now().isoformat(),
}
try:
waterfall_queue.put_nowait(msg)
except queue.Full:
# Drop oldest and retry
try:
waterfall_queue.get_nowait()
except queue.Empty:
pass
try:
waterfall_queue.put_nowait(msg)
except queue.Full:
pass
def run_fm_iq_pipeline(
iq_stdout: IO[bytes],
audio_stdin: IO[bytes],
waterfall_queue: queue.Queue,
center_freq_mhz: float,
sample_rate: int,
stop_event: threading.Event,
) -> None:
"""FM demodulation pipeline: IQ -> FFT + FM demod -> 22050 Hz PCM.
Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall,
FM demodulates, decimates to 22050 Hz, and writes 16-bit PCM to
multimon-ng stdin.
Args:
iq_stdout: rtl_sdr stdout (raw uint8 IQ)
audio_stdin: multimon-ng stdin (16-bit PCM)
waterfall_queue: Queue for waterfall sweep messages
center_freq_mhz: Center frequency in MHz
sample_rate: IQ sample rate (should be 220500 for 10x decimation to 22050)
stop_event: Threading event to signal shutdown
"""
from scipy.signal import decimate as scipy_decimate
# Decimation factor: sample_rate / 22050
decim_factor = sample_rate // 22050
if decim_factor < 1:
decim_factor = 1
# Read in chunks: ~100ms worth of IQ data (2 bytes per sample: I + Q)
chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2
# Align to even number of bytes (I/Q pairs)
chunk_bytes = (chunk_bytes // 2) * 2
# Previous sample for FM demod continuity
prev_sample = np.complex64(0)
logger.info(f"FM IQ pipeline started: {center_freq_mhz} MHz, "
f"sr={sample_rate}, decim={decim_factor}")
try:
while not stop_event.is_set():
raw = iq_stdout.read(chunk_bytes)
if not raw:
break
# Convert to complex IQ
iq = iq_to_complex(raw)
if len(iq) == 0:
continue
# Compute FFT for waterfall
bins = compute_fft_bins(iq, FFT_SIZE)
_push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate)
# FM demodulation via instantaneous phase difference
# Prepend previous sample for continuity
iq_with_prev = np.concatenate(([prev_sample], iq))
prev_sample = iq[-1]
phase_diff = np.angle(iq_with_prev[1:] * np.conj(iq_with_prev[:-1]))
# Decimate to 22050 Hz
if decim_factor > 1:
audio = scipy_decimate(phase_diff, decim_factor, ftype='fir')
else:
audio = phase_diff
# Scale to 16-bit PCM range
audio = np.clip(audio * 10000, -32767, 32767).astype(np.int16)
# Write to multimon-ng
try:
audio_stdin.write(audio.tobytes())
audio_stdin.flush()
except (BrokenPipeError, OSError):
break
except Exception as e:
logger.error(f"FM IQ pipeline error: {e}")
finally:
logger.info("FM IQ pipeline stopped")
try:
audio_stdin.close()
except Exception:
pass
def run_passthrough_iq_pipeline(
iq_stdout: IO[bytes],
decoder_stdin: IO[bytes],
waterfall_queue: queue.Queue,
center_freq_mhz: float,
sample_rate: int,
stop_event: threading.Event,
) -> None:
"""Passthrough pipeline: IQ -> FFT + raw bytes to decoder.
Reads raw uint8 IQ from rtl_sdr stdout, computes FFT for waterfall,
and writes raw IQ bytes unchanged to rtl_433 stdin.
Args:
iq_stdout: rtl_sdr stdout (raw uint8 IQ)
decoder_stdin: rtl_433 stdin (raw cu8 IQ)
waterfall_queue: Queue for waterfall sweep messages
center_freq_mhz: Center frequency in MHz
sample_rate: IQ sample rate (should be 250000 for rtl_433)
stop_event: Threading event to signal shutdown
"""
# Read in chunks: ~100ms worth of IQ data
chunk_bytes = int(sample_rate * FFT_INTERVAL_SECONDS) * 2
chunk_bytes = (chunk_bytes // 2) * 2
logger.info(f"Passthrough IQ pipeline started: {center_freq_mhz} MHz, sr={sample_rate}")
try:
while not stop_event.is_set():
raw = iq_stdout.read(chunk_bytes)
if not raw:
break
# Compute FFT for waterfall
iq = iq_to_complex(raw)
if len(iq) > 0:
bins = compute_fft_bins(iq, FFT_SIZE)
_push_waterfall(waterfall_queue, bins, center_freq_mhz, sample_rate)
# Pass raw bytes unchanged to decoder
try:
decoder_stdin.write(raw)
decoder_stdin.flush()
except (BrokenPipeError, OSError):
break
except Exception as e:
logger.error(f"Passthrough IQ pipeline error: {e}")
finally:
logger.info("Passthrough IQ pipeline stopped")
try:
decoder_stdin.close()
except Exception:
pass

View File

@@ -186,36 +186,6 @@ class CommandBuilder(ABC):
"""Return hardware capabilities for this SDR type."""
pass
def build_raw_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False
) -> list[str]:
"""
Build raw IQ capture command (for IQ-based waterfall during decoding).
Args:
device: The SDR device to use
frequency_mhz: Center frequency in MHz
sample_rate: Sample rate in Hz
gain: Gain in dB (None for auto)
ppm: PPM frequency correction
bias_t: Enable bias-T power (for active antennas)
Returns:
Command as list of strings for subprocess
Raises:
NotImplementedError: If the SDR type does not support raw capture
"""
raise NotImplementedError(
f"Raw IQ capture not supported for {self.get_sdr_type().value}"
)
@classmethod
@abstractmethod
def get_sdr_type(cls) -> SDRType:

View File

@@ -197,43 +197,6 @@ class RTLSDRCommandBuilder(CommandBuilder):
return cmd
def build_raw_capture_command(
self,
device: SDRDevice,
frequency_mhz: float,
sample_rate: int,
gain: Optional[float] = None,
ppm: Optional[int] = None,
bias_t: bool = False
) -> list[str]:
"""
Build rtl_sdr command for raw IQ capture.
Outputs raw uint8 IQ to stdout for processing by IQ pipelines.
"""
rtl_sdr_path = get_tool_path('rtl_sdr') or 'rtl_sdr'
freq_hz = int(frequency_mhz * 1e6)
cmd = [
rtl_sdr_path,
'-d', self._get_device_arg(device),
'-f', str(freq_hz),
'-s', str(sample_rate),
]
if gain is not None and gain > 0:
cmd.extend(['-g', str(gain)])
if ppm is not None and ppm != 0:
cmd.extend(['-p', str(ppm)])
if bias_t:
cmd.extend(['-T'])
# Output to stdout
cmd.append('-')
return cmd
def build_ais_command(
self,
device: SDRDevice,