diff --git a/Cargo.lock b/Cargo.lock index af42c7d..c3a5591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f" dependencies = [ "cfg-if", "once_cell", @@ -61,9 +61,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" dependencies = [ "anstyle", "anstyle-parse", @@ -159,7 +159,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -244,12 +244,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bitflags" -version = "2.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" - [[package]] name = "bitvec" version = "1.0.1" @@ -265,9 +259,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.14.0" +version = "3.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b" [[package]] name = "byteorder" @@ -292,12 +286,9 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730" [[package]] name = "cfg-if" @@ -307,9 +298,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -317,14 +308,14 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] name = "clap" -version = "4.5.0" +version = "4.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f" +checksum = "c918d541ef2913577a0f9566e9ce27cb35b6df072075769e0b26cb5a554520da" dependencies = [ "clap_builder", "clap_derive", @@ -332,9 +323,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.0" +version = "4.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99" +checksum = "9f3e7391dad68afb0c2ede1bf619f579a3dc9c2ec67f089baa397123a2f3d1eb" dependencies = [ "anstream", "anstyle", @@ -351,7 +342,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -476,16 +467,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "errno" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "fnv" version = "1.0.7" @@ -513,6 +494,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -520,6 +516,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -528,6 +525,23 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -536,7 +550,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -557,9 +571,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -608,9 +626,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" [[package]] name = "http" @@ -666,9 +684,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ "bytes", "futures-channel", @@ -680,6 +698,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "smallvec", "tokio", ] @@ -701,9 +720,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -749,9 +768,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.2" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", "hashbrown", @@ -759,12 +778,12 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.10" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" dependencies = [ "hermit-abi", - "rustix", + "libc", "windows-sys 0.52.0", ] @@ -776,9 +795,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -807,12 +826,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" - [[package]] name = "lock_api" version = "0.4.11" @@ -859,9 +872,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -879,9 +892,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ "autocfg", ] @@ -945,6 +958,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pcap-file-tokio" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee4f08e375f9aabbb17f4c031a2f0af1397835ce8d7909b167ada1dd8b572e6" +dependencies = [ + "async-trait", + "byteorder", + "derive-into-owned", + "pcap-file", + "thiserror", + "tokio", + "tokio-byteorder", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -968,7 +996,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] @@ -985,9 +1013,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "proc-macro-crate" @@ -1060,11 +1088,14 @@ dependencies = [ "crc", "deku", "env_logger", + "futures", + "futures-core", "libc", "log", - "pcap-file", + "pcap-file-tokio", "telcom-parser", "thiserror", + "tokio", ] [[package]] @@ -1074,6 +1105,7 @@ dependencies = [ "axum", "chrono", "env_logger", + "futures", "futures-core", "futures-macro", "include_dir", @@ -1084,6 +1116,7 @@ dependencies = [ "tempdir", "thiserror", "tokio", + "tokio-stream", "tokio-util", "toml", ] @@ -1103,7 +1136,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags 1.3.2", + "bitflags", ] [[package]] @@ -1164,19 +1197,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustix" -version = "0.38.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" -dependencies = [ - "bitflags 2.4.2", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.52.0", -] - [[package]] name = "rustversion" version = "1.0.14" @@ -1185,9 +1205,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "scopeguard" @@ -1197,29 +1217,29 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -1323,9 +1343,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", @@ -1378,29 +1398,29 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -1415,6 +1435,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-byteorder" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf347e8ae1d1ffd16c8aed569172a71bd81098a001d0f4964d476c0097aba4a" +dependencies = [ + "byteorder", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -1423,7 +1453,18 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", ] [[package]] @@ -1444,14 +1485,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.21.1", + "toml_edit 0.22.6", ] [[package]] @@ -1471,20 +1512,20 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap", "toml_datetime", - "winnow", + "winnow 0.5.40", ] [[package]] name = "toml_edit" -version = "0.21.1" +version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6" dependencies = [ "indexmap", "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.2", ] [[package]] @@ -1582,9 +1623,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1592,24 +1633,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1617,22 +1658,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "winapi" @@ -1671,7 +1712,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] @@ -1689,7 +1730,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.3", ] [[package]] @@ -1709,17 +1750,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.3", + "windows_aarch64_msvc 0.52.3", + "windows_i686_gnu 0.52.3", + "windows_i686_msvc 0.52.3", + "windows_x86_64_gnu 0.52.3", + "windows_x86_64_gnullvm 0.52.3", + "windows_x86_64_msvc 0.52.3", ] [[package]] @@ -1730,9 +1771,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6" [[package]] name = "windows_aarch64_msvc" @@ -1742,9 +1783,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f" [[package]] name = "windows_i686_gnu" @@ -1754,9 +1795,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb" [[package]] name = "windows_i686_msvc" @@ -1766,9 +1807,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58" [[package]] name = "windows_x86_64_gnu" @@ -1778,9 +1819,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614" [[package]] name = "windows_x86_64_gnullvm" @@ -1790,9 +1831,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c" [[package]] name = "windows_x86_64_msvc" @@ -1802,15 +1843,24 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" [[package]] name = "winnow" -version = "0.5.36" +version = "0.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "818ce546a11a9986bc24f93d0cdf38a8a1a400f1473ea8c82e59f6e0ffab9249" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + +[[package]] +name = "winnow" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178" dependencies = [ "memchr", ] @@ -1841,5 +1891,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.48", + "syn 2.0.50", ] diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 5a58814..334f2f9 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -23,3 +23,5 @@ include_dir = "0.7.3" mime_guess = "2.0.4" tempdir = "0.3.7" chrono = { version = "0.4.31", features = ["serde"] } +tokio-stream = "0.1.14" +futures = "0.3.30" diff --git a/bin/src/diag.rs b/bin/src/diag.rs index f318465..4c2e43e 100644 --- a/bin/src/diag.rs +++ b/bin/src/diag.rs @@ -1,87 +1,78 @@ +use std::pin::pin; use std::sync::Arc; use axum::extract::State; use axum::http::StatusCode; +use rayhunter::diag::DataType; use rayhunter::diag_device::DiagDevice; -use rayhunter::diag_reader::DiagReader; use tokio::sync::RwLock; -use tokio::sync::mpsc::{Receiver, self}; +use tokio::sync::mpsc::Receiver; use rayhunter::qmdl::QmdlWriter; -use log::{debug, info}; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::task::JoinHandle; +use log::{debug, error, info}; +use tokio::fs::File; use tokio_util::task::TaskTracker; +use futures::{StreamExt, TryStreamExt}; -use crate::error::RayhunterError; use crate::qmdl_store::QmdlStore; use crate::server::ServerState; pub enum DiagDeviceCtrlMessage { StopRecording, - StartRecording(QmdlWriter), + StartRecording(QmdlWriter), Exit, } -pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver, qmdl_store_lock: Arc>) -> JoinHandle> { - // mpsc channel for updating QmdlStore entry filesizes. First usize is the - // index, second is the size in bytes - let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1); - - // Spawn a thread to monitor the (usize, usize) channel for updates, - // triggering QmdlStore updates - let qmdl_store_lock_clone = qmdl_store_lock.clone(); +pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver, qmdl_store_lock: Arc>) { task_tracker.spawn(async move { - while let Some((entry_idx, new_size)) = rx.recv().await { - let mut qmdl_store = qmdl_store_lock_clone.write().await; - qmdl_store.update_entry(entry_idx, new_size).await - .expect("failed to update qmdl file size"); - } - info!("QMDL store size updater thread exiting..."); - }); - - // Spawn a thread to drive the DiagDevice reading loop. Since DiagDevice - // works via synchronous I/O, we have to spawn a "blocking" thread to avoid - // gumming up tokio's event loop. - task_tracker.spawn_blocking(move || { + let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry"); + let mut qmdl_writer: Option> = Some(QmdlWriter::new(initial_file)); + let mut diag_stream = pin!(dev.as_stream().into_stream()); loop { - // First check if we've gotten any control meesages - match qmdl_file_rx.try_recv() { - Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => { - dev.qmdl_writer = Some(qmdl_writer); - }, - Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None, - // Disconnected means all the Senders have been dropped, so it's - // time to go - Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => { - info!("Diag reader thread exiting..."); - return Ok(()) - }, - // empty just means there's no message for us, so continue as normal - Err(TryRecvError::Empty) => {}, - } - - // remember the QmdlStore current entry index so we can update its size later - let qmdl_store_index = qmdl_store_lock.blocking_read().current_entry; - - // TODO: once we're actually doing analysis, we'll wanna use the messages - // returned here. Until then, the DiagDevice has already written those messages - // to the QMDL file, so we can just ignore them. - debug!("reading response from diag device..."); - let _messages = dev.read_response().map_err(RayhunterError::DiagReadError)?; - debug!("got diag response ({} messages)", _messages.len()); - - // keep track of how many bytes were written to the QMDL file so we can read - // a valid block of data from it in the HTTP server - if let Some(qmdl_writer) = dev.qmdl_writer.as_ref() { - debug!("total QMDL bytes written: {}, sending update...", qmdl_writer.total_written); - let index = qmdl_store_index.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); - tx.blocking_send((index, qmdl_writer.total_written)).unwrap(); - debug!("done!"); - } else { - debug!("no qmdl_writer set, continuing..."); + tokio::select! { + msg = qmdl_file_rx.recv() => { + match msg { + Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => { + qmdl_writer = Some(new_writer); + }, + Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None, + // None means all the Senders have been dropped, so it's + // time to go + Some(DiagDeviceCtrlMessage::Exit) | None => { + info!("Diag reader thread exiting..."); + return Ok(()) + }, + } + } + maybe_container = diag_stream.next() => { + match maybe_container.unwrap() { + Ok(container) => { + if container.data_type != DataType::UserSpace { + debug!("skipping non-userspace diag messages..."); + continue; + } + // keep track of how many bytes were written to the QMDL file so we can read + // a valid block of data from it in the HTTP server + if let Some(writer) = qmdl_writer.as_mut() { + writer.write_container(&container).await.expect("failed to write to QMDL writer"); + debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written); + let mut qmdl_store = qmdl_store_lock.write().await; + let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); + qmdl_store.update_entry(index, writer.total_written).await + .expect("failed to update qmdl file size"); + debug!("done!"); + } else { + debug!("no qmdl_writer set, continuing..."); + } + }, + Err(err) => { + error!("error reading diag device: {}", err); + return Err(err); + } + } + } } } - }) + }); } pub async fn start_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { @@ -91,7 +82,7 @@ pub async fn start_recording(State(state): State>) -> Result<(S let mut qmdl_store = state.qmdl_store_lock.write().await; let qmdl_file = qmdl_store.new_entry().await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?; - let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); + let qmdl_writer = QmdlWriter::new(qmdl_file); state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?; Ok((StatusCode::ACCEPTED, format!("ok"))) diff --git a/bin/src/error.rs b/bin/src/error.rs index 78d15a8..f7a3470 100644 --- a/bin/src/error.rs +++ b/bin/src/error.rs @@ -9,8 +9,6 @@ pub enum RayhunterError{ ConfigFileParsingError(#[from] toml::de::Error), #[error("Diag intialization error: {0}")] DiagInitError(DiagDeviceError), - #[error("Diag read error: {0}")] - DiagReadError(DiagDeviceError), #[error("Tokio error: {0}")] TokioError(#[from] tokio::io::Error), #[error("QmdlStore error: {0}")] diff --git a/bin/src/main.rs b/bin/src/main.rs index 55fab69..892e914 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -20,7 +20,6 @@ use log::{info, error}; use rayhunter::diag_device::DiagDevice; use axum::routing::{get, post}; use axum::Router; -use rayhunter::qmdl::QmdlWriter; use stats::get_qmdl_manifest; use tokio::sync::mpsc::{self, Sender}; use tokio::task::JoinHandle; @@ -127,11 +126,9 @@ async fn main() -> Result<(), RayhunterError> { let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?)); let (tx, rx) = mpsc::channel::(1); if !config.readonly_mode { - let qmdl_file = qmdl_store_lock.write().await.new_entry().await?; - let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); - let mut dev = DiagDevice::new(Some(qmdl_writer)) + let mut dev = DiagDevice::new().await .map_err(RayhunterError::DiagInitError)?; - dev.config_logs() + dev.config_logs().await .map_err(RayhunterError::DiagInitError)?; run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone()); diff --git a/bin/src/pcap.rs b/bin/src/pcap.rs index 7694c39..5dadd24 100644 --- a/bin/src/pcap.rs +++ b/bin/src/pcap.rs @@ -1,26 +1,24 @@ use crate::ServerState; +use rayhunter::diag::DataType; use rayhunter::gsmtap_parser::GsmtapParser; use rayhunter::pcap::GsmtapPcapWriter; -use rayhunter::qmdl::{QmdlReader, QmdlReaderError}; -use rayhunter::diag_reader::DiagReader; +use rayhunter::qmdl::QmdlReader; use axum::body::Body; use axum::http::header::CONTENT_TYPE; use axum::extract::{State, Path}; use axum::http::StatusCode; use axum::response::{Response, IntoResponse}; -use std::io::Write; -use std::pin::Pin; +use tokio::io::duplex; +use tokio_util::io::ReaderStream; +use std::{future, pin::pin}; use std::sync::Arc; -use std::task::{Poll, Context}; -use futures_core::Stream; use log::error; -use tokio::sync::mpsc; +use futures::TryStreamExt; // Streams a pcap file chunk-by-chunk to the client by reading the QMDL data -// written so far. This is done by spawning a blocking thread (a tokio thread -// capable of handling blocking operations) which streams chunks of pcap data to -// a channel that's piped to the client. +// written so far. This is done by spawning a thread which streams chunks of +// pcap data to a channel that's piped to the client. pub async fn get_pcap(State(state): State>, Path(qmdl_name): Path) -> Result { let qmdl_store = state.qmdl_store_lock.read().await; let entry = qmdl_store.entry_for_name(&qmdl_name) @@ -31,82 +29,39 @@ pub async fn get_pcap(State(state): State>, Path(qmdl_name): Pa "QMDL file is empty, try again in a bit!".to_string() )); } + let qmdl_file = qmdl_store.open_entry(&entry).await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))? - .into_std().await; + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?; + // the QMDL reader should stop at the last successfully written data chunk + // (entry.size_bytes) + let mut gsmtap_parser = GsmtapParser::new(); + let (reader, writer) = duplex(1024); + let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap(); + pcap_writer.write_iface_header().await.unwrap(); - let (tx, rx) = mpsc::channel(1); - let channel_reader = ChannelReader { rx }; - let channel_writer = ChannelWriter { tx }; - tokio::task::spawn_blocking(move || { - // the QMDL reader should stop at the last successfully written data - // chunk (qmdl_bytes_written) - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes)); - - let mut gsmtap_parser = GsmtapParser::new(); - let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap(); - pcap_writer.write_iface_header().unwrap(); - loop { - match qmdl_reader.read_response() { - Ok(messages) => { - for maybe_msg in messages { - match maybe_msg { - Ok(msg) => { - let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) - .expect("error parsing gsmtap message"); - if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { - pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp) - .expect("error writing pcap packet"); - } - }, - Err(e) => error!("error parsing message: {:?}", e), + tokio::spawn(async move { + let mut reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes)); + let mut messages_stream = pin!(reader.as_stream() + .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); + + while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") { + for maybe_msg in container.into_messages() { + match maybe_msg { + Ok(msg) => { + let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) + .expect("error parsing gsmtap message"); + if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { + pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp).await + .expect("error writing pcap packet"); } - } - }, - // this is expected, and just means we've reached the end of the - // safely written QMDL data - Err(QmdlReaderError::MaxBytesReached(_)) => break, - Err(e) => { - error!("error reading qmdl file: {:?}", e); - break; - }, + }, + Err(e) => error!("error parsing message: {:?}", e), + } } } }); let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")]; - let body = Body::from_stream(channel_reader); + let body = Body::from_stream(ReaderStream::new(reader)); Ok((headers, body).into_response()) } - -struct ChannelWriter { - tx: mpsc::Sender>, -} - -impl Write for ChannelWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tx.blocking_send(buf.to_vec()) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -struct ChannelReader { - rx: mpsc::Receiver>, -} - -impl Stream for ChannelReader { - type Item = Result, String>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 94a30ee..88b72ac 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -12,6 +12,9 @@ deku = { version = "0.16.0", features = ["logging"] } env_logger = "0.10.1" libc = "0.2.150" log = "0.4.20" -pcap-file = "2.0.0" +pcap-file-tokio = "0.1.0" thiserror = "1.0.50" telcom-parser = { path = "../telcom-parser" } +tokio = { version = "1.35.1", features = ["full"] } +futures-core = "0.3.30" +futures = "0.3.30" diff --git a/lib/src/diag.rs b/lib/src/diag.rs index 786476d..51c7aec 100644 --- a/lib/src/diag.rs +++ b/lib/src/diag.rs @@ -1,8 +1,13 @@ //! Diag protocol serialization/deserialization use chrono::{DateTime, FixedOffset}; +use crc::{Algorithm, Crc}; use deku::prelude::*; +use crate::hdlc::{self, hdlc_decapsulate}; +use log::{warn, error}; +use thiserror::Error; + pub const MESSAGE_TERMINATOR: u8 = 0x7e; pub const MESSAGE_ESCAPE_CHAR: u8 = 0x7d; @@ -49,6 +54,28 @@ pub enum DataType { Other(u32), } +#[derive(Debug, Clone, PartialEq, Error)] +pub enum DiagParsingError { + #[error("Failed to parse Message: {0}, data: {1:?}")] + MessageParsingError(deku::DekuError, Vec), + #[error("HDLC decapsulation of message failed: {0}, data: {1:?}")] + HdlcDecapsulationError(hdlc::HdlcError, Vec), +} + +// this is sorta based on the params qcsuper uses, plus what seems to be used in +// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47 +pub const CRC_CCITT_ALG: Algorithm = Algorithm { + poly: 0x1021, + init: 0xffff, + refin: true, + refout: true, + width: 16, + xorout: 0xffff, + check: 0x2189, + residue: 0x0000, +}; + +pub const CRC_CCITT: Crc = Crc::::new(&CRC_CCITT_ALG); #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] pub struct MessagesContainer { pub data_type: DataType, @@ -57,6 +84,29 @@ pub struct MessagesContainer { pub messages: Vec, } +impl MessagesContainer { + pub fn into_messages(self) -> Vec> { + let mut result = Vec::new(); + for msg in self.messages { + for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { + match hdlc_decapsulate(sub_msg, &CRC_CCITT) { + Ok(data) => match Message::from_bytes((&data, 0)) { + Ok(((leftover_bytes, _), res)) => { + if !leftover_bytes.is_empty() { + warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len()); + } + result.push(Ok(res)); + }, + Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))), + }, + Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec()))), + } + } + } + result + } +} + #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] pub struct HdlcEncapsulatedMessage { pub len: u32, @@ -431,4 +481,99 @@ mod test { }, }); } + + fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer { + MessagesContainer { + data_type, + num_messages: 1, + messages: vec![message], + } + } + + // this log is based on one captured on a real device -- if it fails to + // serialize or deserialize, that's probably a problem with this mock, not + // the DiagReader implementation + fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { + let length_with_payload = 31 + payload.len() as u16; + let message = Message::Log { + pending_msgs: 0, + outer_length: length_with_payload, + inner_length: length_with_payload, + log_type: 0xb0c0, + timestamp: Timestamp { ts: 72659535985485082 }, + body: LogBody::LteRrcOtaMessage { + ext_header_version: 20, + packet: LteRrcOtaPacket::V8 { + rrc_rel_maj: 14, + rrc_rel_min: 48, + bearer_id: 0, + phy_cell_id: 160, + earfcn: 2050, + sfn_subfn: 4057, + pdu_num: 5, + sib_mask: 0, + len: payload.len() as u16, + packet: payload.to_vec(), + }, + }, + }; + let serialized = message.to_bytes().expect("failed to serialize test message"); + let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT); + let encapsulated = HdlcEncapsulatedMessage { + len: encapsulated_data.len() as u32, + data: encapsulated_data, + }; + (encapsulated, message) + } + + #[test] + fn test_containers_with_multiple_messages() { + let (encapsulated1, message1) = get_test_message(&[1]); + let (encapsulated2, message2) = get_test_message(&[2]); + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(encapsulated2); + container.num_messages += 1; + assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + } + + #[test] + fn test_containers_with_concatenated_message() { + let (mut encapsulated1, message1) = get_test_message(&[1]); + let (encapsulated2, message2) = get_test_message(&[2]); + encapsulated1.data.extend(encapsulated2.data); + encapsulated1.len += encapsulated2.len; + let container = make_container(DataType::UserSpace, encapsulated1); + assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + } + + #[test] + fn test_handles_parsing_errors() { + let (encapsulated1, message1) = get_test_message(&[1]); + let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT); + let encapsulated2 = HdlcEncapsulatedMessage { + len: bad_message.len() as u32, + data: bad_message, + }; + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(encapsulated2); + container.num_messages += 1; + let result = container.into_messages(); + assert_eq!(result[0], Ok(message1)); + assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _)))); + } + + #[test] + fn test_handles_encapsulation_errors() { + let (encapsulated1, message1) = get_test_message(&[1]); + let bad_encapsulation = HdlcEncapsulatedMessage { + len: 4, + data: vec![0x01, 0x02, 0x03, 0x04], + }; + let mut container = make_container(DataType::UserSpace, encapsulated1); + container.messages.push(bad_encapsulation); + container.num_messages += 1; + let result = container.into_messages(); + assert_eq!(result[0], Ok(message1)); + assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _)))); + } } diff --git a/lib/src/diag_device.rs b/lib/src/diag_device.rs index a9d5a31..3737379 100644 --- a/lib/src/diag_device.rs +++ b/lib/src/diag_device.rs @@ -1,15 +1,15 @@ use crate::hdlc::hdlc_encapsulate; -use crate::diag::{Message, ResponsePayload, Request, LogConfigRequest, LogConfigResponse, build_log_mask_request, RequestContainer, DataType, MessagesContainer}; -use crate::diag_reader::{DiagReader, CRC_CCITT}; -use crate::qmdl::QmdlWriter; +use crate::diag::{build_log_mask_request, DataType, DiagParsingError, LogConfigRequest, LogConfigResponse, Message, MessagesContainer, Request, RequestContainer, ResponsePayload, CRC_CCITT}; use crate::log_codes; -use std::fs::File; -use std::io::Read; +use std::io::ErrorKind; use std::os::fd::AsRawFd; +use futures_core::TryStream; use thiserror::Error; use log::{info, warn, error}; use deku::prelude::*; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; pub type DiagResult = Result; @@ -20,7 +20,7 @@ pub enum DiagDeviceError { #[error("Failed to read diag device: {0}")] DeviceReadFailed(std::io::Error), #[error("Failed to write diag device: {0}")] - DeviceWriteFailed(String), + DeviceWriteFailed(std::io::Error), #[error("Nonzero status code {0} for diag request: {1:?}")] RequestFailed(u32, Request), #[error("Didn't receive response for request: {0:?}")] @@ -71,43 +71,18 @@ const DIAG_IOCTL_SWITCH_LOGGING: u64 = 7; pub struct DiagDevice { file: File, - pub qmdl_writer: Option>, fully_initialized: bool, read_buf: Vec, use_mdm: i32, } -impl DiagReader for DiagDevice { - type Err = DiagDeviceError; - - fn get_next_messages_container(&mut self) -> DiagResult { - let mut bytes_read = 0; - while bytes_read == 0 { - bytes_read = self.file.read(&mut self.read_buf) - .map_err(DiagDeviceError::DeviceReadFailed)?; - } - let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0)) - .map_err(DiagDeviceError::ParseMessagesContainerError)?; - if !leftover_bytes.is_empty() { - warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len()); - } - - if let Some(qmdl_writer) = self.qmdl_writer.as_mut() { - if self.fully_initialized { - qmdl_writer.write_container(&container) - .map_err(DiagDeviceError::QmdlFileWriteError)?; - } - } - Ok(container) - } -} - impl DiagDevice { - pub fn new(qmdl_writer: Option>) -> DiagResult { - let diag_file = std::fs::File::options() + pub async fn new() -> DiagResult { + let diag_file = File::options() .read(true) .write(true) .open("/dev/diag") + .await .map_err(DiagDeviceError::OpenDiagDeviceError)?; let fd = diag_file.as_raw_fd(); @@ -118,12 +93,32 @@ impl DiagDevice { read_buf: vec![0; BUFFER_LEN], file: diag_file, fully_initialized: false, - qmdl_writer, use_mdm, }) } - fn write_request(&mut self, req: &Request) -> DiagResult<()> { + pub fn as_stream(&mut self) -> impl TryStream + '_ { + futures::stream::try_unfold(self, |dev| async { + let container = dev.get_next_messages_container().await?; + Ok(Some((container, dev))) + }) + } + + async fn get_next_messages_container(&mut self) -> Result { + let mut bytes_read = 0; + while bytes_read == 0 { + bytes_read = self.file.read(&mut self.read_buf).await + .map_err(DiagDeviceError::DeviceReadFailed)?; + } + let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0)) + .map_err(DiagDeviceError::ParseMessagesContainerError)?; + if !leftover_bytes.is_empty() { + warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len()); + } + Ok(container) + } + + async fn write_request(&mut self, req: &Request) -> DiagResult<()> { let req_bytes = &req.to_bytes().expect("Failed to serialize Request"); let buf = RequestContainer { data_type: DataType::UserSpace, @@ -131,23 +126,35 @@ impl DiagDevice { mdm_field: -1, hdlc_encapsulated_request: hdlc_encapsulate(req_bytes, &CRC_CCITT), }.to_bytes().expect("Failed to serialize RequestContainer"); - unsafe { - let fd = self.file.as_raw_fd(); - let buf_ptr = buf.as_ptr() as *const libc::c_void; - let ret = libc::write(fd, buf_ptr, buf.len()); - if ret < 0 { - let msg = format!("write failed with error code {}", ret); - return Err(DiagDeviceError::DeviceWriteFailed(msg)); + if let Err(err) = self.file.write(&buf).await { + // For reasons I don't entirely understand, calls to write(2) on + // /dev/diag always return 0 bytes written, though the written + // requests end up being interpreted. As such, we're not concerned + // about WriteZero errors + if err.kind() != ErrorKind::WriteZero { + return Err(DiagDeviceError::DeviceWriteFailed(err)); } } + self.file.flush().await + .map_err(DiagDeviceError::DeviceWriteFailed)?; Ok(()) } - fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> { - let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges); - self.write_request(&req)?; + async fn read_response(&mut self) -> DiagResult>> { + loop { + let container = self.get_next_messages_container().await?; + if container.data_type != DataType::UserSpace { + continue; + } + return Ok(container.into_messages()); + } + } - for msg in self.read_response()? { + async fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> { + let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges); + self.write_request(&req).await?; + + for msg in self.read_response().await? { match msg { Ok(Message::Log { .. }) => info!("skipping log response..."), Ok(Message::Response { payload, status, .. }) => match payload { @@ -166,11 +173,11 @@ impl DiagDevice { Err(DiagDeviceError::NoResponse(req)) } - fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> { + async fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> { let req = build_log_mask_request(log_type, log_mask_bitsize, &LOG_CODES_FOR_RAW_PACKET_LOGGING); - self.write_request(&req)?; + self.write_request(&req).await?; - for msg in self.read_response()? { + for msg in self.read_response().await? { match msg { Ok(Message::Log { .. }) => info!("skipping log response..."), Ok(Message::Response { payload, status, .. }) => { @@ -188,13 +195,13 @@ impl DiagDevice { Err(DiagDeviceError::NoResponse(req)) } - pub fn config_logs(&mut self) -> DiagResult<()> { + pub async fn config_logs(&mut self) -> DiagResult<()> { info!("retrieving diag logging capabilities..."); - let log_mask_sizes = self.retrieve_id_ranges()?; + let log_mask_sizes = self.retrieve_id_ranges().await?; for (log_type, &log_mask_bitsize) in log_mask_sizes.iter().enumerate() { if log_mask_bitsize > 0 { - self.set_log_mask(log_type as u32, log_mask_bitsize)?; + self.set_log_mask(log_type as u32, log_mask_bitsize).await?; info!("enabled logging for log type {}", log_type); } } diff --git a/lib/src/diag_reader.rs b/lib/src/diag_reader.rs deleted file mode 100644 index b8ef308..0000000 --- a/lib/src/diag_reader.rs +++ /dev/null @@ -1,223 +0,0 @@ -use crate::diag; -use crate::{diag::*, hdlc::hdlc_decapsulate}; -use crate::hdlc; - -use crc::{Crc, Algorithm}; -use deku::prelude::*; -use log::{info, warn, error}; -use thiserror::Error; - -// this is sorta based on the params qcsuper uses, plus what seems to be used in -// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47 -pub const CRC_CCITT_ALG: Algorithm = Algorithm { - poly: 0x1021, - init: 0xffff, - refin: true, - refout: true, - width: 16, - xorout: 0xffff, - check: 0x2189, - residue: 0x0000, -}; -pub const CRC_CCITT: Crc = Crc::::new(&CRC_CCITT_ALG); - -#[derive(Debug, PartialEq, Error)] -pub enum DiagParsingError { - #[error("Failed to parse Message: {0}, data: {1:?}")] - MessageParsingError(deku::DekuError, Vec), - #[error("HDLC decapsulation of message failed: {0}, data: {1:?}")] - HdlcDecapsulationError(hdlc::HdlcError, Vec), -} - -type MaybeMessage = Result; - -pub trait DiagReader { - type Err; - - fn get_next_messages_container(&mut self) -> Result; - - fn read_response(&mut self) -> Result, Self::Err> { - loop { - let container = self.get_next_messages_container()?; - if container.data_type == DataType::UserSpace { - return self.parse_response_container(container); - } else { - info!("skipping non-userspace message...") - } - } - } - - fn parse_response_container(&self, container: MessagesContainer) -> Result, Self::Err> { - let mut result = Vec::new(); - for msg in container.messages { - for sub_msg in msg.data.split_inclusive(|&b| b == diag::MESSAGE_TERMINATOR) { - match hdlc_decapsulate(sub_msg, &CRC_CCITT) { - Ok(data) => match Message::from_bytes((&data, 0)) { - Ok(((leftover_bytes, _), res)) => { - if !leftover_bytes.is_empty() { - warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len()); - } - result.push(Ok(res)); - }, - Err(e) => { - result.push(Err(DiagParsingError::MessageParsingError(e, data))); - }, - }, - Err(err) => { - result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec()))); - } - } - } - } - Ok(result) - } -} - -#[cfg(test)] -mod test { - use super::*; - - struct MockReader { - containers: Vec, - } - - impl DiagReader for MockReader { - type Err = (); - - fn get_next_messages_container(&mut self) -> Result { - Ok(self.containers.remove(0)) - } - } - - fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer { - MessagesContainer { - data_type, - num_messages: 1, - messages: vec![message], - } - } - - // this log is based on one captured on a real device -- if it fails to - // serialize or deserialize, that's probably a problem with this mock, not - // the DiagReader implementation - fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { - let length_with_payload = 31 + payload.len() as u16; - let message = Message::Log { - pending_msgs: 0, - outer_length: length_with_payload, - inner_length: length_with_payload, - log_type: 0xb0c0, - timestamp: Timestamp { ts: 72659535985485082 }, - body: LogBody::LteRrcOtaMessage { - ext_header_version: 20, - packet: LteRrcOtaPacket::V8 { - rrc_rel_maj: 14, - rrc_rel_min: 48, - bearer_id: 0, - phy_cell_id: 160, - earfcn: 2050, - sfn_subfn: 4057, - pdu_num: 5, - sib_mask: 0, - len: payload.len() as u16, - packet: payload.to_vec(), - }, - }, - }; - let serialized = message.to_bytes().expect("failed to serialize test message"); - let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT); - let encapsulated = HdlcEncapsulatedMessage { - len: encapsulated_data.len() as u32, - data: encapsulated_data, - }; - (encapsulated, message) - } - - #[test] - fn test_skipping_nonuser_containers() { - let (encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, _) = get_test_message(&[2]); - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - make_container(DataType::UserSpace, encapsulated1), - make_container(DataType::Other(0), encapsulated2), - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_containers_with_multiple_messages() { - let (encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, message2) = get_test_message(&[2]); - let mut container1 = make_container(DataType::UserSpace, encapsulated1); - container1.messages.push(encapsulated2); - container1.num_messages += 1; - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - container1, - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_containers_with_concatenated_message() { - let (mut encapsulated1, message1) = get_test_message(&[1]); - let (encapsulated2, message2) = get_test_message(&[2]); - encapsulated1.data.extend(encapsulated2.data); - encapsulated1.len += encapsulated2.len; - let (encapsulated3, message3) = get_test_message(&[3]); - let mut reader = MockReader { - containers: vec![ - make_container(DataType::UserSpace, encapsulated1), - make_container(DataType::UserSpace, encapsulated3), - ], - }; - assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)])); - assert_eq!(reader.read_response(), Ok(vec![Ok(message3)])); - } - - #[test] - fn test_handles_parsing_errors() { - let (encapsulated1, message1) = get_test_message(&[1]); - let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT); - let encapsulated2 = HdlcEncapsulatedMessage { - len: bad_message.len() as u32, - data: bad_message, - }; - let mut container = make_container(DataType::UserSpace, encapsulated1); - container.messages.push(encapsulated2); - container.num_messages += 1; - let mut reader = MockReader { - containers: vec![container], - }; - let result = reader.read_response().unwrap(); - assert_eq!(result[0], Ok(message1)); - assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _)))); - } - - #[test] - fn test_handles_encapsulation_errors() { - let (encapsulated1, message1) = get_test_message(&[1]); - let bad_encapsulation = HdlcEncapsulatedMessage { - len: 4, - data: vec![0x01, 0x02, 0x03, 0x04], - }; - let mut container = make_container(DataType::UserSpace, encapsulated1); - container.messages.push(bad_encapsulation); - container.num_messages += 1; - let mut reader = MockReader { - containers: vec![container], - }; - let result = reader.read_response().unwrap(); - assert_eq!(result[0], Ok(message1)); - assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _)))); - } -} diff --git a/lib/src/hdlc.rs b/lib/src/hdlc.rs index 3c99e1b..22456bd 100644 --- a/lib/src/hdlc.rs +++ b/lib/src/hdlc.rs @@ -9,7 +9,7 @@ use thiserror::Error; use crate::diag::{MESSAGE_ESCAPE_CHAR, MESSAGE_TERMINATOR, ESCAPED_MESSAGE_ESCAPE_CHAR, ESCAPED_MESSAGE_TERMINATOR}; -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Clone, Error, PartialEq)] pub enum HdlcError { #[error("Invalid checksum (expected {0}, got {1})")] InvalidChecksum(u16, u16), @@ -89,7 +89,7 @@ mod tests { #[test] fn test_hdlc_encapsulate() { - let crc = Crc::::new(&crate::diag_reader::CRC_CCITT_ALG); + let crc = Crc::::new(&crate::diag::CRC_CCITT_ALG); let data = vec![0x01, 0x02, 0x03, 0x04]; let expected = vec![1, 2, 3, 4, 145, 57, 126]; let encapsulated = hdlc_encapsulate(&data, &crc); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index dda6d21..982f406 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,7 +1,6 @@ pub mod hdlc; pub mod diag; pub mod diag_device; -pub mod diag_reader; pub mod qmdl; pub mod log_codes; pub mod gsmtap; diff --git a/lib/src/pcap.rs b/lib/src/pcap.rs index 9ae59ca..7bd5197 100644 --- a/lib/src/pcap.rs +++ b/lib/src/pcap.rs @@ -1,14 +1,14 @@ use crate::gsmtap::GsmtapMessage; use crate::diag::Timestamp; -use std::io::Write; +use tokio::io::AsyncWrite; use std::borrow::Cow; use chrono::prelude::*; use deku::prelude::*; -use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; -use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock; -use pcap_file::pcapng::PcapNgWriter; -use pcap_file::PcapError; +use pcap_file_tokio::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; +use pcap_file_tokio::pcapng::blocks::interface_description::InterfaceDescriptionBlock; +use pcap_file_tokio::pcapng::PcapNgWriter; +use pcap_file_tokio::PcapError; use thiserror::Error; #[derive(Error, Debug)] @@ -23,7 +23,7 @@ pub enum GsmtapPcapError { Deku(#[from] DekuError), } -pub struct GsmtapPcapWriter where T: Write { +pub struct GsmtapPcapWriter where T: AsyncWrite { writer: PcapNgWriter, ip_id: u16, } @@ -56,23 +56,23 @@ struct UdpHeader { checksum: u16, } -impl GsmtapPcapWriter where T: Write { - pub fn new(writer: T) -> Result { - let writer = PcapNgWriter::new(writer)?; +impl GsmtapPcapWriter where T: AsyncWrite + Unpin + Send { + pub async fn new(writer: T) -> Result { + let writer = PcapNgWriter::new(writer).await?; Ok(GsmtapPcapWriter { writer, ip_id: 0 }) } - pub fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> { + pub async fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> { let interface = InterfaceDescriptionBlock { - linktype: pcap_file::DataLink::IPV4, + linktype: pcap_file_tokio::DataLink::IPV4, snaplen: 0xffff, options: vec![], }; - self.writer.write_pcapng_block(interface)?; + self.writer.write_pcapng_block(interface).await?; Ok(()) } - pub fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> { + pub async fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> { let duration = timestamp.to_datetime() .signed_duration_since(DateTime::UNIX_EPOCH) .to_std()?; @@ -113,7 +113,7 @@ impl GsmtapPcapWriter where T: Write { data: Cow::Owned(data), options: vec![], }; - self.writer.write_pcapng_block(packet)?; + self.writer.write_pcapng_block(packet).await?; self.ip_id = self.ip_id.wrapping_add(1); Ok(()) } diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index c9ff5d6..e6863fc 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -3,19 +3,18 @@ //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QMDL files. -use crate::diag_reader::DiagReader; use crate::diag::{MessagesContainer, MESSAGE_TERMINATOR, HdlcEncapsulatedMessage, DataType}; -use std::io::{Write, BufReader, BufRead, Read}; -use thiserror::Error; +use futures::TryStream; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, AsyncBufReadExt}; use log::error; -pub struct QmdlWriter where T: Write { +pub struct QmdlWriter where T: AsyncWrite + Unpin { writer: T, pub total_written: usize, } -impl QmdlWriter where T: Write { +impl QmdlWriter where T: AsyncWrite + Unpin { pub fn new(writer: T) -> Self { QmdlWriter::new_with_existing_size(writer, 0) } @@ -27,30 +26,22 @@ impl QmdlWriter where T: Write { } } - pub fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { + pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { for msg in &container.messages { - self.writer.write_all(&msg.data)?; + self.writer.write_all(&msg.data).await?; self.total_written += msg.data.len(); } Ok(()) } } -#[derive(Debug, Error)] -pub enum QmdlReaderError { - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - #[error("Reached max_bytes count {0}")] - MaxBytesReached(usize), -} - -pub struct QmdlReader where T: Read { +pub struct QmdlReader where T: AsyncRead { reader: BufReader, bytes_read: usize, max_bytes: Option, } -impl QmdlReader where T: Read { +impl QmdlReader where T: AsyncRead + Unpin { pub fn new(reader: T, max_bytes: Option) -> Self { QmdlReader { reader: BufReader::new(reader), @@ -58,23 +49,29 @@ impl QmdlReader where T: Read { max_bytes, } } -} -impl DiagReader for QmdlReader where T: Read { - type Err = QmdlReaderError; + pub fn as_stream(&mut self) -> impl TryStream + '_ { + futures::stream::try_unfold(self, |reader| async { + let maybe_container = reader.get_next_messages_container().await?; + match maybe_container { + Some(container) => Ok(Some((container, reader))), + None => Ok(None) + } + }) + } - fn get_next_messages_container(&mut self) -> Result { + async fn get_next_messages_container(&mut self) -> Result, std::io::Error> { if let Some(max_bytes) = self.max_bytes { if self.bytes_read >= max_bytes { if self.bytes_read > max_bytes { error!("warning: {} bytes read, but max_bytes was {}", self.bytes_read, max_bytes); } - return Err(QmdlReaderError::MaxBytesReached(max_bytes)); + return Ok(None); } } let mut buf = Vec::new(); - let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf)?; + let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?; self.bytes_read += bytes_read; // Since QMDL is just a flat list of messages, we can't actually @@ -82,7 +79,7 @@ impl DiagReader for QmdlReader where T: Read { // read. So we'll just pretend that all containers had exactly one // message. As far as I know, the number of messages per container // doesn't actually affect anything, so this should be fine. - Ok(MessagesContainer { + Ok(Some(MessagesContainer { data_type: DataType::UserSpace, num_messages: 1, messages: vec![ @@ -91,7 +88,7 @@ impl DiagReader for QmdlReader where T: Read { data: buf, }, ] - }) + })) } } @@ -100,7 +97,7 @@ mod test { use std::io::Cursor; use crate::hdlc::hdlc_encapsulate; - use crate::diag_reader::CRC_CCITT; + use crate::diag::CRC_CCITT; use super::*; @@ -140,8 +137,8 @@ mod test { ] } - #[test] - fn test_unbounded_qmdl_reader() { + #[tokio::test] + async fn test_unbounded_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); let mut reader = QmdlReader::new(&mut buf, None); let expected_messages = get_test_messages(); @@ -151,12 +148,12 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } } - #[test] - fn test_bounded_qmdl_reader() { + #[tokio::test] + async fn test_bounded_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); // bound the reader to the first two messages @@ -170,30 +167,30 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } - assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_)))); + assert!(matches!(reader.get_next_messages_container().await, Ok(None))); } - #[test] - fn test_qmdl_writer() { + #[tokio::test] + async fn test_qmdl_writer() { let mut buf = Vec::new(); let mut writer = QmdlWriter::new(&mut buf); let expected_containers = get_test_containers(); for container in &expected_containers { - writer.write_container(container).unwrap(); + writer.write_container(container).await.unwrap(); } assert_eq!(writer.total_written, buf.len()); assert_eq!(buf, get_test_message_bytes()); } - #[test] - fn test_writing_and_reading() { + #[tokio::test] + async fn test_writing_and_reading() { let mut buf = Vec::new(); let mut writer = QmdlWriter::new(&mut buf); let expected_containers = get_test_containers(); for container in &expected_containers { - writer.write_container(container).unwrap(); + writer.write_container(container).await.unwrap(); } let limit = Some(buf.len()); @@ -205,8 +202,8 @@ mod test { num_messages: 1, messages: vec![message], }; - assert_eq!(expected_container, reader.get_next_messages_container().unwrap()); + assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap()); } - assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_)))); + assert!(matches!(reader.get_next_messages_container().await, Ok(None))); } }