diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index bf197ec..0bb2ac4 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::io::{self, ErrorKind}; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; @@ -43,6 +44,40 @@ pub enum RecordingStoreError { SerializationError(#[from] serde_json::Error), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FileKind { + Qmdl, + Analysis, + Gps, +} + +impl FileKind { + // List of all possible physical files on disk. + pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps]; + + pub fn get_filename(&self, entry_name: &str) -> String { + match self { + FileKind::Qmdl => format!("{}.qmdl", entry_name), + FileKind::Analysis => format!("{}.ndjson", entry_name), + FileKind::Gps => format!("{}-gps.ndjson", entry_name), + } + } + + pub fn get_filepath>(&self, entry_name: &str, base_path: P) -> PathBuf { + base_path.as_ref().join(self.get_filename(entry_name)) + } +} + +impl Display for FileKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FileKind::Qmdl => write!(f, "QMDL"), + FileKind::Analysis => write!(f, "analysis"), + FileKind::Gps => write!(f, "GPS"), + } + } +} + pub struct RecordingStore { pub path: PathBuf, pub manifest: Manifest, @@ -102,19 +137,15 @@ impl ManifestEntry { } pub fn get_qmdl_filepath>(&self, path: P) -> PathBuf { - let mut filepath = path.as_ref().join(&self.name); - filepath.set_extension("qmdl"); - filepath + FileKind::Qmdl.get_filepath(&self.name, path) } pub fn get_analysis_filepath>(&self, path: P) -> PathBuf { - let mut filepath = path.as_ref().join(&self.name); - filepath.set_extension("ndjson"); - filepath + FileKind::Analysis.get_filepath(&self.name, path) } pub fn get_gps_filepath>(&self, path: P) -> PathBuf { - path.as_ref().join(format!("{}-gps.ndjson", self.name)) + FileKind::Gps.get_filepath(&self.name, path) } } @@ -315,9 +346,19 @@ impl RecordingStore { pub async fn open_entry_gps( &self, entry_index: usize, + ) -> Result, RecordingStoreError> { + self.open_file(entry_index, FileKind::Gps).await + } + + pub async fn open_file( + &self, + entry_index: usize, + file_kind: FileKind, ) -> Result, RecordingStoreError> { let entry = &self.manifest.entries[entry_index]; - match File::open(entry.get_gps_filepath(&self.path)).await { + let filepath = file_kind.get_filepath(&entry.name, &self.path); + + match File::open(&filepath).await { Ok(file) => Ok(Some(file)), Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), Err(e) => Err(RecordingStoreError::ReadFileError(e)), diff --git a/daemon/src/webdav.rs b/daemon/src/webdav.rs index a23a443..6369f9f 100644 --- a/daemon/src/webdav.rs +++ b/daemon/src/webdav.rs @@ -1,18 +1,17 @@ -use std::fmt::Display; use std::{sync::Arc, time::Duration}; use chrono::TimeDelta; +use futures::future::join_all; use log::{info, warn}; use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE}; use reqwest::{Body, Client, Response}; use tokio::fs::File; -use tokio::join; use tokio::{select, sync::RwLock, time}; use tokio_util::io::ReaderStream; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::config::WebdavConfig; -use crate::qmdl_store::RecordingStore; +use crate::qmdl_store::{FileKind, RecordingStore}; pub struct WebdavUploadWorkerConfig { poll_interval: Duration, @@ -38,29 +37,6 @@ impl From for WebdavUploadWorkerConfig { } } -enum FileKind { - Analysis, - Qmdl, -} - -impl FileKind { - fn as_extension(&self) -> &'static str { - match self { - FileKind::Analysis => ".ndjson", - FileKind::Qmdl => ".qmdl", - } - } -} - -impl Display for FileKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - FileKind::Analysis => write!(f, "analysis"), - FileKind::Qmdl => write!(f, "QMDL"), - } - } -} - #[derive(Debug, Clone)] struct WebDavClient { client: Client, @@ -127,22 +103,22 @@ async fn try_upload_entry( ) -> Option<()> { let read_lock = store.read().await; let entry_idx = read_lock.entry_for_name(&entry_name)?.0; - let file = match file_kind { - FileKind::Analysis => read_lock.open_entry_analysis(entry_idx).await, - FileKind::Qmdl => read_lock.open_entry_qmdl(entry_idx).await, - }; + let file = read_lock.open_file(entry_idx, file_kind).await; drop(read_lock); - let Ok(file) = file.map_err(|err| { - warn!( - "Unable to open entry: {} {} file: {:?}", - entry_name, file_kind, err - ) - }) else { - return None; + let file = match file { + Ok(Some(f)) => f, + Ok(None) => return Some(()), // File doesn't exist (e.g., GPS for old recordings) + Err(err) => { + warn!( + "Unable to open entry: {} {} file: {:?}", + entry_name, file_kind, err + ); + return None; + } }; - let file_name = format!("{}{}", entry_name, file_kind.as_extension()); + let file_name = file_kind.get_filename(&entry_name); let res = select! { _ = shutdown_token.cancelled() => { @@ -205,24 +181,23 @@ pub fn run_webdav_upload_worker( break; }; - let (Some(()), Some(())) = join!( - try_upload_entry( - webdav_client.clone(), - qmdl_store_lock.clone(), - unuploaded_entry.clone(), - FileKind::Qmdl, - shutdown_token.clone(), - ), - try_upload_entry( - webdav_client.clone(), - qmdl_store_lock.clone(), - unuploaded_entry.clone(), - FileKind::Analysis, - shutdown_token.clone() - ), - ) else { + let upload_futures: Vec<_> = FileKind::ALL + .iter() + .map(|&file_kind| { + try_upload_entry( + webdav_client.clone(), + qmdl_store_lock.clone(), + unuploaded_entry.clone(), + file_kind, + shutdown_token.clone(), + ) + }) + .collect(); + + let results = join_all(upload_futures).await; + if !results.iter().all(|r| r.is_some()) { break; - }; + } if config.delete_on_upload { match qmdl_store_lock.write().await.delete_entry(&unuploaded_entry).await { @@ -354,12 +329,14 @@ mod tests { cleanup_worker(shutdown, tracker).await; let recorded = captured.lock().await; - assert_eq!(recorded.len(), 2); + assert_eq!(recorded.len(), 3); let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect(); let qmdl_path = format!("dav/{}.qmdl", entry_name); let ndjson_path = format!("dav/{}.ndjson", entry_name); + let gps_path = format!("dav/{}-gps.ndjson", entry_name); assert!(paths.contains(&qmdl_path.as_str())); assert!(paths.contains(&ndjson_path.as_str())); + assert!(paths.contains(&gps_path.as_str())); for put in recorded.iter() { assert_eq!(put.auth.as_deref(), Some("Basic dXNlcjpwYXNzd29yZA==")); } @@ -408,7 +385,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; cleanup_worker(shutdown, tracker).await; - assert_eq!(captured.lock().await.len(), 2); + assert_eq!(captured.lock().await.len(), 3); let store_read = store.read().await; assert!(store_read.entry_for_name(&entry_name).is_none());