Merge pull request #38 from EFForg/various-fixups

Various fixups
This commit is contained in:
Cooper Quintin
2024-02-28 10:24:50 -08:00
committed by GitHub
21 changed files with 587 additions and 670 deletions

View File

@@ -1,5 +0,0 @@
{
"files.associations": {
"fcntl.h": "c"
}
}

338
Cargo.lock generated
View File

@@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.8.7"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01"
checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f"
dependencies = [
"cfg-if",
"once_cell",
@@ -61,9 +61,9 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.6.11"
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -159,7 +159,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
@@ -244,12 +244,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]]
name = "bitvec"
version = "1.0.1"
@@ -265,9 +259,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.14.0"
version = "3.15.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b"
[[package]]
name = "byteorder"
@@ -292,12 +286,9 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "cc"
version = "1.0.83"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [
"libc",
]
checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730"
[[package]]
name = "cfg-if"
@@ -307,9 +298,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.33"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -317,14 +308,14 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
name = "clap"
version = "4.5.0"
version = "4.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f"
checksum = "c918d541ef2913577a0f9566e9ce27cb35b6df072075769e0b26cb5a554520da"
dependencies = [
"clap_builder",
"clap_derive",
@@ -332,9 +323,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.0"
version = "4.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99"
checksum = "9f3e7391dad68afb0c2ede1bf619f579a3dc9c2ec67f089baa397123a2f3d1eb"
dependencies = [
"anstream",
"anstyle",
@@ -351,7 +342,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
@@ -476,16 +467,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -513,6 +494,21 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@@ -520,6 +516,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -528,6 +525,23 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
@@ -536,7 +550,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
@@ -557,9 +571,13 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -608,9 +626,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.4"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd"
[[package]]
name = "http"
@@ -666,9 +684,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75"
checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a"
dependencies = [
"bytes",
"futures-channel",
@@ -680,6 +698,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
]
@@ -701,9 +720,9 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.59"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@@ -749,9 +768,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.2.2"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520"
checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177"
dependencies = [
"equivalent",
"hashbrown",
@@ -759,12 +778,12 @@ dependencies = [
[[package]]
name = "is-terminal"
version = "0.4.10"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
"rustix",
"libc",
"windows-sys 0.52.0",
]
@@ -776,9 +795,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "js-sys"
version = "0.3.67"
version = "0.3.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1"
checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee"
dependencies = [
"wasm-bindgen",
]
@@ -807,12 +826,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]]
name = "lock_api"
version = "0.4.11"
@@ -859,9 +872,9 @@ dependencies = [
[[package]]
name = "miniz_oxide"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
dependencies = [
"adler",
]
@@ -879,9 +892,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.17"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
@@ -945,6 +958,21 @@ dependencies = [
"thiserror",
]
[[package]]
name = "pcap-file-tokio"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee4f08e375f9aabbb17f4c031a2f0af1397835ce8d7909b167ada1dd8b572e6"
dependencies = [
"async-trait",
"byteorder",
"derive-into-owned",
"pcap-file",
"thiserror",
"tokio",
"tokio-byteorder",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@@ -968,7 +996,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
@@ -985,9 +1013,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "proc-macro-crate"
@@ -1060,11 +1088,14 @@ dependencies = [
"crc",
"deku",
"env_logger",
"futures",
"futures-core",
"libc",
"log",
"pcap-file",
"pcap-file-tokio",
"telcom-parser",
"thiserror",
"tokio",
]
[[package]]
@@ -1074,6 +1105,7 @@ dependencies = [
"axum",
"chrono",
"env_logger",
"futures",
"futures-core",
"futures-macro",
"include_dir",
@@ -1084,6 +1116,7 @@ dependencies = [
"tempdir",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"toml",
]
@@ -1103,7 +1136,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
"bitflags",
]
[[package]]
@@ -1164,19 +1197,6 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [
"bitflags 2.4.2",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "rustversion"
version = "1.0.14"
@@ -1185,9 +1205,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "ryu"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "scopeguard"
@@ -1197,29 +1217,29 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.196"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.196"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
name = "serde_json"
version = "1.0.113"
version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79"
checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0"
dependencies = [
"itoa",
"ryu",
@@ -1323,9 +1343,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.48"
version = "2.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f"
checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb"
dependencies = [
"proc-macro2",
"quote",
@@ -1378,29 +1398,29 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.56"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad"
checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.56"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471"
checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
name = "tokio"
version = "1.35.1"
version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
@@ -1415,6 +1435,16 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-byteorder"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cf347e8ae1d1ffd16c8aed569172a71bd81098a001d0f4964d476c0097aba4a"
dependencies = [
"byteorder",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "2.2.0"
@@ -1423,7 +1453,18 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
@@ -1444,14 +1485,14 @@ dependencies = [
[[package]]
name = "toml"
version = "0.8.9"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325"
checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.21.1",
"toml_edit 0.22.6",
]
[[package]]
@@ -1471,20 +1512,20 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap",
"toml_datetime",
"winnow",
"winnow 0.5.40",
]
[[package]]
name = "toml_edit"
version = "0.21.1"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6"
dependencies = [
"indexmap",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.6.2",
]
[[package]]
@@ -1582,9 +1623,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.90"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406"
checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -1592,24 +1633,24 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.90"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd"
checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.90"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999"
checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -1617,22 +1658,22 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.90"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7"
checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.90"
version = "0.2.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b"
checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838"
[[package]]
name = "winapi"
@@ -1671,7 +1712,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@@ -1689,7 +1730,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@@ -1709,17 +1750,17 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f"
dependencies = [
"windows_aarch64_gnullvm 0.52.0",
"windows_aarch64_msvc 0.52.0",
"windows_i686_gnu 0.52.0",
"windows_i686_msvc 0.52.0",
"windows_x86_64_gnu 0.52.0",
"windows_x86_64_gnullvm 0.52.0",
"windows_x86_64_msvc 0.52.0",
"windows_aarch64_gnullvm 0.52.3",
"windows_aarch64_msvc 0.52.3",
"windows_i686_gnu 0.52.3",
"windows_i686_msvc 0.52.3",
"windows_x86_64_gnu 0.52.3",
"windows_x86_64_gnullvm 0.52.3",
"windows_x86_64_msvc 0.52.3",
]
[[package]]
@@ -1730,9 +1771,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6"
[[package]]
name = "windows_aarch64_msvc"
@@ -1742,9 +1783,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f"
[[package]]
name = "windows_i686_gnu"
@@ -1754,9 +1795,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb"
[[package]]
name = "windows_i686_msvc"
@@ -1766,9 +1807,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58"
[[package]]
name = "windows_x86_64_gnu"
@@ -1778,9 +1819,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614"
[[package]]
name = "windows_x86_64_gnullvm"
@@ -1790,9 +1831,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c"
[[package]]
name = "windows_x86_64_msvc"
@@ -1802,15 +1843,24 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6"
[[package]]
name = "winnow"
version = "0.5.36"
version = "0.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "818ce546a11a9986bc24f93d0cdf38a8a1a400f1473ea8c82e59f6e0ffab9249"
checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178"
dependencies = [
"memchr",
]
@@ -1841,5 +1891,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.50",
]

View File

@@ -23,3 +23,5 @@ include_dir = "0.7.3"
mime_guess = "2.0.4"
tempdir = "0.3.7"
chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = "0.1.14"
futures = "0.3.30"

View File

@@ -1,110 +1,101 @@
use std::pin::pin;
use std::sync::Arc;
use axum::extract::State;
use axum::http::StatusCode;
use rayhunter::diag::DataType;
use rayhunter::diag_device::DiagDevice;
use rayhunter::diag_reader::DiagReader;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, self};
use tokio::sync::mpsc::Receiver;
use rayhunter::qmdl::QmdlWriter;
use log::{debug, info};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::task::JoinHandle;
use log::{debug, error, info};
use tokio::fs::File;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};
use crate::error::RayhunterError;
use crate::qmdl_store::QmdlStore;
use crate::server::ServerState;
pub enum DiagDeviceCtrlMessage {
StopRecording,
StartRecording(QmdlWriter<std::fs::File>),
StartRecording(QmdlWriter<File>),
Exit,
}
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) -> JoinHandle<Result<(), RayhunterError>> {
// mpsc channel for updating QmdlStore entry filesizes. First usize is the
// index, second is the size in bytes
let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1);
// Spawn a thread to monitor the (usize, usize) channel for updates,
// triggering QmdlStore updates
let qmdl_store_lock_clone = qmdl_store_lock.clone();
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) {
task_tracker.spawn(async move {
while let Some((entry_idx, new_size)) = rx.recv().await {
let mut qmdl_store = qmdl_store_lock_clone.write().await;
qmdl_store.update_entry_size(entry_idx, new_size).await
.expect("failed to update qmdl file size");
}
info!("QMDL store size updater thread exiting...");
});
// Spawn a thread to drive the DiagDevice reading loop. Since DiagDevice
// works via synchronous I/O, we have to spawn a "blocking" thread to avoid
// gumming up tokio's event loop.
task_tracker.spawn_blocking(move || {
let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_file));
let mut diag_stream = pin!(dev.as_stream().into_stream());
loop {
// First check if we've gotten any control meesages
match qmdl_file_rx.try_recv() {
Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => {
dev.qmdl_writer = Some(qmdl_writer);
},
Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None,
// Disconnected means all the Senders have been dropped, so it's
// time to go
Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => {
info!("Diag reader thread exiting...");
return Ok(())
},
// empty just means there's no message for us, so continue as normal
Err(TryRecvError::Empty) => {},
}
// remember the QmdlStore current entry index so we can update its size later
let qmdl_store_index = qmdl_store_lock.blocking_read().current_entry;
// TODO: once we're actually doing analysis, we'll wanna use the messages
// returned here. Until then, the DiagDevice has already written those messages
// to the QMDL file, so we can just ignore them.
debug!("reading response from diag device...");
let _messages = dev.read_response().map_err(RayhunterError::DiagReadError)?;
debug!("got diag response ({} messages)", _messages.len());
// keep track of how many bytes were written to the QMDL file so we can read
// a valid block of data from it in the HTTP server
if let Some(qmdl_writer) = dev.qmdl_writer.as_ref() {
debug!("total QMDL bytes written: {}, sending update...", qmdl_writer.total_written);
let index = qmdl_store_index.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
tx.blocking_send((index, qmdl_writer.total_written)).unwrap();
debug!("done!");
} else {
debug!("no qmdl_writer set, continuing...");
tokio::select! {
msg = qmdl_file_rx.recv() => {
match msg {
Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => {
qmdl_writer = Some(new_writer);
},
Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None,
// None means all the Senders have been dropped, so it's
// time to go
Some(DiagDeviceCtrlMessage::Exit) | None => {
info!("Diag reader thread exiting...");
return Ok(())
},
}
}
maybe_container = diag_stream.next() => {
match maybe_container.unwrap() {
Ok(container) => {
if container.data_type != DataType::UserSpace {
debug!("skipping non-userspace diag messages...");
continue;
}
// keep track of how many bytes were written to the QMDL file so we can read
// a valid block of data from it in the HTTP server
if let Some(writer) = qmdl_writer.as_mut() {
writer.write_container(&container).await.expect("failed to write to QMDL writer");
debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written);
let mut qmdl_store = qmdl_store_lock.write().await;
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
qmdl_store.update_entry(index, writer.total_written).await
.expect("failed to update qmdl file size");
debug!("done!");
} else {
debug!("no qmdl_writer set, continuing...");
}
},
Err(err) => {
error!("error reading diag device: {}", err);
return Err(err);
}
}
}
}
}
})
});
}
pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.readonly_mode {
return Err((StatusCode::FORBIDDEN, format!("server is in readonly mode")));
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
let qmdl_file = qmdl_store.new_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?;
let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await);
let qmdl_writer = QmdlWriter::new(qmdl_file);
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, format!("ok")))
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.readonly_mode {
return Err((StatusCode::FORBIDDEN, format!("server is in readonly mode")));
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
qmdl_store.close_current_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't close current qmdl entry: {}", e)))?;
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StopRecording).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, format!("ok")))
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}

