From 775cbcda1ee12c7592afe62308fdbb2f0ae07b8a Mon Sep 17 00:00:00 2001 From: Will Greenberg Date: Tue, 13 Feb 2024 20:23:02 -0800 Subject: [PATCH] Transition to async I/O for most things Mixing async and sync I/O leads to a multitude of complications, and generally speaking it's much more convenient to stick to one paradigm or the other. Since axum (and many other HTTP servers) use async, and since async is a convenient model for performing operations like "handle an MPSC message or file read, whichever happens first", let's commit to an async interface. --- Cargo.lock | 338 +++++++++++++++++++++++------------------ bin/Cargo.toml | 2 + bin/src/diag.rs | 119 +++++++-------- bin/src/error.rs | 2 - bin/src/main.rs | 7 +- bin/src/pcap.rs | 113 +++++--------- lib/Cargo.toml | 5 +- lib/src/diag.rs | 145 ++++++++++++++++++ lib/src/diag_device.rs | 113 +++++++------- lib/src/diag_reader.rs | 223 --------------------------- lib/src/hdlc.rs | 4 +- lib/src/lib.rs | 1 - lib/src/pcap.rs | 28 ++-- lib/src/qmdl.rs | 79 +++++----- 14 files changed, 550 insertions(+), 629 deletions(-) delete mode 100644 lib/src/diag_reader.rs diff --git a/Cargo.lock b/Cargo.lock index af42c7d..c3a5591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f" dependencies = [ "cfg-if", "once_cell", @@ -61,9 +61,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" dependencies = [ "anstyle", "anstyle-parse", @@ -159,7 +159,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -244,12 +244,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bitflags" -version = "2.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" - [[package]] name = "bitvec" version = "1.0.1" @@ -265,9 +259,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.14.0" +version = "3.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b" [[package]] name = "byteorder" @@ -292,12 +286,9 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730" [[package]] name = "cfg-if" @@ -307,9 +298,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -317,14 +308,14 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] name = "clap" -version = "4.5.0" +version = "4.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f" +checksum = "c918d541ef2913577a0f9566e9ce27cb35b6df072075769e0b26cb5a554520da" dependencies = [ "clap_builder", "clap_derive", @@ -332,9 +323,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.0" +version = "4.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99" +checksum = "9f3e7391dad68afb0c2ede1bf619f579a3dc9c2ec67f089baa397123a2f3d1eb" dependencies = [ "anstream", "anstyle", @@ -351,7 +342,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -476,16 +467,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "errno" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "fnv" version = "1.0.7" @@ -513,6 +494,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -520,6 +516,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -528,6 +525,23 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -536,7 +550,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -557,9 +571,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -608,9 +626,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" [[package]] name = "http" @@ -666,9 +684,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ "bytes", "futures-channel", @@ -680,6 +698,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "smallvec", "tokio", ] @@ -701,9 +720,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -749,9 +768,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.2" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", "hashbrown", @@ -759,12 +778,12 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.10" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" dependencies = [ "hermit-abi", - "rustix", + "libc", "windows-sys 0.52.0", ] @@ -776,9 +795,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -807,12 +826,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" - [[package]] name = "lock_api" version = "0.4.11" @@ -859,9 +872,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -879,9 +892,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ "autocfg", ] @@ -945,6 +958,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pcap-file-tokio" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee4f08e375f9aabbb17f4c031a2f0af1397835ce8d7909b167ada1dd8b572e6" +dependencies = [ + "async-trait", + "byteorder", + "derive-into-owned", + "pcap-file", + "thiserror", + "tokio", + "tokio-byteorder", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -968,7 +996,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -985,9 +1013,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "proc-macro-crate" @@ -1060,11 +1088,14 @@ dependencies = [ "crc", "deku", "env_logger", + "futures", + "futures-core", "libc", "log", - "pcap-file", + "pcap-file-tokio", "telcom-parser", "thiserror", + "tokio", ] [[package]] @@ -1074,6 +1105,7 @@ dependencies = [ "axum", "chrono", "env_logger", + "futures", "futures-core", "futures-macro", "include_dir", @@ -1084,6 +1116,7 @@ dependencies = [ "tempdir", "thiserror", "tokio", + "tokio-stream", "tokio-util", "toml", ] @@ -1103,7 +1136,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags 1.3.2", + "bitflags", ] [[package]] @@ -1164,19 +1197,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustix" -version = "0.38.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" -dependencies = [ - "bitflags 2.4.2", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.52.0", -] - [[package]] name = "rustversion" version = "1.0.14" @@ -1185,9 +1205,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "scopeguard" @@ -1197,29 +1217,29 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -1323,9 +1343,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", @@ -1378,29 +1398,29 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -1415,6 +1435,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-byteorder" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf347e8ae1d1ffd16c8aed569172a71bd81098a001d0f4964d476c0097aba4a" +dependencies = [ + "byteorder", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -1423,7 +1453,18 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", ] [[package]] @@ -1444,14 +1485,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.21.1", + "toml_edit 0.22.6", ] [[package]] @@ -1471,20 +1512,20 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap", "toml_datetime", - "winnow", + "winnow 0.5.40", ] [[package]] name = "toml_edit" -version = "0.21.1" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6" dependencies = [ "indexmap", "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.2", ] [[package]] @@ -1582,9 +1623,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1592,24 +1633,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1617,22 +1658,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "winapi" @@ -1671,7 +1712,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] @@ -1689,7 +1730,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] @@ -1709,17 +1750,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.3", + "windows_aarch64_msvc 0.52.3", + "windows_i686_gnu 0.52.3", + "windows_i686_msvc 0.52.3", + "windows_x86_64_gnu 0.52.3", + "windows_x86_64_gnullvm 0.52.3", + "windows_x86_64_msvc 0.52.3", ] [[package]] @@ -1730,9 +1771,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6" [[package]] name = "windows_aarch64_msvc" @@ -1742,9 +1783,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f" [[package]] name = "windows_i686_gnu" @@ -1754,9 +1795,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb" [[package]] name = "windows_i686_msvc" @@ -1766,9 +1807,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58" [[package]] name = "windows_x86_64_gnu" @@ -1778,9 +1819,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614" [[package]] name = "windows_x86_64_gnullvm" @@ -1790,9 +1831,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c" [[package]] name = "windows_x86_64_msvc" @@ -1802,15 +1843,24 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" [[package]] name = "winnow" -version = "0.5.36" +version = "0.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "818ce546a11a9986bc24f93d0cdf38a8a1a400f1473ea8c82e59f6e0ffab9249" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + +[[package]] +name = "winnow" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178" dependencies = [ "memchr", ] @@ -1841,5 +1891,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 5a58814..334f2f9 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -23,3 +23,5 @@ include_dir = "0.7.3" mime_guess = "2.0.4" tempdir = "0.3.7" chrono = { version = "0.4.31", features = ["serde"] } +tokio-stream = "0.1.14" +futures = "0.3.30" diff --git a/bin/src/diag.rs b/bin/src/diag.rs index f318465..4c2e43e 100644 --- a/bin/src/diag.rs +++ b/bin/src/diag.rs @@ -1,87 +1,78 @@ +use std::pin::pin; use std::sync::Arc; use axum::extract::State; use axum::http::StatusCode; +use rayhunter::diag::DataType; use rayhunter::diag_device::DiagDevice; -use rayhunter::diag_reader::DiagReader; use tokio::sync::RwLock; -use tokio::sync::mpsc::{Receiver, self}; +use tokio::sync::mpsc::Receiver; use rayhunter::qmdl::QmdlWriter; -use log::{debug, info}; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::task::JoinHandle; +use log::{debug, error, info}; +use tokio::fs::File; use tokio_util::task::TaskTracker; +use futures::{StreamExt, TryStreamExt}; -use crate::error::RayhunterError; use crate::qmdl_store::QmdlStore; use crate::server::ServerState; pub enum DiagDeviceCtrlMessage { StopRecording, - StartRecording(QmdlWriter), + StartRecording(QmdlWriter), Exit, } -pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver, qmdl_store_lock: Arc>) -> JoinHandle> { - // mpsc channel for updating QmdlStore entry filesizes. First usize is the - // index, second is the size in bytes - let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1); - - // Spawn a thread to monitor the (usize, usize) channel for updates, - // triggering QmdlStore updates - let qmdl_store_lock_clone = qmdl_store_lock.clone(); +pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver, qmdl_store_lock: Arc>) { task_tracker.spawn(async move { - while let Some((entry_idx, new_size)) = rx.recv().await { - let mut qmdl_store = qmdl_store_lock_clone.write().await; - qmdl_store.update_entry(entry_idx, new_size).await - .expect("failed to update qmdl file size"); - } - info!("QMDL store size updater thread exiting..."); - }); - - // Spawn a thread to drive the DiagDevice reading loop. Since DiagDevice - // works via synchronous I/O, we have to spawn a "blocking" thread to avoid - // gumming up tokio's event loop. - task_tracker.spawn_blocking(move || { + let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry"); + let mut qmdl_writer: Option> = Some(QmdlWriter::new(initial_file)); + let mut diag_stream = pin!(dev.as_stream().into_stream()); loop { - // First check if we've gotten any control meesages - match qmdl_file_rx.try_recv() { - Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => { - dev.qmdl_writer = Some(qmdl_writer); - }, - Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None, - // Disconnected means all the Senders have been dropped, so it's - // time to go - Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => { - info!("Diag reader thread exiting..."); - return Ok(()) - }, - // empty just means there's no message for us, so continue as normal - Err(TryRecvError::Empty) => {}, - } - - // remember the QmdlStore current entry index so we can update its size later - let qmdl_store_index = qmdl_store_lock.blocking_read().current_entry; - - // TODO: once we're actually doing analysis, we'll wanna use the messages - // returned here. Until then, the DiagDevice has already written those messages - // to the QMDL file, so we can just ignore them. - debug!("reading response from diag device..."); - let _messages = dev.read_response().map_err(RayhunterError::DiagReadError)?; - debug!("got diag response ({} messages)", _messages.len()); - - // keep track of how many bytes were written to the QMDL file so we can read - // a valid block of data from it in the HTTP server - if let Some(qmdl_writer) = dev.qmdl_writer.as_ref() { - debug!("total QMDL bytes written: {}, sending update...", qmdl_writer.total_written); - let index = qmdl_store_index.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); - tx.blocking_send((index, qmdl_writer.total_written)).unwrap(); - debug!("done!"); - } else { - debug!("no qmdl_writer set, continuing..."); + tokio::select! { + msg = qmdl_file_rx.recv() => { + match msg { + Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => { + qmdl_writer = Some(new_writer); + }, + Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None, + // None means all the Senders have been dropped, so it's + // time to go + Some(DiagDeviceCtrlMessage::Exit) | None => { + info!("Diag reader thread exiting..."); + return Ok(()) + }, + } + } + maybe_container = diag_stream.next() => { + match maybe_container.unwrap() { + Ok(container) => { + if container.data_type != DataType::UserSpace { + debug!("skipping non-userspace diag messages..."); + continue; + } + // keep track of how many bytes were written to the QMDL file so we can read + // a valid block of data from it in the HTTP server + if let Some(writer) = qmdl_writer.as_mut() { + writer.write_container(&container).await.expect("failed to write to QMDL writer"); + debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written); + let mut qmdl_store = qmdl_store_lock.write().await; + let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); + qmdl_store.update_entry(index, writer.total_written).await + .expect("failed to update qmdl file size"); + debug!("done!"); + } else { + debug!("no qmdl_writer set, continuing..."); + } + }, + Err(err) => { + error!("error reading diag device: {}", err); + return Err(err); + } + } + } } } - }) + }); } pub async fn start_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { @@ -91,7 +82,7 @@ pub async fn start_recording(State(state): State>) -> Result<(S let mut qmdl_store = state.qmdl_store_lock.write().await; let qmdl_file = qmdl_store.new_entry().await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?; - let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); + let qmdl_writer = QmdlWriter::new(qmdl_file); state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?; Ok((StatusCode::ACCEPTED, format!("ok"))) diff --git a/bin/src/error.rs b/bin/src/error.rs index 78d15a8..f7a3470 100644 --- a/bin/src/error.rs +++ b/bin/src/error.rs @@ -9,8 +9,6 @@ pub enum RayhunterError{ ConfigFileParsingError(#[from] toml::de::Error), #[error("Diag intialization error: {0}")] DiagInitError(DiagDeviceError), - #[error("Diag read error: {0}")] - DiagReadError(DiagDeviceError), #[error("Tokio error: {0}")] TokioError(#[from] tokio::io::Error), #[error("QmdlStore error: {0}")] diff --git a/bin/src/main.rs b/bin/src/main.rs index 55fab69..892e914 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -20,7 +20,6 @@ use log::{info, error}; use rayhunter::diag_device::DiagDevice; use axum::routing::{get, post}; use axum::Router; -use rayhunter::qmdl::QmdlWriter; use stats::get_qmdl_manifest; use tokio::sync::mpsc::{self, Sender}; use tokio::task::JoinHandle; @@ -127,11 +126,9 @@ async fn main() -> Result<(), RayhunterError> { let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?)); let (tx, rx) = mpsc::channel::(1); if !config.readonly_mode { - let qmdl_file = qmdl_store_lock.write().await.new_entry().await?; - let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); - let mut dev = DiagDevice::new(Some(qmdl_writer)) + let mut dev = DiagDevice::new().await .map_err(RayhunterError::DiagInitError)?; - dev.config_logs() + dev.config_logs().await .map_err(RayhunterError::DiagInitError)?; run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone()); diff --git a/bin/src/pcap.rs b/bin/src/pcap.rs index 7694c39..5dadd24 100644 --- a/bin/src/pcap.rs +++ b/bin/src/pcap.rs @@ -1,26 +1,24 @@ use crate::ServerState; +use rayhunter::diag::DataType; use rayhunter::gsmtap_parser::GsmtapParser; use rayhunter::pcap::GsmtapPcapWriter; -use rayhunter::qmdl::{QmdlReader, QmdlReaderError}; -use rayhunter::diag_reader::DiagReader; +use rayhunter::qmdl::QmdlReader; use axum::body::Body; use axum::http::header::CONTENT_TYPE; use axum::extract::{State, Path}; use axum::http::StatusCode; use axum::response::{Response, IntoResponse}; -use std::io::Write; -use std::pin::Pin; +use tokio::io::duplex; +use tokio_util::io::ReaderStream; +use std::{future, pin::pin}; use std::sync::Arc; -use std::task::{Poll, Context}; -use futures_core::Stream; use log::error; -use tokio::sync::mpsc; +use futures::TryStreamExt; // Streams a pcap file chunk-by-chunk to the client by reading the QMDL data -// written so far. This is done by spawning a blocking thread (a tokio thread -// capable of handling blocking operations) which streams chunks of pcap data to -// a channel that's piped to the client. +// written so far. This is done by spawning a thread which streams chunks of +// pcap data to a channel that's piped to the client. pub async fn get_pcap(State(state): State>, Path(qmdl_name): Path) -> Result { let qmdl_store = state.qmdl_store_lock.read().await; let entry = qmdl_store.entry_for_name(&qmdl_name) @@ -31,82 +29,39 @@ pub async fn get_pcap(State(state): State>, Path(qmdl_name): Pa "QMDL file is empty, try again in a bit!".to_string() )); } + let qmdl_file = qmdl_store.open_entry(&entry).await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))? - .into_std().await; + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?; + // the QMDL reader should stop at the last successfully written data chunk + // (entry.size_bytes) + let mut gsmtap_parser = GsmtapParser::new(); + let (reader, writer) = duplex(1024); + let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap(); + pcap_writer.write_iface_header().await.unwrap(); - let (tx, rx) = mpsc::channel(1); - let channel_reader = ChannelReader { rx }; - let channel_writer = ChannelWriter { tx }; - tokio::task::spawn_blocking(move || { - // the QMDL reader should stop at the last successfully written data - // chunk (qmdl_bytes_written) - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes)); - - let mut gsmtap_parser = GsmtapParser::new(); - let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap(); - pcap_writer.write_iface_header().unwrap(); - loop { - match qmdl_reader.read_response() { - Ok(messages) => { - for maybe_msg in messages { - match maybe_msg { - Ok(msg) => { - let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) - .expect("error parsing gsmtap message"); - if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { - pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp) - .expect("error writing pcap packet"); - } - }, - Err(e) => error!("error parsing message: {:?}", e), + tokio::spawn(async move { + let mut reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes)); + let mut messages_stream = pin!(reader.as_stream() + .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); + + while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") { + for maybe_msg in container.into_messages() { + match maybe_msg { + Ok(msg) => { + let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) + .expect("error parsing gsmtap message"); + if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { + pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp).await + .expect("error writing pcap packet"); } - } - }, - // this is expected, and just means we've reached the end of the - // safely written QMDL data - Err(QmdlReaderError::MaxBytesReached(_)) => break, - Err(e) => { - error!("error reading qmdl file: {:?}", e); - break; - }, + }, + Err(e) => error!("error parsing message: {:?}", e), + } } } }); let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")]; - let body = Body::from_stream(channel_reader); + let body = Body::from_stream(ReaderStream::new(reader)); Ok((headers, body).into_response()) } - -struct ChannelWriter { - tx: mpsc::Sender>, -} - -impl Write for ChannelWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tx.blocking_send(buf.to_vec()) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -struct ChannelReader { - rx: mpsc::Receiver>, -} - -impl Stream for ChannelReader { - type Item = Result, String>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 94a30ee..88b72ac 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -12,6 +12,9 @@ deku = { version = "0.16.0", features = ["logging"] } env_logger = "0.10.1" libc = "0.2.150" log = "0.4.20" -pcap-file = "2.0.0" +pcap-file-tokio = "0.1.0" thiserror = "1.0.50" telcom-parser = { path = "../telcom-parser" } +tokio = { version = "1.35.1", features = ["full"] } +futures-core = "0.3.30" +futures = "0.3.30" diff --git a/lib/src/diag.rs b/lib/src/diag.rs index 786476d..51c7aec 100644 --- a/lib/src/diag.rs +++ b/lib/src/diag.rs @@ -1,8 +1,13 @@ //! Diag protocol serialization/deserialization use chrono::{DateTime, FixedOffset}; +use crc::{Algorithm, Crc}; use deku::prelude::*; +use crate::hdlc::{self, hdlc_decapsulate}; +use log::{warn, error}; +use thiserror::Error; + pub const MESSAGE_TERMINATOR: u8 = 0x7e; pub const MESSAGE_ESCAPE_CHAR: u8 = 0x7d; @@ -49,6 +54,28 @@ pub enum DataType { Other(u32), } +#[derive(Debug, Clone, PartialEq, Error)] +pub enum DiagParsingError { + #[error("Failed to parse Message: {0}, data: {1:?}")] + MessageParsingError(deku::DekuError, Vec), + #[error("HDLC decapsulation of message failed: {0}, data: {1:?}")] + HdlcDecapsulationError(hdlc::HdlcError, Vec), +} + +// this is sorta based on the params qcsuper uses, plus what seems to be used in +// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47 +pub const CRC_CCITT_ALG: Algorithm = Algorithm { + poly: 0x1021, + init: 0xffff, + refin: true, + refout: true, + width: 16, + xorout: 0xffff, + check: 0x2189, + residue: 0x0000, +}; + +pub const CRC_CCITT: Crc = Crc::::new(&CRC_CCITT_ALG); #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] pub struct MessagesContainer { pub data_type: DataType, @@ -57,6 +84,29 @@ pub struct MessagesContainer { pub messages: Vec, } +impl MessagesContainer { + pub fn into_messages(self) -> Vec> { + let mut result = Vec::new(); + for msg in self.messages { + for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { + match hdlc_decapsulate(sub_msg, &CRC_CCITT) { + Ok(data) => match Message::from_bytes((&data, 0)) { + Ok(((leftover_bytes, _), res)) => { + if !leftover_bytes.is_empty() { + warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len()); + } + result.push(Ok(res)); + }, + Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))), + }, + Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec()))), + } + } + } + result + } +} + #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] pub struct HdlcEncapsulatedMessage { pub len: u32, @@ -431,4 +481,99 @@ mod test { }, }); } + + fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer { + MessagesContainer { + data_type, + num_messages: 1, + messages: vec![message], + } + } + + // this log is based on one captured on a real device -- if it fails to + // serialize or deserialize, that's probably a problem with this mock, not + // the DiagReader implementation + fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { + let length_with_payload = 31 + payload.len() as u16; + let message = Message::Log { + pending_msgs: 0, + outer_length: length_with_payload, + inner_length: length_with_payload, + log_type: 0xb0c0, + timestamp: Timestamp { ts: 72659535985485082 }, + body: LogBody::LteRrcOtaMessage { + ext_header_version: 20, + packet: LteRrcOtaPacket::V8 { + rrc_rel_maj: 14, + rrc_rel_min: 48, + bearer_id: 0, + phy_cell_id: 160, + earfcn: 2050, + sfn_subfn: 4057, + pdu_num: 5, + sib_mask: 0, + len: payload.len() as u16, + packet: payload.to_vec(), + }, + }, + }; + let serialized = message.to_bytes().expect("failed to serialize test message"); + let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT); + let encapsulated = HdlcEncapsulatedMessage { + len: encapsulated_data.len() as u32, + data: encapsulated_data, + }; + (encapsulated, message) + } + + #[test] + fn test_containers_with_multiple_messages() { + let (encapsulated1, message1) = get_test_message(&[1]); + let (encapsulated2, message2) = get_test_message(&[2]); + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(encapsulated2); + container.num_messages += 1; + assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + } + + #[test] + fn test_containers_with_concatenated_message() { + let (mut encapsulated1, message1) = get_test_message(&[1]); + let (encapsulated2, message2) = get_test_message(&[2]); + encapsulated1.data.extend(encapsulated2.data); + encapsulated1.len += encapsulated2.len; + let container = make_container(DataType::UserSpace, encapsulated1); + assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + } + + #[test] + fn test_handles_parsing_errors() { + let (encapsulated1, message1) = get_test_message(&[1]); + let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT); + let encapsulated2 = HdlcEncapsulatedMessage { + len: bad_message.len() as u32, + data: bad_message, + }; + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(encapsulated2); + container.num_messages += 1; + let result = container.into_messages(); + assert_eq!(result[0], Ok(message1)); + assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _)))); + } + + #[test] + fn test_handles_encapsulation_errors() { + let (encapsulated1, message1) = get_test_message(&[1]); + let bad_encapsulation = HdlcEncapsulatedMessage { + len: 4, + data: vec![0x01, 0x02, 0x03, 0x04], + }; + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(bad_encapsulation); + container.num_messages += 1; + let result = container.into_messages(); + assert_eq!(result[0], Ok(message1)); + assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _)))); + } } diff --git a/lib/src/diag_device.rs b/lib/src/diag_device.rs index a9d5a31..3737379 100644 --- a/lib/src/diag_device.rs +++ b/lib/src/diag_device.rs @@ -1,15 +1,15 @@ use crate::hdlc::hdlc_encapsulate; -use crate::diag::{Message, ResponsePayload, Request, LogConfigRequest, LogConfigResponse, build_log_mask_request, RequestContainer, DataType, MessagesContainer}; -use crate::diag_reader::{DiagReader, CRC_CCITT}; -use crate::qmdl::QmdlWriter; +use crate::diag::{build_log_mask_request, DataType, DiagParsingError, LogConfigRequest, LogConfigResponse, Message, MessagesContainer, Request, RequestContainer, ResponsePayload, CRC_CCITT}; use crate::log_codes; -use std::fs::File; -use std::io::Read; +use std::io::ErrorKind; use std::os::fd::AsRawFd; +use futures_core::TryStream; use thiserror::Error; use log::{info, warn, error}; use deku::prelude::*; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; pub type DiagResult = Result; @@ -20,7 +20,7 @@ pub enum DiagDeviceError { #[error("Failed to read diag device: {0}")] DeviceReadFailed(std::io::Error), #[error("Failed to write diag device: {0}")] - DeviceWriteFailed(String), + DeviceWriteFailed(std::io::Error), #[error("Nonzero status code {0} for diag request: {1:?}")] RequestFailed(u32, Request), #[error("Didn't receive response for request: {0:?}")] @@ -71,43 +71,18 @@ const DIAG_IOCTL_SWITCH_LOGGING: u64 = 7; pub struct DiagDevice { file: File, - pub qmdl_writer: Option>, fully_initialized: bool, read_buf: Vec, use_mdm: i32, } -impl DiagReader for DiagDevice { - type Err = DiagDeviceError; - - fn get_next_messages_container(&mut self) -> DiagResult { - let mut bytes_read = 0; - while bytes_read == 0 { - bytes_read = self.file.read(&mut self.read_buf) - .map_err(DiagDeviceError::DeviceReadFailed)?; - } - let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0)) - .map_err(DiagDeviceError::ParseMessagesContainerError)?; - if !leftover_bytes.is_empty() { - warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len()); - } - - if let Some(qmdl_writer) = self.qmdl_writer.as_mut() { - if self.fully_initialized { - qmdl_writer.write_container(&container) - .map_err(DiagDeviceError::QmdlFileWriteError)?; - } - } - Ok(container) - } -} - impl DiagDevice { - pub fn new(qmdl_writer: Option>) -> DiagResult { - let diag_file = std::fs::File::options() + pub async fn new() -> DiagResult { + let diag_file = File::options() .read(true) .write(true) .open("/dev/diag") + .await .map_err(DiagDeviceError::OpenDiagDeviceError)?; let fd = diag_file.as_raw_fd(); @@ -118,12 +93,32 @@ impl DiagDevice { read_buf: vec![0; BUFFER_LEN], file: diag_file, fully_initialized: false, - qmdl_writer, use_mdm, }) } - fn write_request(&mut self, req: &Request) -> DiagResult<()> { + pub fn as_stream(&mut self) -> impl TryStream + '_ { + futures::stream::try_unfold(self, |dev| async { + let container = dev.get_next_messages_container().await?; + Ok(Some((container, dev))) + }) + } + + async fn get_next_messages_container(&mut self) -> Result { + let mut bytes_read = 0; + while bytes_read == 0 { + bytes_read = self.file.read(&mut self.read_buf).await + .map_err(DiagDeviceError::DeviceReadFailed)?; + } + let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0)) + .map_err(DiagDeviceError::ParseMessagesContainerError)?; + if !leftover_bytes.is_empty() { + warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len()); + } + Ok(container) + } + + async fn write_request(&mut self, req: &Request) -> DiagResult<()> { let req_bytes = &req.to_bytes().expect("Failed to serialize Request"); let buf = RequestContainer { data_type: DataType::UserSpace, @@ -131,23 +126,35 @@ impl DiagDevice { mdm_field: -1, hdlc_encapsulated_request: hdlc_encapsulate(req_bytes, &CRC_CCITT), }.to_bytes().expect("Failed to serialize RequestContainer"); - unsafe { - let fd = self.file.as_raw_fd(); - let buf_ptr = buf.as_ptr() as *const libc::c_void; - let ret = libc::write(fd, buf_ptr, buf.len()); - if ret < 0 { - let msg = format!("write failed with error code {}", ret); - return Err(DiagDeviceError::DeviceWriteFailed(msg)); + if let Err(err) = self.file.write(&buf).await { + // For reasons I don't entirely understand, calls to write(2) on + // /dev/diag always return 0 bytes written, though the written + // requests end up being interpreted. As such, we're not concerned + // about WriteZero errors + if err.kind() != ErrorKind::WriteZero { + return Err(DiagDeviceError::DeviceWriteFailed(err)); } } + self.file.flush().await + .map_err(DiagDeviceError::DeviceWriteFailed)?; Ok(()) } - fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> { - let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges); - self.write_request(&req)?; + async fn read_response(&mut self) -> DiagResult>> { + loop { + let container = self.get_next_messages_container().await?; + if container.data_type != DataType::UserSpace { + continue; + } + return Ok(container.into_messages()); + } + } - for msg in self.read_response()? { + async fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> { + let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges); + self.write_request(&req).await?; + + for msg in self.read_response().await? { match msg { Ok(Message::Log { .. }) => info!("skipping log response..."), Ok(Message::Response { payload, status, .. }) => match payload { @@ -166,11 +173,11 @@ impl DiagDevice { Err(DiagDeviceError::NoResponse(req)) } - fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> { + async fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> { let req = build_log_mask_request(log_type, log_mask_bitsize, &LOG_CODES_FOR_RAW_PACKET_LOGGING); - self.write_request(&req)?; + self.write_request(&req).await?; - for msg in self.read_response()? { + for msg in self.read_response().await? { match msg { Ok(Message::Log { .. }) => info!("skipping log response..."), Ok(Message::Response { payload, status, .. }) => { @@ -188,13 +195,13 @@ impl DiagDevice { Err(DiagDeviceError::NoResponse(req)) } - pub fn config_logs(&mut self) -> DiagResult<()> { + pub async fn config_logs(&mut self) -> DiagResult<()> { info!("retrieving diag logging capabilities..."); - let log_mask_sizes = self.retrieve_id_ranges()?; + let log_mask_sizes = self.retrieve_id_ranges().await?; for (log_type, &log_mask_bitsize) in log_mask_sizes.iter().enumerate() { if log_mask_bitsize > 0 { - self.set_log_mask(log_type as u32, log_mask_bitsize)?; + self.set_log_mask(log_type as u32, log_mask_bitsize).await?; info!("enabled logging for log type {}", log_type); } } diff --git a/lib/src/diag_reader.rs b/lib/src/diag_reader.rs deleted file mode 100644 index b8ef308..0000000 --- a/lib/src/diag_reader.rs +++ /dev/null @@ -1,223 +0,0 @@ -use crate::diag; -use crate::{diag::*, hdlc::hdlc_decapsulate}; -use crate::hdlc; - -use crc::{Crc, Algorithm}; -use deku::prelude::*; -use log::{info, warn, error}; -use thiserror::Error; - -// this is sorta based on the params qcsuper uses, plus what seems to be used in -// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47 -pub const CRC_CCITT_ALG: Algorithm = Algorithm { - poly: 0x1021, - init: 0xffff, - refin: true, - refout: true, - width: 16, - xorout: 0xffff, - check: 0x2189, - residue: 0x0000, -}; -pub const CRC_CCITT: Crc = Crc::::new(&CRC_CCITT_ALG); - -#[derive(Debug, PartialEq, Error)] -pub enum DiagParsingError { - #[error("Failed to parse Message: {0}, data: {1:?}")] - MessageParsingError(deku::DekuError, Vec), - #[error("HDLC decapsulation of message failed: {0}, data: {1:?}")] - HdlcDecapsulationError(hdlc::HdlcError, Vec), -} - -type MaybeMessage = Result; - -pub trait DiagReader { - type Err; - - fn get_next_messages_container(&mut self) -> Result; - - fn read_response(&mut self) -> Result, Self::Err> { - loop { - let container = self.get_next_messages_container()?; - if container.data_type == DataType::UserSpace { - return self.parse_response_container(container); - } else { - info!("skipping non-userspace message...") - } - } - } - - fn parse_response_container(&self, container: MessagesContainer) -> Result, Self::Err> { - let mut result = Vec::new(); - for msg in container.messages { - for sub_msg in msg.data.split_inclusive(|&b| b == diag::MESSAGE_TERMINATOR) { - match hdlc_decapsulate(sub_msg, &CRC_CCITT) { - Ok(data) => match Message::from_bytes((&data, 0)) { - Ok(((leftover_bytes, _), res)) => { - if !leftover_bytes.is_empty() { - warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len()); - } - result.push(Ok(res)); - }, - Err(e) => { - result.push(Err(DiagParsingError::MessageParsingError(e, data))); - }, - }, - Err(err) => { - result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec()))); - } - } - } - } - Ok(result) - } -} - -#[cfg(test)] -mod test { - use super::*; - - struct MockReader { - containers: Vec, - } - - impl DiagReader for MockReader { - type Err = (); - - fn get_next_messages_container(&mut self) -> Result { - Ok(self.containers.remove(0)) - } - } - - fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer { - MessagesContainer { - data_type, - num_messages: 1, - messages: vec![message], - } - } - - // this log is based on one captured on a real device -- if it fails to - // serialize or deserialize, that's probably a problem with this mock, not - // the DiagReader implementation - fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { - let length_with_payload = 31 + payload.len() as u16; - let message = Message::Log { - pending_msgs: 0, - outer_length: length_with_payload, - inner_length: length_with_payload, - log_type: 0xb0c0, - timestamp: Timestamp { ts: 72659535985485082 }, - body: LogBody::LteRrcOtaMessage { - ext_header_version: 20, - packet: LteRrcOtaPacket::V8 { - rrc_rel_maj: 14, - rrc_rel_min: 48, - bearer_id: 0, - phy_cell_id: 160, - earfcn: 2050, - sfn_subfn: 4057, - pdu_num: 5, - sib_mask: 0, - len: payload.len() as u16, - packet: payload.to_vec(), - }, - }, - }; - let serialized = message.to_bytes().expect("failed to serialize test message"); - let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT); - let encapsulated = HdlcEncapsulatedMessage { - len: encapsulated_data.len() as u32, - data: encapsulated_data, - }; - (encapsulated, message) - } - - #[test] - fn test_skipping_nonuser_containers() { - let (encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, _) = get_test_message(&[2]); - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - make_container(DataType::UserSpace, encapsulated1), - make_container(DataType::Other(0), encapsulated2), - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_containers_with_multiple_messages() { - let (encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, message2) = get_test_message(&[2]); - let mut container1 = make_container(DataType::UserSpace, encapsulated1); - container1.messages.push(encapsulated2); - container1.num_messages += 1; - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - container1, - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_containers_with_concatenated_message() { - let (mut encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, message2) = get_test_message(&[2]); - encapsulated1.data.extend(encapsulated2.data); - encapsulated1.len += encapsulated2.len; - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - make_container(DataType::UserSpace, encapsulated1), - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_handles_parsing_errors() { - let (encapsulated1, message1) = get_test_message(&[1]); - let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT); - let encapsulated2 = HdlcEncapsulatedMessage { - len: bad_message.len() as u32, - data: bad_message, - }; - let mut container = make_container(DataType::UserSpace, encapsulated1); - container.messages.push(encapsulated2); - container.num_messages += 1; - let mut reader = MockReader { - containers: vec![container], - }; - let result = reader.read_response().unwrap(); - assert_eq!(result[0], Ok(message1)); - assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _)))); - } - - #[test] - fn test_handles_encapsulation_errors() { - let (encapsulated1, message1) = get_test_message(&[1]); - let bad_encapsulation = HdlcEncapsulatedMessage { - len: 4, - data: vec![0x01, 0x02, 0x03, 0x04], - }; - let mut container = make_container(DataType::UserSpace, encapsulated1); - container.messages.push(bad_encapsulation); - container.num_messages += 1; - let mut reader = MockReader { - containers: vec![container], - }; - let result = reader.read_response().unwrap(); - assert_eq!(result[0], Ok(message1)); - assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _)))); - } -} diff --git a/lib/src/hdlc.rs b/lib/src/hdlc.rs index 3c99e1b..22456bd 100644 --- a/lib/src/hdlc.rs +++ b/lib/src/hdlc.rs @@ -9,7 +9,7 @@ use thiserror::Error; use crate::diag::{MESSAGE_ESCAPE_CHAR, MESSAGE_TERMINATOR, ESCAPED_MESSAGE_ESCAPE_CHAR, ESCAPED_MESSAGE_TERMINATOR}; -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Clone, Error, PartialEq)] pub enum HdlcError { #[error("Invalid checksum (expected {0}, got {1})")] InvalidChecksum(u16, u16), @@ -89,7 +89,7 @@ mod tests { #[test] fn test_hdlc_encapsulate() { - let crc = Crc::::new(&crate::diag_reader::CRC_CCITT_ALG); + let crc = Crc::::new(&crate::diag::CRC_CCITT_ALG); let data = vec![0x01, 0x02, 0x03, 0x04]; let expected = vec![1, 2, 3, 4, 145, 57, 126]; let encapsulated = hdlc_encapsulate(&data, &crc); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index dda6d21..982f406 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,7 +1,6 @@ pub mod hdlc; pub mod diag; pub mod diag_device; -pub mod diag_reader; pub mod qmdl; pub mod log_codes; pub mod gsmtap; diff --git a/lib/src/pcap.rs b/lib/src/pcap.rs index 9ae59ca..7bd5197 100644 --- a/lib/src/pcap.rs +++ b/lib/src/pcap.rs @@ -1,14 +1,14 @@ use crate::gsmtap::GsmtapMessage; use crate::diag::Timestamp; -use std::io::Write; +use tokio::io::AsyncWrite; use std::borrow::Cow; use chrono::prelude::*; use deku::prelude::*; -use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; -use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock; -use pcap_file::pcapng::PcapNgWriter; -use pcap_file::PcapError; +use pcap_file_tokio::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; +use pcap_file_tokio::pcapng::blocks::interface_description::InterfaceDescriptionBlock; +use pcap_file_tokio::pcapng::PcapNgWriter; +use pcap_file_tokio::PcapError; use thiserror::Error; #[derive(Error, Debug)] @@ -23,7 +23,7 @@ pub enum GsmtapPcapError { Deku(#[from] DekuError), } -pub struct GsmtapPcapWriter where T: Write { +pub struct GsmtapPcapWriter where T: AsyncWrite { writer: PcapNgWriter, ip_id: u16, } @@ -56,23 +56,23 @@ struct UdpHeader { checksum: u16, } -impl GsmtapPcapWriter where T: Write { - pub fn new(writer: T) -> Result { - let writer = PcapNgWriter::new(writer)?; +impl GsmtapPcapWriter where T: AsyncWrite + Unpin + Send { + pub async fn new(writer: T) -> Result { + let writer = PcapNgWriter::new(writer).await?; Ok(GsmtapPcapWriter { writer, ip_id: 0 }) } - pub fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> { + pub async fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> { let interface = InterfaceDescriptionBlock { - linktype: pcap_file::DataLink::IPV4, + linktype: pcap_file_tokio::DataLink::IPV4, snaplen: 0xffff, options: vec![], }; - self.writer.write_pcapng_block(interface)?; + self.writer.write_pcapng_block(interface).await?; Ok(()) } - pub fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> { + pub async fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> { let duration = timestamp.to_datetime() .signed_duration_since(DateTime::UNIX_EPOCH) .to_std()?; @@ -113,7 +113,7 @@ impl GsmtapPcapWriter where T: Write { data: Cow::Owned(data), options: vec![], }; - self.writer.write_pcapng_block(packet)?; + self.writer.write_pcapng_block(packet).await?; self.ip_id = self.ip_id.wrapping_add(1); Ok(()) } diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index c9ff5d6..e6863fc 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -3,19 +3,18 @@ //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QMDL files. -use crate::diag_reader::DiagReader; use crate::diag::{MessagesContainer, MESSAGE_TERMINATOR, HdlcEncapsulatedMessage, DataType}; -use std::io::{Write, BufReader, BufRead, Read}; -use thiserror::Error; +use futures::TryStream; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, AsyncBufReadExt}; use log::error; -pub struct QmdlWriter where T: Write { +pub struct QmdlWriter where T: AsyncWrite + Unpin { writer: T, pub total_written: usize, } -impl QmdlWriter where T: Write { +impl QmdlWriter where T: AsyncWrite + Unpin { pub fn new(writer: T) -> Self { QmdlWriter::new_with_existing_size(writer, 0) } @@ -27,30 +26,22 @@ impl QmdlWriter where T: Write { } } - pub fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { + pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { for msg in &container.messages { - self.writer.write_all(&msg.data)?; + self.writer.write_all(&msg.data).await?; self.total_written += msg.data.len(); } Ok(()) } } -#[derive(Debug, Error)] -pub enum QmdlReaderError { - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - #[error("Reached max_bytes count {0}")] - MaxBytesReached(usize), -} - -pub struct QmdlReader where T: Read { +pub struct QmdlReader where T: AsyncRead { reader: BufReader, bytes_read: usize, max_bytes: Option, } -impl QmdlReader where T: Read { +impl QmdlReader where T: AsyncRead + Unpin { pub fn new(reader: T, max_bytes: Option) -> Self { QmdlReader { reader: BufReader::new(reader), @@ -58,23 +49,29 @@ impl QmdlReader where T: Read { max_bytes, } } -} -impl DiagReader for QmdlReader where T: Read { - type Err = QmdlReaderError; + pub fn as_stream(&mut self) -> impl TryStream + '_ { + futures::stream::try_unfold(self, |reader| async { + let maybe_container = reader.get_next_messages_container().await?; + match maybe_container { + Some(container) => Ok(Some((container, reader))), + None => Ok(None) + } + }) + } - fn get_next_messages_container(&mut self) -> Result { + async fn get_next_messages_container(&mut self) -> Result, std::io::Error> { if let Some(max_bytes) = self.max_bytes { if self.bytes_read >= max_bytes { if self.bytes_read > max_bytes { error!("warning: {} bytes read, but max_bytes was {}", self.bytes_read, max_bytes); } - return Err(QmdlReaderError::MaxBytesReached(max_bytes)); + return Ok(None); } } let mut buf = Vec::new(); - let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf)?; + let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?; self.bytes_read += bytes_read; // Since QMDL is just a flat list of messages, we can't actually @@ -82,7 +79,7 @@ impl DiagReader for QmdlReader where T: Read { // read. So we'll just pretend that all containers had exactly one // message. As far as I know, the number of messages per container // doesn't actually affect anything, so this should be fine. - Ok(MessagesContainer { + Ok(Some(MessagesContainer { data_type: DataType::UserSpace, num_messages: 1, messages: vec![ @@ -91,7 +88,7 @@ impl DiagReader for QmdlReader where T: Read { data: buf, }, ] - }) + })) } } @@ -100,7 +97,7 @@ mod test { use std::io::Cursor; use crate::hdlc::hdlc_encapsulate; - use crate::diag_reader::CRC_CCITT; + use crate::diag::CRC_CCITT; use super::*; @@ -140,8 +137,8 @@ mod test { ] } - #[test] - fn test_unbounded_qmdl_reader() { + #[tokio::test] + async fn test_unbounded_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); let mut reader = QmdlReader::new(&mut buf, None); let expected_messages = get_test_messages(); @@ -151,12 +148,12 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } } - #[test] - fn test_bounded_qmdl_reader() { + #[tokio::test] + async fn test_bounded_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); // bound the reader to the first two messages @@ -170,30 +167,30 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } - assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_)))); + assert!(matches!(reader.get_next_messages_container().await, Ok(None))); } - #[test] - fn test_qmdl_writer() { + #[tokio::test] + async fn test_qmdl_writer() { let mut buf = Vec::new(); let mut writer = QmdlWriter::new(&mut buf); let expected_containers = get_test_containers(); for container in &expected_containers { - writer.write_container(container).unwrap(); + writer.write_container(container).await.unwrap(); } assert_eq!(writer.total_written, buf.len()); assert_eq!(buf, get_test_message_bytes()); } - #[test] - fn test_writing_and_reading() { + #[tokio::test] + async fn test_writing_and_reading() { let mut buf = Vec::new(); let mut writer = QmdlWriter::new(&mut buf); let expected_containers = get_test_containers(); for container in &expected_containers { - writer.write_container(container).unwrap(); + writer.write_container(container).await.unwrap(); } let limit = Some(buf.len()); @@ -205,8 +202,8 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } - assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_)))); + assert!(matches!(reader.get_next_messages_container().await, Ok(None))); } }