mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-06-03 11:43:34 -07:00
Fix WebDAV not uploading GPS files
When merging WebDAV and GPS features, we forgot to update the WebDAV feature to also upload the GPS files. WebDAV had hardcoded knowledge of which files exist and its own FileKind enum. Move the FileKind enum into QMDL store so that webdav can be agnostic over which files belong to a recording, so this is less likely to happen again. (This refactor was AI-assisted)
This commit is contained in:
committed by
Markus Unterwaditzer
parent
517a17db14
commit
e3e84a0185
@@ -1,3 +1,4 @@
|
|||||||
|
use std::fmt::Display;
|
||||||
use std::io::{self, ErrorKind};
|
use std::io::{self, ErrorKind};
|
||||||
use std::os::unix::fs::MetadataExt;
|
use std::os::unix::fs::MetadataExt;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
@@ -43,6 +44,40 @@ pub enum RecordingStoreError {
|
|||||||
SerializationError(#[from] serde_json::Error),
|
SerializationError(#[from] serde_json::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum FileKind {
|
||||||
|
Qmdl,
|
||||||
|
Analysis,
|
||||||
|
Gps,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FileKind {
|
||||||
|
// List of all possible physical files on disk.
|
||||||
|
pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps];
|
||||||
|
|
||||||
|
pub fn get_filename(&self, entry_name: &str) -> String {
|
||||||
|
match self {
|
||||||
|
FileKind::Qmdl => format!("{}.qmdl", entry_name),
|
||||||
|
FileKind::Analysis => format!("{}.ndjson", entry_name),
|
||||||
|
FileKind::Gps => format!("{}-gps.ndjson", entry_name),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_filepath<P: AsRef<Path>>(&self, entry_name: &str, base_path: P) -> PathBuf {
|
||||||
|
base_path.as_ref().join(self.get_filename(entry_name))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for FileKind {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
FileKind::Qmdl => write!(f, "QMDL"),
|
||||||
|
FileKind::Analysis => write!(f, "analysis"),
|
||||||
|
FileKind::Gps => write!(f, "GPS"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RecordingStore {
|
pub struct RecordingStore {
|
||||||
pub path: PathBuf,
|
pub path: PathBuf,
|
||||||
pub manifest: Manifest,
|
pub manifest: Manifest,
|
||||||
@@ -102,19 +137,15 @@ impl ManifestEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||||
let mut filepath = path.as_ref().join(&self.name);
|
FileKind::Qmdl.get_filepath(&self.name, path)
|
||||||
filepath.set_extension("qmdl");
|
|
||||||
filepath
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_analysis_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
pub fn get_analysis_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||||
let mut filepath = path.as_ref().join(&self.name);
|
FileKind::Analysis.get_filepath(&self.name, path)
|
||||||
filepath.set_extension("ndjson");
|
|
||||||
filepath
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_gps_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
pub fn get_gps_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||||
path.as_ref().join(format!("{}-gps.ndjson", self.name))
|
FileKind::Gps.get_filepath(&self.name, path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,9 +346,19 @@ impl RecordingStore {
|
|||||||
pub async fn open_entry_gps(
|
pub async fn open_entry_gps(
|
||||||
&self,
|
&self,
|
||||||
entry_index: usize,
|
entry_index: usize,
|
||||||
|
) -> Result<Option<File>, RecordingStoreError> {
|
||||||
|
self.open_file(entry_index, FileKind::Gps).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn open_file(
|
||||||
|
&self,
|
||||||
|
entry_index: usize,
|
||||||
|
file_kind: FileKind,
|
||||||
) -> Result<Option<File>, RecordingStoreError> {
|
) -> Result<Option<File>, RecordingStoreError> {
|
||||||
let entry = &self.manifest.entries[entry_index];
|
let entry = &self.manifest.entries[entry_index];
|
||||||
match File::open(entry.get_gps_filepath(&self.path)).await {
|
let filepath = file_kind.get_filepath(&entry.name, &self.path);
|
||||||
|
|
||||||
|
match File::open(&filepath).await {
|
||||||
Ok(file) => Ok(Some(file)),
|
Ok(file) => Ok(Some(file)),
|
||||||
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
|
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
|
||||||
Err(e) => Err(RecordingStoreError::ReadFileError(e)),
|
Err(e) => Err(RecordingStoreError::ReadFileError(e)),
|
||||||
|
|||||||
+34
-57
@@ -1,18 +1,17 @@
|
|||||||
use std::fmt::Display;
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use chrono::TimeDelta;
|
use chrono::TimeDelta;
|
||||||
|
use futures::future::join_all;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
|
use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
|
||||||
use reqwest::{Body, Client, Response};
|
use reqwest::{Body, Client, Response};
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::join;
|
|
||||||
use tokio::{select, sync::RwLock, time};
|
use tokio::{select, sync::RwLock, time};
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
||||||
|
|
||||||
use crate::config::WebdavConfig;
|
use crate::config::WebdavConfig;
|
||||||
use crate::qmdl_store::RecordingStore;
|
use crate::qmdl_store::{FileKind, RecordingStore};
|
||||||
|
|
||||||
pub struct WebdavUploadWorkerConfig {
|
pub struct WebdavUploadWorkerConfig {
|
||||||
poll_interval: Duration,
|
poll_interval: Duration,
|
||||||
@@ -38,29 +37,6 @@ impl From<WebdavConfig> for WebdavUploadWorkerConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum FileKind {
|
|
||||||
Analysis,
|
|
||||||
Qmdl,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FileKind {
|
|
||||||
fn as_extension(&self) -> &'static str {
|
|
||||||
match self {
|
|
||||||
FileKind::Analysis => ".ndjson",
|
|
||||||
FileKind::Qmdl => ".qmdl",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for FileKind {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
FileKind::Analysis => write!(f, "analysis"),
|
|
||||||
FileKind::Qmdl => write!(f, "QMDL"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct WebDavClient {
|
struct WebDavClient {
|
||||||
client: Client,
|
client: Client,
|
||||||
@@ -127,22 +103,22 @@ async fn try_upload_entry(
|
|||||||
) -> Option<()> {
|
) -> Option<()> {
|
||||||
let read_lock = store.read().await;
|
let read_lock = store.read().await;
|
||||||
let entry_idx = read_lock.entry_for_name(&entry_name)?.0;
|
let entry_idx = read_lock.entry_for_name(&entry_name)?.0;
|
||||||
let file = match file_kind {
|
let file = read_lock.open_file(entry_idx, file_kind).await;
|
||||||
FileKind::Analysis => read_lock.open_entry_analysis(entry_idx).await,
|
|
||||||
FileKind::Qmdl => read_lock.open_entry_qmdl(entry_idx).await,
|
|
||||||
};
|
|
||||||
drop(read_lock);
|
drop(read_lock);
|
||||||
|
|
||||||
let Ok(file) = file.map_err(|err| {
|
let file = match file {
|
||||||
warn!(
|
Ok(Some(f)) => f,
|
||||||
"Unable to open entry: {} {} file: {:?}",
|
Ok(None) => return Some(()), // File doesn't exist (e.g., GPS for old recordings)
|
||||||
entry_name, file_kind, err
|
Err(err) => {
|
||||||
)
|
warn!(
|
||||||
}) else {
|
"Unable to open entry: {} {} file: {:?}",
|
||||||
return None;
|
entry_name, file_kind, err
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let file_name = format!("{}{}", entry_name, file_kind.as_extension());
|
let file_name = file_kind.get_filename(&entry_name);
|
||||||
|
|
||||||
let res = select! {
|
let res = select! {
|
||||||
_ = shutdown_token.cancelled() => {
|
_ = shutdown_token.cancelled() => {
|
||||||
@@ -205,24 +181,23 @@ pub fn run_webdav_upload_worker(
|
|||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
let (Some(()), Some(())) = join!(
|
let upload_futures: Vec<_> = FileKind::ALL
|
||||||
try_upload_entry(
|
.iter()
|
||||||
webdav_client.clone(),
|
.map(|&file_kind| {
|
||||||
qmdl_store_lock.clone(),
|
try_upload_entry(
|
||||||
unuploaded_entry.clone(),
|
webdav_client.clone(),
|
||||||
FileKind::Qmdl,
|
qmdl_store_lock.clone(),
|
||||||
shutdown_token.clone(),
|
unuploaded_entry.clone(),
|
||||||
),
|
file_kind,
|
||||||
try_upload_entry(
|
shutdown_token.clone(),
|
||||||
webdav_client.clone(),
|
)
|
||||||
qmdl_store_lock.clone(),
|
})
|
||||||
unuploaded_entry.clone(),
|
.collect();
|
||||||
FileKind::Analysis,
|
|
||||||
shutdown_token.clone()
|
let results = join_all(upload_futures).await;
|
||||||
),
|
if !results.iter().all(|r| r.is_some()) {
|
||||||
) else {
|
|
||||||
break;
|
break;
|
||||||
};
|
}
|
||||||
|
|
||||||
if config.delete_on_upload {
|
if config.delete_on_upload {
|
||||||
match qmdl_store_lock.write().await.delete_entry(&unuploaded_entry).await {
|
match qmdl_store_lock.write().await.delete_entry(&unuploaded_entry).await {
|
||||||
@@ -354,12 +329,14 @@ mod tests {
|
|||||||
cleanup_worker(shutdown, tracker).await;
|
cleanup_worker(shutdown, tracker).await;
|
||||||
|
|
||||||
let recorded = captured.lock().await;
|
let recorded = captured.lock().await;
|
||||||
assert_eq!(recorded.len(), 2);
|
assert_eq!(recorded.len(), 3);
|
||||||
let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect();
|
let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect();
|
||||||
let qmdl_path = format!("dav/{}.qmdl", entry_name);
|
let qmdl_path = format!("dav/{}.qmdl", entry_name);
|
||||||
let ndjson_path = format!("dav/{}.ndjson", entry_name);
|
let ndjson_path = format!("dav/{}.ndjson", entry_name);
|
||||||
|
let gps_path = format!("dav/{}-gps.ndjson", entry_name);
|
||||||
assert!(paths.contains(&qmdl_path.as_str()));
|
assert!(paths.contains(&qmdl_path.as_str()));
|
||||||
assert!(paths.contains(&ndjson_path.as_str()));
|
assert!(paths.contains(&ndjson_path.as_str()));
|
||||||
|
assert!(paths.contains(&gps_path.as_str()));
|
||||||
for put in recorded.iter() {
|
for put in recorded.iter() {
|
||||||
assert_eq!(put.auth.as_deref(), Some("Basic dXNlcjpwYXNzd29yZA=="));
|
assert_eq!(put.auth.as_deref(), Some("Basic dXNlcjpwYXNzd29yZA=="));
|
||||||
}
|
}
|
||||||
@@ -408,7 +385,7 @@ mod tests {
|
|||||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
cleanup_worker(shutdown, tracker).await;
|
cleanup_worker(shutdown, tracker).await;
|
||||||
|
|
||||||
assert_eq!(captured.lock().await.len(), 2);
|
assert_eq!(captured.lock().await.len(), 3);
|
||||||
|
|
||||||
let store_read = store.read().await;
|
let store_read = store.read().await;
|
||||||
assert!(store_read.entry_for_name(&entry_name).is_none());
|
assert!(store_read.entry_for_name(&entry_name).is_none());
|
||||||
|
|||||||
Reference in New Issue
Block a user