mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-04-27 16:09:58 -07:00
Add disk space monitoring to recording lifecycle
This commit is contained in:
@@ -27,6 +27,9 @@ use crate::display;
|
||||
use crate::notifications::{Notification, NotificationType};
|
||||
use crate::qmdl_store::{RecordingStore, RecordingStoreError};
|
||||
use crate::server::ServerState;
|
||||
use crate::stats::DiskStats;
|
||||
|
||||
const SPACE_CHECK_INTERVAL_CONTAINERS: usize = 100;
|
||||
|
||||
pub enum DiagDeviceCtrlMessage {
|
||||
StopRecording,
|
||||
@@ -46,8 +49,11 @@ pub struct DiagTask {
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
notification_channel: tokio::sync::mpsc::Sender<Notification>,
|
||||
min_space_to_start_mb: u64,
|
||||
min_space_to_continue_mb: u64,
|
||||
state: DiagState,
|
||||
max_type_seen: EventType,
|
||||
container_count: usize,
|
||||
}
|
||||
|
||||
enum DiagState {
|
||||
@@ -64,30 +70,90 @@ impl DiagTask {
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
notification_channel: tokio::sync::mpsc::Sender<Notification>,
|
||||
min_space_to_start_mb: u64,
|
||||
min_space_to_continue_mb: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
ui_update_sender,
|
||||
analysis_sender,
|
||||
analyzer_config,
|
||||
notification_channel,
|
||||
min_space_to_start_mb,
|
||||
min_space_to_continue_mb,
|
||||
state: DiagState::Stopped,
|
||||
max_type_seen: EventType::Informational,
|
||||
container_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Start recording
|
||||
async fn start(&mut self, qmdl_store: &mut RecordingStore) {
|
||||
self.max_type_seen = EventType::Informational;
|
||||
let (qmdl_file, analysis_file) = qmdl_store
|
||||
.new_entry()
|
||||
.await
|
||||
.expect("failed creating QMDL file entry");
|
||||
self.container_count = 0;
|
||||
|
||||
let min_space_bytes = self.min_space_to_start_mb * 1024 * 1024;
|
||||
match DiskStats::new(qmdl_store.path.to_str().unwrap()) {
|
||||
Ok(disk_stats) if disk_stats.available_bytes.unwrap_or(0) < min_space_bytes => {
|
||||
let available_mb = disk_stats.available_bytes.unwrap_or(0) / 1024 / 1024;
|
||||
error!(
|
||||
"Insufficient disk space to start recording: {}MB available, {}MB required",
|
||||
available_mb, self.min_space_to_start_mb
|
||||
);
|
||||
|
||||
if let Err(e) = self
|
||||
.notification_channel
|
||||
.send(Notification::new(
|
||||
NotificationType::Warning,
|
||||
format!(
|
||||
"Cannot start recording: only {}MB free (need {}MB minimum)",
|
||||
available_mb, self.min_space_to_start_mb
|
||||
),
|
||||
None,
|
||||
))
|
||||
.await
|
||||
{
|
||||
warn!("Failed to send notification: {e}");
|
||||
}
|
||||
|
||||
if let Err(e) = self
|
||||
.ui_update_sender
|
||||
.send(display::DisplayState::Paused)
|
||||
.await
|
||||
{
|
||||
warn!("couldn't send ui update message: {e}");
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
Ok(disk_stats) => {
|
||||
let available_mb = disk_stats.available_bytes.unwrap_or(0) / 1024 / 1024;
|
||||
info!(
|
||||
"Starting recording with {}MB disk space available",
|
||||
available_mb
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to check disk space: {e}, starting recording anyway");
|
||||
}
|
||||
}
|
||||
|
||||
let (qmdl_file, analysis_file) = match qmdl_store.new_entry().await {
|
||||
Ok(files) => files,
|
||||
Err(e) => {
|
||||
error!("failed creating QMDL file entry: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.stop_current_recording().await;
|
||||
let qmdl_writer = QmdlWriter::new(qmdl_file);
|
||||
let analysis_writer = AnalysisWriter::new(analysis_file, &self.analyzer_config)
|
||||
.await
|
||||
.map(Box::new)
|
||||
.expect("failed to write to analysis file");
|
||||
let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await
|
||||
{
|
||||
Ok(writer) => Box::new(writer),
|
||||
Err(e) => {
|
||||
error!("failed to create analysis writer: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.state = DiagState::Recording {
|
||||
qmdl_writer,
|
||||
analysis_writer,
|
||||
@@ -183,10 +249,60 @@ impl DiagTask {
|
||||
analysis_writer,
|
||||
} = &mut self.state
|
||||
{
|
||||
qmdl_writer
|
||||
.write_container(&container)
|
||||
.await
|
||||
.expect("failed to write to QMDL writer");
|
||||
self.container_count += 1;
|
||||
|
||||
if self.container_count % SPACE_CHECK_INTERVAL_CONTAINERS == 0 {
|
||||
let min_continue_bytes = self.min_space_to_continue_mb * 1024 * 1024;
|
||||
let min_start_bytes = self.min_space_to_start_mb * 1024 * 1024;
|
||||
match DiskStats::new(qmdl_store.path.to_str().unwrap()) {
|
||||
Ok(disk_stats)
|
||||
if disk_stats.available_bytes.unwrap_or(0) < min_continue_bytes =>
|
||||
{
|
||||
let available_mb = disk_stats.available_bytes.unwrap_or(0) / 1024 / 1024;
|
||||
error!(
|
||||
"Disk space critically low ({}MB), stopping recording",
|
||||
available_mb
|
||||
);
|
||||
|
||||
self.notification_channel.send(Notification::new(
|
||||
NotificationType::Warning,
|
||||
format!(
|
||||
"Disk space critically low ({}MB), recording stopped automatically",
|
||||
available_mb
|
||||
),
|
||||
None,
|
||||
)).await.ok();
|
||||
|
||||
self.stop(qmdl_store).await;
|
||||
return;
|
||||
}
|
||||
Ok(disk_stats) if disk_stats.available_bytes.unwrap_or(0) < min_start_bytes => {
|
||||
if self.container_count % (SPACE_CHECK_INTERVAL_CONTAINERS * 10) == 0 {
|
||||
let available_mb =
|
||||
disk_stats.available_bytes.unwrap_or(0) / 1024 / 1024;
|
||||
warn!("Disk space low: {}MB remaining", available_mb);
|
||||
self.notification_channel
|
||||
.send(Notification::new(
|
||||
NotificationType::Warning,
|
||||
format!("Disk space low: {}MB free", available_mb),
|
||||
Some(Duration::from_secs(30)),
|
||||
))
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to check disk space: {e}");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = qmdl_writer.write_container(&container).await {
|
||||
error!("failed to write to QMDL (disk full?): {e}");
|
||||
self.stop(qmdl_store).await;
|
||||
return;
|
||||
}
|
||||
debug!(
|
||||
"total QMDL bytes written: {}, updating manifest...",
|
||||
qmdl_writer.total_written
|
||||
@@ -194,15 +310,22 @@ impl DiagTask {
|
||||
let index = qmdl_store
|
||||
.current_entry
|
||||
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||
qmdl_store
|
||||
if let Err(e) = qmdl_store
|
||||
.update_entry_qmdl_size(index, qmdl_writer.total_written)
|
||||
.await
|
||||
.expect("failed to update qmdl file size");
|
||||
{
|
||||
error!("failed to update manifest (disk full?): {e}");
|
||||
self.stop(qmdl_store).await;
|
||||
return;
|
||||
}
|
||||
debug!("done!");
|
||||
let max_type = analysis_writer
|
||||
.analyze(container)
|
||||
.await
|
||||
.expect("failed to analyze container");
|
||||
let max_type = match analysis_writer.analyze(container).await {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
warn!("failed to analyze container: {e}");
|
||||
EventType::Informational
|
||||
}
|
||||
};
|
||||
|
||||
if max_type > EventType::Informational {
|
||||
info!("a heuristic triggered on this run!");
|
||||
@@ -244,10 +367,12 @@ pub fn run_diag_read_thread(
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
notification_channel: tokio::sync::mpsc::Sender<Notification>,
|
||||
min_space_to_start_mb: u64,
|
||||
min_space_to_continue_mb: u64,
|
||||
) {
|
||||
task_tracker.spawn(async move {
|
||||
let mut diag_stream = pin!(dev.as_stream().into_stream());
|
||||
let mut diag_task = DiagTask::new(ui_update_sender, analysis_sender, analyzer_config, notification_channel);
|
||||
let mut diag_task = DiagTask::new(ui_update_sender, analysis_sender, analyzer_config, notification_channel, min_space_to_start_mb, min_space_to_continue_mb);
|
||||
qmdl_file_tx
|
||||
.send(DiagDeviceCtrlMessage::StartRecording)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user