View File

@@ -9,8 +9,6 @@ pub enum RayhunterError{
ConfigFileParsingError(#[from] toml::de::Error),
#[error("Diag intialization error: {0}")]
DiagInitError(DiagDeviceError),
#[error("Diag read error: {0}")]
DiagReadError(DiagDeviceError),
#[error("Tokio error: {0}")]
TokioError(#[from] tokio::io::Error),
#[error("QmdlStore error: {0}")]

View File

@@ -20,7 +20,6 @@ use log::{info, error};
use rayhunter::diag_device::DiagDevice;
use axum::routing::{get, post};
use axum::Router;
use rayhunter::qmdl::QmdlWriter;
use stats::get_qmdl_manifest;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
@@ -127,11 +126,9 @@ async fn main() -> Result<(), RayhunterError> {
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
let (tx, rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
if !config.readonly_mode {
let qmdl_file = qmdl_store_lock.write().await.new_entry().await?;
let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await);
let mut dev = DiagDevice::new(Some(qmdl_writer))
let mut dev = DiagDevice::new().await
.map_err(RayhunterError::DiagInitError)?;
dev.config_logs()
dev.config_logs().await
.map_err(RayhunterError::DiagInitError)?;
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone());

View File

@@ -1,26 +1,24 @@
use crate::ServerState;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser::GsmtapParser;
use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::{QmdlReader, QmdlReaderError};
use rayhunter::diag_reader::DiagReader;
use rayhunter::qmdl::QmdlReader;
use axum::body::Body;
use axum::http::header::CONTENT_TYPE;
use axum::extract::{State, Path};
use axum::http::StatusCode;
use axum::response::{Response, IntoResponse};
use std::io::Write;
use std::pin::Pin;
use tokio::io::duplex;
use tokio_util::io::ReaderStream;
use std::{future, pin::pin};
use std::sync::Arc;
use std::task::{Poll, Context};
use futures_core::Stream;
use log::error;
use tokio::sync::mpsc;
use futures::TryStreamExt;
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
// written so far. This is done by spawning a blocking thread (a tokio thread
// capable of handling blocking operations) which streams chunks of pcap data to
// a channel that's piped to the client.
// written so far. This is done by spawning a thread which streams chunks of
// pcap data to a channel that's piped to the client.
pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let entry = qmdl_store.entry_for_name(&qmdl_name)
@@ -31,82 +29,39 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
"QMDL file is empty, try again in a bit!".to_string()
));
}
let qmdl_file = qmdl_store.open_entry(&entry).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?
.into_std().await;
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
let mut gsmtap_parser = GsmtapParser::new();
let (reader, writer) = duplex(1024);
let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap();
pcap_writer.write_iface_header().await.unwrap();
let (tx, rx) = mpsc::channel(1);
let channel_reader = ChannelReader { rx };
let channel_writer = ChannelWriter { tx };
tokio::task::spawn_blocking(move || {
// the QMDL reader should stop at the last successfully written data
// chunk (qmdl_bytes_written)
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes));
let mut gsmtap_parser = GsmtapParser::new();
let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap();
pcap_writer.write_iface_header().unwrap();
loop {
match qmdl_reader.read_response() {
Ok(messages) => {
for maybe_msg in messages {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg)
.expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp)
.expect("error writing pcap packet");
}
},
Err(e) => error!("error parsing message: {:?}", e),
tokio::spawn(async move {
let mut reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes));
let mut messages_stream = pin!(reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") {
for maybe_msg in container.into_messages() {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg)
.expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp).await
.expect("error writing pcap packet");
}
}
},
// this is expected, and just means we've reached the end of the
// safely written QMDL data
Err(QmdlReaderError::MaxBytesReached(_)) => break,
Err(e) => {
error!("error reading qmdl file: {:?}", e);
break;
},
},
Err(e) => error!("error parsing message: {:?}", e),
}
}
}
});
let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")];
let body = Body::from_stream(channel_reader);
let body = Body::from_stream(ReaderStream::new(reader));
Ok((headers, body).into_response())
}
struct ChannelWriter {
tx: mpsc::Sender<Vec<u8>>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.tx.blocking_send(buf.to_vec())
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
struct ChannelReader {
rx: mpsc::Receiver<Vec<u8>>,
}
impl Stream for ChannelReader {
type Item = Result<Vec<u8>, String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -37,7 +37,7 @@ pub struct Manifest {
pub struct ManifestEntry {
pub name: String,
pub start_time: DateTime<Local>,
pub end_time: Option<DateTime<Local>>,
pub last_message_time: Option<DateTime<Local>>,
pub size_bytes: usize,
}
@@ -47,7 +47,7 @@ impl ManifestEntry {
ManifestEntry {
name: format!("{}", now.timestamp()),
start_time: now,
end_time: None,
last_message_time: None,
size_bytes: 0,
}
}
@@ -128,18 +128,21 @@ impl QmdlStore {
.map_err(QmdlStoreError::ReadFileError)
}
// Sets the current entry's end_time, updates the manifest, and unsets the
// current entry
// Unsets the current entry
pub async fn close_current_entry(&mut self) -> Result<(), QmdlStoreError> {
let entry_index = self.current_entry.take()
.ok_or(QmdlStoreError::NoCurrentEntry)?;
self.manifest.entries[entry_index].end_time = Some(Local::now());
self.write_manifest().await
match self.current_entry {
Some(_) => {
self.current_entry = None;
Ok(())
},
None => Err(QmdlStoreError::NoCurrentEntry)
}
}
// Sets the given entry's size, updating the manifest
pub async fn update_entry_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), QmdlStoreError> {
// Sets the given entry's size and updates the last_message_time to now, updating the manifest
pub async fn update_entry(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), QmdlStoreError> {
self.manifest.entries[entry_index].size_bytes = size_bytes;
self.manifest.entries[entry_index].last_message_time = Some(Local::now());
self.write_manifest().await
}
@@ -185,17 +188,15 @@ mod tests {
let _ = store.new_entry().await.unwrap();
let entry_index = store.current_entry.unwrap();
assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert!(store.manifest.entries[entry_index].last_message_time.is_none());
store.update_entry_size(entry_index, 1000).await.unwrap();
store.update_entry(entry_index, 1000).await.unwrap();
let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap();
assert!(entry.last_message_time.is_some());
assert_eq!(store.manifest.entries[entry_index].size_bytes, 1000);
assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert!(store.manifest.entries[entry_index].end_time.is_none());
store.close_current_entry().await.unwrap();
let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap();
assert!(entry.end_time.is_some());
assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert!(matches!(store.close_current_entry().await, Err(QmdlStoreError::NoCurrentEntry)));
}

View File

@@ -103,10 +103,10 @@ pub async fn get_system_stats(State(state): State<Arc<ServerState>>) -> Result<J
Ok(stats) => Ok(Json(stats)),
Err(err) => {
error!("error getting system stats: {}", err);
return Err((
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting system stats".to_string()
));
))
},
}
}

View File

@@ -23,7 +23,7 @@
<tr>
<th scope="col">Name</th>
<th scope="col">Date Started</th>
<th scope="col">Date Stopped</th>
<th scope="col">Date of Last Message</th>
<th scope="col">Size (bytes)</th>
<th scope="col">PCAP</th>
<th scope="col">QMDL</th>

View File

@@ -29,7 +29,7 @@ function createEntryRow(entry) {
name.scope = 'row';
name.innerText = entry.name;
row.appendChild(name);
for (const key of ['start_time', 'end_time', 'size_bytes']) {
for (const key of ['start_time', 'last_message_time', 'size_bytes']) {
const td = document.createElement('td');
td.innerText = entry[key];
row.appendChild(td);
@@ -57,10 +57,15 @@ async function getQmdlManifest() {
const manifest = JSON.parse(await req('GET', '/api/qmdl-manifest'));
if (manifest.current_entry) {
manifest.current_entry.start_time = new Date(manifest.current_entry.start_time);
if (manifest.current_entry.last_message_time === undefined) {
manifest.current_entry.last_message_time = "N/A";
} else {
manifest.current_entry.last_message_time = new Date(manifest.current_entry.last_message_time);
}
}
for (entry of manifest.entries) {
entry.start_time = new Date(entry.start_time);
entry.end_time = new Date(entry.end_time);
entry.last_message_time = new Date(entry.last_message_time);
}
// sort them in reverse chronological order
manifest.entries.reverse();

View File

@@ -12,6 +12,9 @@ deku = { version = "0.16.0", features = ["logging"] }
env_logger = "0.10.1"
libc = "0.2.150"
log = "0.4.20"
pcap-file = "2.0.0"
pcap-file-tokio = "0.1.0"
thiserror = "1.0.50"
telcom-parser = { path = "../telcom-parser" }
tokio = { version = "1.35.1", features = ["full"] }
futures-core = "0.3.30"
futures = "0.3.30"

View File

@@ -10,12 +10,10 @@ pub struct LteSib7DowngradeAnalyzer {
impl LteSib7DowngradeAnalyzer {
fn unpack_system_information<'a>(&self, ie: &'a InformationElement) -> Option<&'a SystemInformation_r8_IEsSib_TypeAndInfo> {
if let InformationElement::LTE(message) = ie {
if let LteInformationElement::BcchDlSch(bcch_dl_sch_message) = message {
if let BCCH_DL_SCH_MessageType::C1(BCCH_DL_SCH_MessageType_c1::SystemInformation(system_information)) = &bcch_dl_sch_message.message {
if let SystemInformationCriticalExtensions::SystemInformation_r8(sib) = &system_information.critical_extensions {
return Some(&sib.sib_type_and_info);
}
if let InformationElement::LTE(LteInformationElement::BcchDlSch(bcch_dl_sch_message)) = ie {
if let BCCH_DL_SCH_MessageType::C1(BCCH_DL_SCH_MessageType_c1::SystemInformation(system_information)) = &bcch_dl_sch_message.message {
if let SystemInformationCriticalExtensions::SystemInformation_r8(sib) = &system_information.critical_extensions {
return Some(&sib.sib_type_and_info);
}
}
}

View File

@@ -1,8 +1,13 @@
//! Diag protocol serialization/deserialization
use chrono::{DateTime, FixedOffset};
use crc::{Algorithm, Crc};
use deku::prelude::*;
use crate::hdlc::{self, hdlc_decapsulate};
use log::{warn, error};
use thiserror::Error;
pub const MESSAGE_TERMINATOR: u8 = 0x7e;
pub const MESSAGE_ESCAPE_CHAR: u8 = 0x7d;
@@ -49,6 +54,28 @@ pub enum DataType {
Other(u32),
}
#[derive(Debug, Clone, PartialEq, Error)]
pub enum DiagParsingError {
#[error("Failed to parse Message: {0}, data: {1:?}")]
MessageParsingError(deku::DekuError, Vec<u8>),
#[error("HDLC decapsulation of message failed: {0}, data: {1:?}")]
HdlcDecapsulationError(hdlc::HdlcError, Vec<u8>),
}
// this is sorta based on the params qcsuper uses, plus what seems to be used in
// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47
pub const CRC_CCITT_ALG: Algorithm<u16> = Algorithm {
poly: 0x1021,
init: 0xffff,
refin: true,
refout: true,
width: 16,
xorout: 0xffff,
check: 0x2189,
residue: 0x0000,
};
pub const CRC_CCITT: Crc<u16> = Crc::<u16>::new(&CRC_CCITT_ALG);
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
pub struct MessagesContainer {
pub data_type: DataType,
@@ -57,6 +84,29 @@ pub struct MessagesContainer {
pub messages: Vec<HdlcEncapsulatedMessage>,
}
impl MessagesContainer {
pub fn into_messages(self) -> Vec<Result<Message, DiagParsingError>> {
let mut result = Vec::new();
for msg in self.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) {
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len());
}
result.push(Ok(res));
},
Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))),
},
Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec()))),
}
}
}
result
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
pub struct HdlcEncapsulatedMessage {
pub len: u32,
@@ -431,4 +481,99 @@ mod test {
},
});
}
fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer {
MessagesContainer {
data_type,
num_messages: 1,
messages: vec![message],
}
}
// this log is based on one captured on a real device -- if it fails to
// serialize or deserialize, that's probably a problem with this mock, not
// the DiagReader implementation
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
let length_with_payload = 31 + payload.len() as u16;
let message = Message::Log {
pending_msgs: 0,
outer_length: length_with_payload,
inner_length: length_with_payload,
log_type: 0xb0c0,
timestamp: Timestamp { ts: 72659535985485082 },
body: LogBody::LteRrcOtaMessage {
ext_header_version: 20,
packet: LteRrcOtaPacket::V8 {
rrc_rel_maj: 14,
rrc_rel_min: 48,
bearer_id: 0,
phy_cell_id: 160,
earfcn: 2050,
sfn_subfn: 4057,
pdu_num: 5,
sib_mask: 0,
len: payload.len() as u16,
packet: payload.to_vec(),
},
},
};
let serialized = message.to_bytes().expect("failed to serialize test message");
let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT);
let encapsulated = HdlcEncapsulatedMessage {
len: encapsulated_data.len() as u32,
data: encapsulated_data,
};
(encapsulated, message)
}
#[test]
fn test_containers_with_multiple_messages() {
let (encapsulated1, message1) = get_test_message(&[1]);
let (encapsulated2, message2) = get_test_message(&[2]);
let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(encapsulated2);
container.num_messages += 1;
assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]);
}
#[test]
fn test_containers_with_concatenated_message() {
let (mut encapsulated1, message1) = get_test_message(&[1]);
let (encapsulated2, message2) = get_test_message(&[2]);
encapsulated1.data.extend(encapsulated2.data);
encapsulated1.len += encapsulated2.len;
let container = make_container(DataType::UserSpace, encapsulated1);
assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]);
}
#[test]
fn test_handles_parsing_errors() {
let (encapsulated1, message1) = get_test_message(&[1]);
let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT);
let encapsulated2 = HdlcEncapsulatedMessage {
len: bad_message.len() as u32,
data: bad_message,
};
let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(encapsulated2);
container.num_messages += 1;
let result = container.into_messages();
assert_eq!(result[0], Ok(message1));
assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _))));
}
#[test]
fn test_handles_encapsulation_errors() {
let (encapsulated1, message1) = get_test_message(&[1]);
let bad_encapsulation = HdlcEncapsulatedMessage {
len: 4,
data: vec![0x01, 0x02, 0x03, 0x04],
};
let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(bad_encapsulation);
container.num_messages += 1;
let result = container.into_messages();
assert_eq!(result[0], Ok(message1));
assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _))));
}
}

View File

@@ -1,15 +1,15 @@
use crate::hdlc::hdlc_encapsulate;
use crate::diag::{Message, ResponsePayload, Request, LogConfigRequest, LogConfigResponse, build_log_mask_request, RequestContainer, DataType, MessagesContainer};
use crate::diag_reader::{DiagReader, CRC_CCITT};
use crate::qmdl::QmdlWriter;
use crate::diag::{build_log_mask_request, DataType, DiagParsingError, LogConfigRequest, LogConfigResponse, Message, MessagesContainer, Request, RequestContainer, ResponsePayload, CRC_CCITT};
use crate::log_codes;
use std::fs::File;
use std::io::Read;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use futures_core::TryStream;
use thiserror::Error;
use log::{info, warn, error};
use deku::prelude::*;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub type DiagResult<T> = Result<T, DiagDeviceError>;
@@ -20,7 +20,7 @@ pub enum DiagDeviceError {
#[error("Failed to read diag device: {0}")]
DeviceReadFailed(std::io::Error),
#[error("Failed to write diag device: {0}")]
DeviceWriteFailed(String),
DeviceWriteFailed(std::io::Error),
#[error("Nonzero status code {0} for diag request: {1:?}")]
RequestFailed(u32, Request),
#[error("Didn't receive response for request: {0:?}")]
@@ -71,43 +71,17 @@ const DIAG_IOCTL_SWITCH_LOGGING: u64 = 7;
pub struct DiagDevice {
file: File,
pub qmdl_writer: Option<QmdlWriter<File>>,
fully_initialized: bool,
read_buf: Vec<u8>,
use_mdm: i32,
}
impl DiagReader for DiagDevice {
type Err = DiagDeviceError;
fn get_next_messages_container(&mut self) -> DiagResult<MessagesContainer> {
let mut bytes_read = 0;
while bytes_read == 0 {
bytes_read = self.file.read(&mut self.read_buf)
.map_err(DiagDeviceError::DeviceReadFailed)?;
}
let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0))
.map_err(DiagDeviceError::ParseMessagesContainerError)?;
if !leftover_bytes.is_empty() {
warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len());
}
if let Some(qmdl_writer) = self.qmdl_writer.as_mut() {
if self.fully_initialized {
qmdl_writer.write_container(&container)
.map_err(DiagDeviceError::QmdlFileWriteError)?;
}
}
Ok(container)
}
}
impl DiagDevice {
pub fn new(qmdl_writer: Option<QmdlWriter<File>>) -> DiagResult<Self> {
let diag_file = std::fs::File::options()
pub async fn new() -> DiagResult<Self> {
let diag_file = File::options()
.read(true)
.write(true)
.open("/dev/diag")
.await
.map_err(DiagDeviceError::OpenDiagDeviceError)?;
let fd = diag_file.as_raw_fd();
@@ -117,13 +91,32 @@ impl DiagDevice {
Ok(DiagDevice {
read_buf: vec![0; BUFFER_LEN],
file: diag_file,
fully_initialized: false,
qmdl_writer,
use_mdm,
})
}
pub fn write_request(&mut self, req: &Request) -> DiagResult<()> {
pub fn as_stream(&mut self) -> impl TryStream<Ok = MessagesContainer, Error = DiagDeviceError> + '_ {
futures::stream::try_unfold(self, |dev| async {
let container = dev.get_next_messages_container().await?;
Ok(Some((container, dev)))
})
}
async fn get_next_messages_container(&mut self) -> Result<MessagesContainer, DiagDeviceError> {
let mut bytes_read = 0;
while bytes_read == 0 {
bytes_read = self.file.read(&mut self.read_buf).await
.map_err(DiagDeviceError::DeviceReadFailed)?;
}
let ((leftover_bytes, _), container) = MessagesContainer::from_bytes((&self.read_buf[0..bytes_read], 0))
.map_err(DiagDeviceError::ParseMessagesContainerError)?;
if !leftover_bytes.is_empty() {
warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len());
}
Ok(container)
}
async fn write_request(&mut self, req: &Request) -> DiagResult<()> {
let req_bytes = &req.to_bytes().expect("Failed to serialize Request");
let buf = RequestContainer {
data_type: DataType::UserSpace,
@@ -131,23 +124,35 @@ impl DiagDevice {
mdm_field: -1,
hdlc_encapsulated_request: hdlc_encapsulate(req_bytes, &CRC_CCITT),
}.to_bytes().expect("Failed to serialize RequestContainer");
unsafe {
let fd = self.file.as_raw_fd();
let buf_ptr = buf.as_ptr() as *const libc::c_void;
let ret = libc::write(fd, buf_ptr, buf.len());
if ret < 0 {
let msg = format!("write failed with error code {}", ret);
return Err(DiagDeviceError::DeviceWriteFailed(msg));
if let Err(err) = self.file.write(&buf).await {
// For reasons I don't entirely understand, calls to write(2) on
// /dev/diag always return 0 bytes written, though the written
// requests end up being interpreted. As such, we're not concerned
// about WriteZero errors
if err.kind() != ErrorKind::WriteZero {
return Err(DiagDeviceError::DeviceWriteFailed(err));
}
}
self.file.flush().await
.map_err(DiagDeviceError::DeviceWriteFailed)?;
Ok(())
}
fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> {
let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges);
self.write_request(&req)?;
async fn read_response(&mut self) -> DiagResult<Vec<Result<Message, DiagParsingError>>> {
loop {
let container = self.get_next_messages_container().await?;
if container.data_type != DataType::UserSpace {
continue;
}
return Ok(container.into_messages());
}
}
for msg in self.read_response()? {
async fn retrieve_id_ranges(&mut self) -> DiagResult<[u32; 16]> {
let req = Request::LogConfig(LogConfigRequest::RetrieveIdRanges);
self.write_request(&req).await?;
for msg in self.read_response().await? {
match msg {
Ok(Message::Log { .. }) => info!("skipping log response..."),
Ok(Message::Response { payload, status, .. }) => match payload {
@@ -166,11 +171,11 @@ impl DiagDevice {
Err(DiagDeviceError::NoResponse(req))
}
fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> {
async fn set_log_mask(&mut self, log_type: u32, log_mask_bitsize: u32) -> DiagResult<()> {
let req = build_log_mask_request(log_type, log_mask_bitsize, &LOG_CODES_FOR_RAW_PACKET_LOGGING);
self.write_request(&req)?;
self.write_request(&req).await?;
for msg in self.read_response()? {
for msg in self.read_response().await? {
match msg {
Ok(Message::Log { .. }) => info!("skipping log response..."),
Ok(Message::Response { payload, status, .. }) => {
@@ -188,18 +193,17 @@ impl DiagDevice {
Err(DiagDeviceError::NoResponse(req))
}
pub fn config_logs(&mut self) -> DiagResult<()> {
pub async fn config_logs(&mut self) -> DiagResult<()> {
info!("retrieving diag logging capabilities...");
let log_mask_sizes = self.retrieve_id_ranges()?;
let log_mask_sizes = self.retrieve_id_ranges().await?;
for (log_type, &log_mask_bitsize) in log_mask_sizes.iter().enumerate() {
if log_mask_bitsize > 0 {
self.set_log_mask(log_type as u32, log_mask_bitsize)?;
self.set_log_mask(log_type as u32, log_mask_bitsize).await?;
info!("enabled logging for log type {}", log_type);
}
}
self.fully_initialized = true;
Ok(())
}
}

View File

@@ -1,223 +0,0 @@
use crate::diag;
use crate::{diag::*, hdlc::hdlc_decapsulate};
use crate::hdlc;
use crc::{Crc, Algorithm};
use deku::prelude::*;
use log::{info, warn, error};
use thiserror::Error;
// this is sorta based on the params qcsuper uses, plus what seems to be used in
// https://github.com/fgsect/scat/blob/f1538b397721df3ab8ba12acd26716abcf21f78b/util.py#L47
pub const CRC_CCITT_ALG: Algorithm<u16> = Algorithm {
poly: 0x1021,
init: 0xffff,
refin: true,
refout: true,
width: 16,
xorout: 0xffff,
check: 0x2189,
residue: 0x0000,
};
pub const CRC_CCITT: Crc<u16> = Crc::<u16>::new(&CRC_CCITT_ALG);
#[derive(Debug, PartialEq, Error)]
pub enum DiagParsingError {
#[error("Failed to parse Message: {0}, data: {1:?}")]
MessageParsingError(deku::DekuError, Vec<u8>),
#[error("HDLC decapsulation of message failed: {0}, data: {1:?}")]
HdlcDecapsulationError(hdlc::HdlcError, Vec<u8>),
}
type MaybeMessage = Result<Message, DiagParsingError>;
pub trait DiagReader {
type Err;
fn get_next_messages_container(&mut self) -> Result<MessagesContainer, Self::Err>;
fn read_response(&mut self) -> Result<Vec<MaybeMessage>, Self::Err> {
loop {
let container = self.get_next_messages_container()?;
if container.data_type == DataType::UserSpace {
return self.parse_response_container(container);
} else {
info!("skipping non-userspace message...")
}
}
}
fn parse_response_container(&self, container: MessagesContainer) -> Result<Vec<MaybeMessage>, Self::Err> {
let mut result = Vec::new();
for msg in container.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == diag::MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) {
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!("warning: {} leftover bytes when parsing Message", leftover_bytes.len());
}
result.push(Ok(res));
},
Err(e) => {
result.push(Err(DiagParsingError::MessageParsingError(e, data)));
},
},
Err(err) => {
result.push(Err(DiagParsingError::HdlcDecapsulationError(err, sub_msg.to_vec())));
}
}
}
}
Ok(result)
}
}
#[cfg(test)]
mod test {
use super::*;
struct MockReader {
containers: Vec<MessagesContainer>,
}
impl DiagReader for MockReader {
type Err = ();
fn get_next_messages_container(&mut self) -> Result<MessagesContainer, Self::Err> {
Ok(self.containers.remove(0))
}
}
fn make_container(data_type: DataType, message: HdlcEncapsulatedMessage) -> MessagesContainer {
MessagesContainer {
data_type,
num_messages: 1,
messages: vec![message],
}
}
// this log is based on one captured on a real device -- if it fails to
// serialize or deserialize, that's probably a problem with this mock, not
// the DiagReader implementation
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
let length_with_payload = 31 + payload.len() as u16;
let message = Message::Log {
pending_msgs: 0,
outer_length: length_with_payload,
inner_length: length_with_payload,
log_type: 0xb0c0,
timestamp: Timestamp { ts: 72659535985485082 },
body: LogBody::LteRrcOtaMessage {
ext_header_version: 20,
packet: LteRrcOtaPacket::V8 {
rrc_rel_maj: 14,
rrc_rel_min: 48,
bearer_id: 0,
phy_cell_id: 160,
earfcn: 2050,
sfn_subfn: 4057,
pdu_num: 5,
sib_mask: 0,
len: payload.len() as u16,
packet: payload.to_vec(),
},
},
};
let serialized = message.to_bytes().expect("failed to serialize test message");
let encapsulated_data = hdlc::hdlc_encapsulate(&serialized, &CRC_CCITT);
let encapsulated = HdlcEncapsulatedMessage {
len: encapsulated_data.len() as u32,
data: encapsulated_data,
};
(encapsulated, message)
}
#[test]
fn test_skipping_nonuser_containers() {
let (encapsulated1, message1) = get_test_message(&[1]);
let (encapsulated2, _) = get_test_message(&[2]);
let (encapsulated3, message3) = get_test_message(&[3]);
let mut reader = MockReader {
containers: vec![
make_container(DataType::UserSpace, encapsulated1),
make_container(DataType::Other(0), encapsulated2),
make_container(DataType::UserSpace, encapsulated3),
],
};
assert_eq!(reader.read_response(), Ok(vec![Ok(message1)]));
assert_eq!(reader.read_response(), Ok(vec![Ok(message3)]));
}
#[test]
fn test_containers_with_multiple_messages() {
let (encapsulated1, message1) = get_test_message(&[1]);
let (encapsulated2, message2) = get_test_message(&[2]);
let mut container1 = make_container(DataType::UserSpace, encapsulated1);
container1.messages.push(encapsulated2);
container1.num_messages += 1;
let (encapsulated3, message3) = get_test_message(&[3]);
let mut reader = MockReader {
containers: vec![
container1,
make_container(DataType::UserSpace, encapsulated3),
],
};
assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)]));
assert_eq!(reader.read_response(), Ok(vec![Ok(message3)]));
}
#[test]
fn test_containers_with_concatenated_message() {
let (mut encapsulated1, message1) = get_test_message(&[1]);
let (encapsulated2, message2) = get_test_message(&[2]);
encapsulated1.data.extend(encapsulated2.data);
encapsulated1.len += encapsulated2.len;
let (encapsulated3, message3) = get_test_message(&[3]);
let mut reader = MockReader {
containers: vec![
make_container(DataType::UserSpace, encapsulated1),
make_container(DataType::UserSpace, encapsulated3),
],
};
assert_eq!(reader.read_response(), Ok(vec![Ok(message1), Ok(message2)]));
assert_eq!(reader.read_response(), Ok(vec![Ok(message3)]));
}
#[test]
fn test_handles_parsing_errors() {
let (encapsulated1, message1) = get_test_message(&[1]);
let bad_message = hdlc::hdlc_encapsulate(&[0x01, 0x02, 0x03, 0x04], &CRC_CCITT);
let encapsulated2 = HdlcEncapsulatedMessage {
len: bad_message.len() as u32,
data: bad_message,
};
let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(encapsulated2);
container.num_messages += 1;
let mut reader = MockReader {
containers: vec![container],
};
let result = reader.read_response().unwrap();
assert_eq!(result[0], Ok(message1));
assert!(matches!(result[1], Err(DiagParsingError::MessageParsingError(_, _))));
}
#[test]
fn test_handles_encapsulation_errors() {
let (encapsulated1, message1) = get_test_message(&[1]);
let bad_encapsulation = HdlcEncapsulatedMessage {
len: 4,
data: vec![0x01, 0x02, 0x03, 0x04],
};
let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(bad_encapsulation);
container.num_messages += 1;
let mut reader = MockReader {
containers: vec![container],
};
let result = reader.read_response().unwrap();
assert_eq!(result[0], Ok(message1));
assert!(matches!(result[1], Err(DiagParsingError::HdlcDecapsulationError(_, _))));
}
}

View File

@@ -9,7 +9,7 @@ use thiserror::Error;
use crate::diag::{MESSAGE_ESCAPE_CHAR, MESSAGE_TERMINATOR, ESCAPED_MESSAGE_ESCAPE_CHAR, ESCAPED_MESSAGE_TERMINATOR};
#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Clone, Error, PartialEq)]
pub enum HdlcError {
#[error("Invalid checksum (expected {0}, got {1})")]
InvalidChecksum(u16, u16),
@@ -89,7 +89,7 @@ mod tests {
#[test]
fn test_hdlc_encapsulate() {
let crc = Crc::<u16>::new(&crate::diag_reader::CRC_CCITT_ALG);
let crc = Crc::<u16>::new(&crate::diag::CRC_CCITT_ALG);
let data = vec![0x01, 0x02, 0x03, 0x04];
let expected = vec![1, 2, 3, 4, 145, 57, 126];
let encapsulated = hdlc_encapsulate(&data, &crc);

View File

@@ -1,7 +1,6 @@
pub mod hdlc;
pub mod diag;
pub mod diag_device;
pub mod diag_reader;
pub mod qmdl;
pub mod log_codes;
pub mod gsmtap;

View File

@@ -1,14 +1,14 @@
use crate::gsmtap::GsmtapMessage;
use crate::diag::Timestamp;
use std::io::Write;
use tokio::io::AsyncWrite;
use std::borrow::Cow;
use chrono::prelude::*;
use deku::prelude::*;
use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock;
use pcap_file::pcapng::PcapNgWriter;
use pcap_file::PcapError;
use pcap_file_tokio::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
use pcap_file_tokio::pcapng::blocks::interface_description::InterfaceDescriptionBlock;
use pcap_file_tokio::pcapng::PcapNgWriter;
use pcap_file_tokio::PcapError;
use thiserror::Error;
#[derive(Error, Debug)]
@@ -23,7 +23,7 @@ pub enum GsmtapPcapError {
Deku(#[from] DekuError),
}
pub struct GsmtapPcapWriter<T> where T: Write {
pub struct GsmtapPcapWriter<T> where T: AsyncWrite {
writer: PcapNgWriter<T>,
ip_id: u16,
}
@@ -56,23 +56,23 @@ struct UdpHeader {
checksum: u16,
}
impl<T> GsmtapPcapWriter<T> where T: Write {
pub fn new(writer: T) -> Result<Self, GsmtapPcapError> {
let writer = PcapNgWriter::new(writer)?;
impl<T> GsmtapPcapWriter<T> where T: AsyncWrite + Unpin + Send {
pub async fn new(writer: T) -> Result<Self, GsmtapPcapError> {
let writer = PcapNgWriter::new(writer).await?;
Ok(GsmtapPcapWriter { writer, ip_id: 0 })
}
pub fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> {
pub async fn write_iface_header(&mut self) -> Result<(), GsmtapPcapError> {
let interface = InterfaceDescriptionBlock {
linktype: pcap_file::DataLink::IPV4,
linktype: pcap_file_tokio::DataLink::IPV4,
snaplen: 0xffff,
options: vec![],
};
self.writer.write_pcapng_block(interface)?;
self.writer.write_pcapng_block(interface).await?;
Ok(())
}
pub fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> {
pub async fn write_gsmtap_message(&mut self, msg: GsmtapMessage, timestamp: Timestamp) -> Result<(), GsmtapPcapError> {
let duration = timestamp.to_datetime()
.signed_duration_since(DateTime::UNIX_EPOCH)
.to_std()?;
@@ -113,7 +113,7 @@ impl<T> GsmtapPcapWriter<T> where T: Write {
data: Cow::Owned(data),
options: vec![],
};
self.writer.write_pcapng_block(packet)?;
self.writer.write_pcapng_block(packet).await?;
self.ip_id = self.ip_id.wrapping_add(1);
Ok(())
}

View File

@@ -3,19 +3,18 @@
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
//! QMDL files.
use crate::diag_reader::DiagReader;
use crate::diag::{MessagesContainer, MESSAGE_TERMINATOR, HdlcEncapsulatedMessage, DataType};
use std::io::{Write, BufReader, BufRead, Read};
use thiserror::Error;
use futures::TryStream;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, AsyncBufReadExt};
use log::error;
pub struct QmdlWriter<T> where T: Write {
pub struct QmdlWriter<T> where T: AsyncWrite + Unpin {
writer: T,
pub total_written: usize,
}
impl<T> QmdlWriter<T> where T: Write {
impl<T> QmdlWriter<T> where T: AsyncWrite + Unpin {
pub fn new(writer: T) -> Self {
QmdlWriter::new_with_existing_size(writer, 0)
}
@@ -27,30 +26,22 @@ impl<T> QmdlWriter<T> where T: Write {
}
}
pub fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
for msg in &container.messages {
self.writer.write_all(&msg.data)?;
self.writer.write_all(&msg.data).await?;
self.total_written += msg.data.len();
}
Ok(())
}
}
#[derive(Debug, Error)]
pub enum QmdlReaderError {
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Reached max_bytes count {0}")]
MaxBytesReached(usize),
}
pub struct QmdlReader<T> where T: Read {
pub struct QmdlReader<T> where T: AsyncRead {
reader: BufReader<T>,
bytes_read: usize,
max_bytes: Option<usize>,
}
impl<T> QmdlReader<T> where T: Read {
impl<T> QmdlReader<T> where T: AsyncRead + Unpin {
pub fn new(reader: T, max_bytes: Option<usize>) -> Self {
QmdlReader {
reader: BufReader::new(reader),
@@ -58,23 +49,29 @@ impl<T> QmdlReader<T> where T: Read {
max_bytes,
}
}
}
impl<T> DiagReader for QmdlReader<T> where T: Read {
type Err = QmdlReaderError;
pub fn as_stream(&mut self) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> + '_ {
futures::stream::try_unfold(self, |reader| async {
let maybe_container = reader.get_next_messages_container().await?;
match maybe_container {
Some(container) => Ok(Some((container, reader))),
None => Ok(None)
}
})
}
fn get_next_messages_container(&mut self) -> Result<MessagesContainer, Self::Err> {
async fn get_next_messages_container(&mut self) -> Result<Option<MessagesContainer>, std::io::Error> {
if let Some(max_bytes) = self.max_bytes {
if self.bytes_read >= max_bytes {
if self.bytes_read > max_bytes {
error!("warning: {} bytes read, but max_bytes was {}", self.bytes_read, max_bytes);
}
return Err(QmdlReaderError::MaxBytesReached(max_bytes));
return Ok(None);
}
}
let mut buf = Vec::new();
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf)?;
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?;
self.bytes_read += bytes_read;
// Since QMDL is just a flat list of messages, we can't actually
@@ -82,7 +79,7 @@ impl<T> DiagReader for QmdlReader<T> where T: Read {
// read. So we'll just pretend that all containers had exactly one
// message. As far as I know, the number of messages per container
// doesn't actually affect anything, so this should be fine.
Ok(MessagesContainer {
Ok(Some(MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![
@@ -91,7 +88,7 @@ impl<T> DiagReader for QmdlReader<T> where T: Read {
data: buf,
},
]
})
}))
}
}
@@ -100,7 +97,7 @@ mod test {
use std::io::Cursor;
use crate::hdlc::hdlc_encapsulate;
use crate::diag_reader::CRC_CCITT;
use crate::diag::CRC_CCITT;
use super::*;
@@ -140,8 +137,8 @@ mod test {
]
}
#[test]
fn test_unbounded_qmdl_reader() {
#[tokio::test]
async fn test_unbounded_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf, None);
let expected_messages = get_test_messages();
@@ -151,12 +148,12 @@ mod test {
num_messages: 1,
messages: vec![message],
};
assert_eq!(expected_container, reader.get_next_messages_container().unwrap());
assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap());
}
}
#[test]
fn test_bounded_qmdl_reader() {
#[tokio::test]
async fn test_bounded_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes());
// bound the reader to the first two messages
@@ -170,30 +167,30 @@ mod test {
num_messages: 1,
messages: vec![message],
};
assert_eq!(expected_container, reader.get_next_messages_container().unwrap());
assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap());
}
assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_))));
assert!(matches!(reader.get_next_messages_container().await, Ok(None)));
}
#[test]
fn test_qmdl_writer() {
#[tokio::test]
async fn test_qmdl_writer() {
let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf);
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).unwrap();
writer.write_container(container).await.unwrap();
}
assert_eq!(writer.total_written, buf.len());
assert_eq!(buf, get_test_message_bytes());
}
#[test]
fn test_writing_and_reading() {
#[tokio::test]
async fn test_writing_and_reading() {
let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf);
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).unwrap();
writer.write_container(container).await.unwrap();
}
let limit = Some(buf.len());
@@ -205,8 +202,8 @@ mod test {
num_messages: 1,
messages: vec![message],
};
assert_eq!(expected_container, reader.get_next_messages_container().unwrap());
assert_eq!(expected_container, reader.get_next_messages_container().await.unwrap().unwrap());
}
assert!(matches!(reader.get_next_messages_container(), Err(QmdlReaderError::MaxBytesReached(_))));
assert!(matches!(reader.get_next_messages_container().await, Ok(None)));
}
}

View File

@@ -1,6 +1,6 @@
use asn1_codecs::{uper::UperCodec, PerCodecData, PerCodecError};
use thiserror::Error;
#[allow(unreachable_patterns, non_camel_case_types)]
#[allow(warnings, unused, unreachable_patterns, non_camel_case_types)]
pub mod lte_rrc;
#[derive(Error, Debug)]
@@ -14,5 +14,5 @@ pub fn decode<T>(data: &[u8]) -> Result<T, ParsingError>
{
let mut asn_data = PerCodecData::from_slice_uper(data);
T::uper_decode(&mut asn_data)
.map_err(|e| ParsingError::UperDecodeError(e))
.map_err(ParsingError::UperDecodeError)
}