diff --git a/Cargo.lock b/Cargo.lock index f8793a9..e53f07d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1410,9 +1410,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.8" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4" dependencies = [ "powerfmt", ] @@ -1736,19 +1736,19 @@ checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", - "r-efi 5.3.0", + "r-efi", "wasip2", ] [[package]] name = "getrandom" -version = "0.4.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" dependencies = [ "cfg-if", "libc", - "r-efi 6.0.0", + "r-efi", "wasip2", "wasip3", ] @@ -2179,9 +2179,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "ipnet" -version = "2.12.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "is_terminal_polyfill" @@ -2215,9 +2215,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "jiff" -version = "0.2.23" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +checksum = "c867c356cc096b33f4981825ab281ecba3db0acefe60329f044c1789d94c6543" dependencies = [ "jiff-static", "jiff-tzdb-platform", @@ -2230,9 +2230,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.23" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +checksum = "f7946b4325269738f270bb55b3c19ab5c5040525f83fd625259422a9d25d9be5" dependencies = [ "proc-macro2", "quote", @@ -2241,9 +2241,9 @@ dependencies = [ [[package]] name = "jiff-tzdb" -version = "0.1.6" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c900ef84826f1338a557697dc8fc601df9ca9af4ac137c7fb61d4c6f2dfd3076" +checksum = "68971ebff725b9e2ca27a601c5eb38a4c5d64422c4cbab0c535f248087eda5c2" [[package]] name = "jiff-tzdb-platform" @@ -2266,9 +2266,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.91" +version = "0.3.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3" dependencies = [ "once_cell", "wasm-bindgen", @@ -2367,14 +2367,13 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.14" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" +checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.11.0", "libc", - "plain", - "redox_syscall 0.7.3", + "redox_syscall 0.7.1", ] [[package]] @@ -2390,9 +2389,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.24" +version = "1.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839" +checksum = "15d118bbf3771060e7311cc7bb0545b01d08a8b4a7de949198dec1fa0ca1c0f7" dependencies = [ "cc", "libc", @@ -2411,9 +2410,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.12.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" @@ -2947,9 +2946,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.17" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -2963,12 +2962,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plain" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" - [[package]] name = "portable-atomic" version = "1.13.1" @@ -3020,9 +3013,9 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.5.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" +checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" dependencies = [ "toml_edit", ] @@ -3195,9 +3188,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.45" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" dependencies = [ "proc-macro2", ] @@ -3208,12 +3201,6 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" -[[package]] -name = "r-efi" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" - [[package]] name = "rand" version = "0.8.5" @@ -3334,9 +3321,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.3" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" dependencies = [ "bitflags 2.11.0", ] @@ -3372,9 +3359,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.10" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "reqwest" @@ -3460,9 +3447,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.4" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags 2.11.0", "errno 0.3.14", @@ -3719,7 +3706,7 @@ dependencies = [ [[package]] name = "sketchlib-rust" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/sketchlib-rust#348db8415f97246c42de68b407b47fa038cf8b1f" +source = "git+https://github.com/ProjectASAP/sketchlib-rust?rev=663a1df#663a1df32ffaf8243af3b57bebfe33376ddeeb09" dependencies = [ "ahash", "clap 4.5.60", @@ -3783,12 +3770,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -4001,12 +3988,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.26.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", - "getrandom 0.4.2", + "getrandom 0.4.1", "once_cell", "rustix", "windows-sys 0.61.2", @@ -4150,9 +4137,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.50.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" dependencies = [ "bytes", "libc", @@ -4160,16 +4147,16 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.3", + "socket2 0.6.2", "tokio-macros", "windows-sys 0.61.2", ] [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -4223,18 +4210,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.0.0+spec-1.1.0" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32c2555c699578a4f59f0cc68e5116c8d7cabbd45e1409b989d4be085b53f13e" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] [[package]] name = "toml_edit" -version = "0.25.4+spec-1.1.0" +version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7193cbd0ce53dc966037f54351dbbcf0d5a642c7f0038c382ef9e677ce8c13f2" +checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ "indexmap", "toml_datetime", @@ -4452,11 +4439,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.22.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" +checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb" dependencies = [ - "getrandom 0.4.2", + "getrandom 0.4.1", "js-sys", "wasm-bindgen", ] @@ -4551,9 +4538,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.114" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566" dependencies = [ "cfg-if", "once_cell", @@ -4564,9 +4551,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.64" +version = "0.4.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" +checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" dependencies = [ "cfg-if", "futures-util", @@ -4578,9 +4565,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.114" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4588,9 +4575,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.114" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55" dependencies = [ "bumpalo", "proc-macro2", @@ -4601,9 +4588,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.114" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12" dependencies = [ "unicode-ident", ] @@ -4644,9 +4631,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.91" +version = "0.3.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" +checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598" dependencies = [ "js-sys", "wasm-bindgen", @@ -4782,6 +4769,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -4815,13 +4811,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -4834,6 +4847,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" @@ -4852,6 +4871,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.36.1" @@ -4870,12 +4895,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.36.1" @@ -4894,6 +4931,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" @@ -4912,6 +4955,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -4924,6 +4973,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" @@ -4942,11 +4997,17 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" -version = "0.7.15" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" dependencies = [ "memchr", ] @@ -5095,18 +5156,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.40" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.40" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 11484c6..4edfeea 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -39,7 +39,7 @@ flate2 = "1.0" async-trait = "0.1" xxhash-rust = { version = "0.8", features = ["xxh32", "xxh64"] } dsrs = { git = "https://github.com/ProjectASAP/datasketches-rs" } -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "663a1df" } base64 = "0.21" hex = "0.4" sqlparser = "0.59.0" @@ -55,6 +55,18 @@ zstd = "0.13" reqwest = { version = "0.11", features = ["json"] } tracing-appender = "0.2" +[[bin]] +name = "precompute_engine" +path = "src/bin/precompute_engine.rs" + +[[bin]] +name = "test_e2e_precompute" +path = "src/bin/test_e2e_precompute.rs" + +[[bin]] +name = "bench_precompute_sketch" +path = "src/bin/bench_precompute_sketch.rs" + [dev-dependencies] tempfile = "3.20.0" diff --git a/asap-query-engine/Dockerfile b/asap-query-engine/Dockerfile index 190d9c0..8988d29 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -18,8 +18,12 @@ COPY Cargo.toml ./ COPY Cargo.lock ./ COPY asap-query-engine/Cargo.toml ./asap-query-engine/ -# Create a dummy main.rs to build dependencies -RUN mkdir -p asap-query-engine/src && echo "fn main() {}" > asap-query-engine/src/main.rs +# Create dummy Rust targets to build dependencies. +# Cargo.toml declares explicit bin paths, so they must exist in this cache layer. +RUN mkdir -p asap-query-engine/src/bin \ + && echo "fn main() {}" > asap-query-engine/src/main.rs \ + && echo "fn main() {}" > asap-query-engine/src/bin/precompute_engine.rs \ + && echo "fn main() {}" > asap-query-engine/src/bin/test_e2e_precompute.rs # Build dependencies (this layer will be cached) WORKDIR /code/asap-query-engine diff --git a/asap-query-engine/src/bin/bench_precompute_sketch.rs b/asap-query-engine/src/bin/bench_precompute_sketch.rs new file mode 100644 index 0000000..4783b36 --- /dev/null +++ b/asap-query-engine/src/bin/bench_precompute_sketch.rs @@ -0,0 +1,498 @@ +use clap::Parser; +use prost::Message; +use query_engine_rust::data_model::{AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, TimeSeries, WriteRequest, +}; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::OutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::{SimpleMapStore, Store}; +use sketch_db_common::aggregation_config::AggregationConfig; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +#[derive(Parser, Debug)] +#[command(name = "bench_precompute_sketch")] +#[command(about = "Benchmark the precompute engine with DatasketchesKLL accumulators")] +struct Args { + #[arg(long, default_value_t = 4)] + workers: usize, + + #[arg(long, default_value_t = 4)] + concurrent_senders: usize, + + #[arg(long, default_value_t = 50)] + num_series: usize, + + #[arg(long, default_value_t = 100)] + samples_per_series: usize, + + #[arg(long, default_value_t = 100)] + num_requests: usize, + + #[arg(long, default_value_t = 5)] + latency_repetitions: usize, + + #[arg(long, default_value_t = 10)] + window_size_secs: u64, + + #[arg(long, default_value_t = 200)] + k: u16, + + #[arg(long, default_value_t = 19300)] + latency_port: u16, + + #[arg(long, default_value_t = 19301)] + throughput_port: u16, +} + +struct TrackingStoreSink { + store: Arc, + emitted_outputs: AtomicU64, +} + +impl TrackingStoreSink { + fn new(store: Arc) -> Self { + Self { + store, + emitted_outputs: AtomicU64::new(0), + } + } + + fn emitted(&self) -> u64 { + self.emitted_outputs.load(Ordering::Relaxed) + } +} + +impl OutputSink for TrackingStoreSink { + fn emit_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> Result<(), Box> { + if outputs.is_empty() { + return Ok(()); + } + let emitted = outputs.len() as u64; + self.store.insert_precomputed_output_batch(outputs)?; + self.emitted_outputs.fetch_add(emitted, Ordering::Relaxed); + Ok(()) + } +} + +fn build_remote_write_body(timeseries: Vec) -> Vec { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compression should succeed") +} + +fn make_timeseries(metric: &str, label_0: &str, samples: Vec) -> TimeSeries { + TimeSeries { + labels: vec![ + Label { + name: "__name__".into(), + value: metric.into(), + }, + Label { + name: "instance".into(), + value: "bench".into(), + }, + Label { + name: "job".into(), + value: "bench".into(), + }, + Label { + name: "label_0".into(), + value: label_0.into(), + }, + ], + samples, + } +} + +fn make_kll_streaming_config(aggregation_id: u64, window_size_secs: u64, k: u16) -> Arc { + let mut params = HashMap::new(); + params.insert("K".to_string(), serde_json::Value::from(k as u64)); + + let agg_config = AggregationConfig::new( + aggregation_id, + "DatasketchesKLL".to_string(), + String::new(), + params, + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_size_secs, + "bench_metric".to_string(), + "bench_metric".to_string(), + None, + None, + Some(window_size_secs), + Some(window_size_secs), + Some("tumbling".to_string()), + None, + None, + ); + + let mut agg_map = HashMap::new(); + agg_map.insert(aggregation_id, agg_config); + Arc::new(StreamingConfig::new(agg_map)) +} + +fn make_store(streaming_config: Arc) -> Arc { + Arc::new(SimpleMapStore::new_with_strategy( + streaming_config, + CleanupPolicy::CircularBuffer, + LockStrategy::PerKey, + )) +} + +async fn start_engine( + port: u16, + workers: usize, + streaming_config: Arc, + sink: Arc, +) { + let config = PrecomputeEngineConfig { + num_workers: workers, + ingest_port: port, + allowed_lateness_ms: 5_000, + max_buffer_per_series: 100_000, + flush_interval_ms: 100, + channel_buffer_size: 50_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let engine = PrecomputeEngine::new(config, streaming_config, sink); + tokio::spawn(async move { + if let Err(err) = engine.run().await { + eprintln!("precompute engine on port {port} failed: {err}"); + } + }); + tokio::time::sleep(Duration::from_millis(500)).await; +} + +fn build_window_body( + metric: &str, + num_series: usize, + samples_per_series: usize, + window_start_ms: i64, + window_size_ms: i64, +) -> Vec { + let mut timeseries = Vec::with_capacity(num_series); + for series_idx in 0..num_series { + let label = format!("series_{series_idx}"); + let mut samples = Vec::with_capacity(samples_per_series); + for sample_idx in 0..samples_per_series { + let offset = (sample_idx as i64) % window_size_ms.max(1); + samples.push(Sample { + value: (series_idx * samples_per_series + sample_idx) as f64, + timestamp: window_start_ms + offset, + }); + } + timeseries.push(make_timeseries(metric, &label, samples)); + } + build_remote_write_body(timeseries) +} + +fn build_watermark_body(metric: &str, num_series: usize, timestamp_ms: i64) -> Vec { + let mut timeseries = Vec::with_capacity(num_series); + for series_idx in 0..num_series { + let label = format!("series_{series_idx}"); + timeseries.push(make_timeseries( + metric, + &label, + vec![Sample { + value: 0.0, + timestamp: timestamp_ms, + }], + )); + } + build_remote_write_body(timeseries) +} + +async fn post_body( + client: &reqwest::Client, + port: u16, + body: Vec, +) -> Result<(), Box> { + let response = client + .post(format!("http://localhost:{port}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + if response.status() != reqwest::StatusCode::NO_CONTENT { + return Err(format!("unexpected HTTP status {}", response.status()).into()); + } + Ok(()) +} + +async fn wait_for_emitted_outputs( + sink: &TrackingStoreSink, + expected_outputs: u64, + timeout: Duration, +) -> Result> { + let start = Instant::now(); + let deadline = start + timeout; + loop { + if sink.emitted() >= expected_outputs { + return Ok(start.elapsed()); + } + if Instant::now() >= deadline { + return Err(format!( + "timed out waiting for outputs: expected {expected_outputs}, saw {}", + sink.emitted() + ) + .into()); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } +} + +async fn run_latency_benchmark( + client: &reqwest::Client, + args: &Args, +) -> Result<(), Box> { + let aggregation_id = 101; + let metric = "bench_metric"; + let window_size_ms = (args.window_size_secs * 1000) as i64; + let streaming_config = make_kll_streaming_config(aggregation_id, args.window_size_secs, args.k); + let store = make_store(streaming_config.clone()); + let sink = Arc::new(TrackingStoreSink::new(store.clone())); + + start_engine( + args.latency_port, + args.workers, + streaming_config, + sink.clone(), + ) + .await; + + let warmup_body = build_window_body(metric, 1, 1, 0, window_size_ms); + let warmup_watermark = build_watermark_body(metric, 1, window_size_ms); + post_body(client, args.latency_port, warmup_body).await?; + post_body(client, args.latency_port, warmup_watermark).await?; + wait_for_emitted_outputs(&sink, 1, Duration::from_secs(5)).await?; + + let mut latencies_ms = Vec::with_capacity(args.latency_repetitions); + let mut rtts_ms = Vec::with_capacity(args.latency_repetitions); + + for rep in 0..args.latency_repetitions { + let baseline = sink.emitted(); + let window_start_ms = ((rep as i64) + 2) * 2 * window_size_ms; + let batch_body = build_window_body( + metric, + args.num_series, + args.samples_per_series, + window_start_ms, + window_size_ms, + ); + let watermark_body = build_watermark_body( + metric, + args.num_series, + window_start_ms + window_size_ms, + ); + + let t0 = Instant::now(); + post_body(client, args.latency_port, batch_body).await?; + let batch_rtt = t0.elapsed(); + post_body(client, args.latency_port, watermark_body).await?; + let e2e = wait_for_emitted_outputs( + &sink, + baseline + args.num_series as u64, + Duration::from_secs(10), + ) + .await?; + + latencies_ms.push(e2e.as_secs_f64() * 1000.0); + rtts_ms.push(batch_rtt.as_secs_f64() * 1000.0); + } + + let latency_store_results = store.query_precomputed_output( + metric, + aggregation_id, + 0, + ((args.latency_repetitions as u64) + 10) * args.window_size_secs * 1000, + )?; + let stored_windows: usize = latency_store_results.values().map(|buckets| buckets.len()).sum(); + + println!("\n=== DatasketchesKLL latency benchmark ==="); + println!( + " Config: {} workers, {} series, {} samples/series, K={}, {} repetitions", + args.workers, args.num_series, args.samples_per_series, args.k, args.latency_repetitions + ); + println!( + " HTTP RTT ms: min {:.2}, mean {:.2}, max {:.2}", + min_ms(&rtts_ms), + mean_ms(&rtts_ms), + max_ms(&rtts_ms) + ); + println!( + " E2E latency ms: min {:.2}, mean {:.2}, max {:.2}", + min_ms(&latencies_ms), + mean_ms(&latencies_ms), + max_ms(&latencies_ms) + ); + println!( + " Stored windows: {} (expected at least {})", + stored_windows, + 1 + args.latency_repetitions * args.num_series + ); + + Ok(()) +} + +async fn run_throughput_benchmark( + client: &reqwest::Client, + args: &Args, +) -> Result<(), Box> { + let aggregation_id = 202; + let metric = "bench_metric"; + let window_size_ms = (args.window_size_secs * 1000) as i64; + let total_samples = (args.num_requests * args.num_series * args.samples_per_series) as u64; + let expected_outputs = (args.num_requests * args.num_series) as u64; + + let streaming_config = make_kll_streaming_config(aggregation_id, args.window_size_secs, args.k); + let store = make_store(streaming_config.clone()); + let sink = Arc::new(TrackingStoreSink::new(store.clone())); + + start_engine( + args.throughput_port, + args.workers, + streaming_config, + sink.clone(), + ) + .await; + + let mut bodies = Vec::with_capacity(args.num_requests); + for req_idx in 0..args.num_requests { + let window_start_ms = (req_idx as i64) * window_size_ms; + bodies.push(build_window_body( + metric, + args.num_series, + args.samples_per_series, + window_start_ms, + window_size_ms, + )); + } + let final_watermark = build_watermark_body( + metric, + args.num_series, + (args.num_requests as i64) * window_size_ms, + ); + + let throughput_start = Instant::now(); + let mut chunks = vec![Vec::new(); args.concurrent_senders]; + for (idx, body) in bodies.into_iter().enumerate() { + chunks[idx % args.concurrent_senders].push(body); + } + + let mut handles = Vec::with_capacity(args.concurrent_senders); + for chunk in chunks { + let client = client.clone(); + let port = args.throughput_port; + handles.push(tokio::spawn(async move { + for body in chunk { + post_body(&client, port, body).await?; + } + Ok::<(), Box>(()) + })); + } + + for handle in handles { + handle.await??; + } + post_body(client, args.throughput_port, final_watermark).await?; + let send_elapsed = throughput_start.elapsed(); + + let wait_elapsed = wait_for_emitted_outputs( + &sink, + expected_outputs, + Duration::from_secs(60), + ) + .await?; + let total_elapsed = throughput_start.elapsed(); + + let store_results = store.query_precomputed_output( + metric, + aggregation_id, + 0, + ((args.num_requests as u64) + 2) * args.window_size_secs * 1000, + )?; + let stored_windows: usize = store_results.values().map(|buckets| buckets.len()).sum(); + + println!("\n=== DatasketchesKLL throughput benchmark ==="); + println!( + " Config: {} workers, {} senders, {} requests, {} series, {} samples/series, K={}", + args.workers, + args.concurrent_senders, + args.num_requests, + args.num_series, + args.samples_per_series, + args.k + ); + println!(" Total samples: {total_samples}"); + println!( + " Send throughput: {:.0} samples/sec ({:.1}ms)", + total_samples as f64 / send_elapsed.as_secs_f64(), + send_elapsed.as_secs_f64() * 1000.0 + ); + println!( + " E2E throughput: {:.0} samples/sec ({:.1}ms, drain wait {:.1}ms)", + total_samples as f64 / total_elapsed.as_secs_f64(), + total_elapsed.as_secs_f64() * 1000.0, + wait_elapsed.as_secs_f64() * 1000.0 + ); + println!( + " Stored windows: {} (expected {})", + stored_windows, expected_outputs + ); + + if stored_windows as u64 != expected_outputs { + return Err(format!( + "throughput benchmark stored {stored_windows} windows, expected {expected_outputs}" + ) + .into()); + } + + Ok(()) +} + +fn min_ms(values: &[f64]) -> f64 { + values.iter().copied().fold(f64::INFINITY, f64::min) +} + +fn mean_ms(values: &[f64]) -> f64 { + values.iter().sum::() / values.len() as f64 +} + +fn max_ms(values: &[f64]) -> f64 { + values.iter().copied().fold(f64::NEG_INFINITY, f64::max) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .init(); + + let client = reqwest::Client::new(); + + run_latency_benchmark(&client, &args).await?; + run_throughput_benchmark(&client, &args).await?; + + Ok(()) +} diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs new file mode 100644 index 0000000..d5944a0 --- /dev/null +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -0,0 +1,156 @@ +use clap::Parser; +use query_engine_rust::data_model::QueryLanguage; +use query_engine_rust::data_model::{ + CleanupPolicy, InferenceConfig, LockStrategy, StreamingConfig, +}; +use query_engine_rust::drivers::query::adapters::AdapterConfig; +use query_engine_rust::engines::SimpleEngine; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::SimpleMapStore; +use query_engine_rust::{HttpServer, HttpServerConfig}; +use std::sync::Arc; +use tracing::info; +use tracing_subscriber::fmt::format::FmtSpan; + +#[derive(Parser, Debug)] +#[command(name = "precompute_engine")] +#[command(about = "Standalone precompute engine for SketchDB")] +struct Args { + /// Path to streaming config YAML file + #[arg(long)] + streaming_config: String, + + /// Port for Prometheus remote write ingest + #[arg(long, default_value_t = 9090)] + ingest_port: u16, + + /// Number of worker threads + #[arg(long, default_value_t = 4)] + num_workers: usize, + + /// Maximum allowed lateness for out-of-order samples (ms) + #[arg(long, default_value_t = 5000)] + allowed_lateness_ms: i64, + + /// Maximum buffered samples per series + #[arg(long, default_value_t = 10000)] + max_buffer_per_series: usize, + + /// Flush interval for idle window detection (ms) + #[arg(long, default_value_t = 1000)] + flush_interval_ms: u64, + + /// MPSC channel buffer size per worker + #[arg(long, default_value_t = 10000)] + channel_buffer_size: usize, + + /// Port for the query HTTP server (0 to disable) + #[arg(long, default_value_t = 8080)] + query_port: u16, + + /// Lock strategy for the store + #[arg(long, value_enum, default_value_t = LockStrategy::PerKey)] + lock_strategy: LockStrategy, + + /// Skip aggregation and pass each raw sample directly to the store + #[arg(long, default_value_t = false)] + pass_raw_samples: bool, + + /// Aggregation ID to stamp on each raw-mode output + #[arg(long, default_value_t = 0)] + raw_mode_aggregation_id: u64, + + /// Policy for handling late samples that arrive after their window has closed + #[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)] + late_data_policy: LateDataPolicy, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_span_events(FmtSpan::CLOSE) + .init(); + + let args = Args::parse(); + + info!("Loading streaming config from: {}", args.streaming_config); + let streaming_config = Arc::new(StreamingConfig::from_yaml_file(&args.streaming_config)?); + + info!( + "Loaded {} aggregation configs", + streaming_config.get_all_aggregation_configs().len() + ); + + // Create the store + let store: Arc = + Arc::new(SimpleMapStore::new_with_strategy( + streaming_config.clone(), + CleanupPolicy::CircularBuffer, + args.lock_strategy, + )); + + // Optionally start the query HTTP server + if args.query_port > 0 { + let inference_config = + InferenceConfig::new(QueryLanguage::promql, CleanupPolicy::CircularBuffer); + let query_engine = Arc::new(SimpleEngine::new( + store.clone(), + inference_config, + streaming_config.clone(), + 15, // default prometheus scrape interval + QueryLanguage::promql, + )); + let http_config = HttpServerConfig { + port: args.query_port, + handle_http_requests: true, + adapter_config: AdapterConfig { + protocol: query_engine_rust::data_model::QueryProtocol::PrometheusHttp, + language: QueryLanguage::promql, + fallback: None, + }, + }; + let http_server = HttpServer::new(http_config, query_engine, store.clone()); + tokio::spawn(async move { + if let Err(e) = http_server.run().await { + tracing::error!("Query server error: {}", e); + } + }); + info!("Query server started on port {}", args.query_port); + } + + // Build the precompute engine config + let engine_config = PrecomputeEngineConfig { + num_workers: args.num_workers, + ingest_port: args.ingest_port, + allowed_lateness_ms: args.allowed_lateness_ms, + max_buffer_per_series: args.max_buffer_per_series, + flush_interval_ms: args.flush_interval_ms, + channel_buffer_size: args.channel_buffer_size, + pass_raw_samples: args.pass_raw_samples, + raw_mode_aggregation_id: args.raw_mode_aggregation_id, + late_data_policy: args.late_data_policy, + }; + + // Create the output sink (writes directly to the store) + let output_sink: Arc = + if args.pass_raw_samples { + Arc::new(RawPassthroughSink::new(store)) + } else { + Arc::new(StoreOutputSink::new(store)) + }; + + // Build and run the engine + let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + + info!("Starting precompute engine..."); + engine.run().await?; + + Ok(()) +} diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs new file mode 100644 index 0000000..520a3c5 --- /dev/null +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -0,0 +1,844 @@ +//! End-to-end test for the standalone precompute_engine binary. +//! +//! This binary: +//! 1. Starts a PrecomputeEngine in-process (same as the precompute_engine binary) +//! 2. Sends Prometheus remote write samples via HTTP +//! 3. Queries the PromQL endpoint and prints results +//! +//! Usage: +//! cargo run --bin test_e2e_precompute + +use prost::Message; +use query_engine_rust::data_model::{LockStrategy, QueryLanguage, StreamingConfig}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, TimeSeries, WriteRequest, +}; +use query_engine_rust::drivers::query::adapters::AdapterConfig; +use query_engine_rust::engines::SimpleEngine; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::{ + NoopOutputSink, RawPassthroughSink, StoreOutputSink, +}; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::SimpleMapStore; +use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; +use query_engine_rust::{HttpServer, HttpServerConfig}; +use sketch_db_common::aggregation_config::AggregationConfig; +use std::collections::HashMap; +use std::sync::Arc; + +const INGEST_PORT: u16 = 19090; +const QUERY_PORT: u16 = 18080; +const RAW_INGEST_PORT: u16 = 19091; +const SCRAPE_INTERVAL: u64 = 1; // 1 second to match tumblingWindowSize + +fn build_remote_write_body(timeseries: Vec) -> Vec { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compress failed") +} + +fn make_sample(metric: &str, label_0: &str, timestamp_ms: i64, value: f64) -> TimeSeries { + TimeSeries { + labels: vec![ + Label { + name: "__name__".into(), + value: metric.into(), + }, + Label { + name: "instance".into(), + value: "i1".into(), + }, + Label { + name: "job".into(), + value: "test".into(), + }, + Label { + name: "label_0".into(), + value: label_0.into(), + }, + Label { + name: "label_1".into(), + value: "v1".into(), + }, + ], + samples: vec![Sample { + value, + timestamp: timestamp_ms, + }], + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .init(); + + // Load configs the same way main.rs does + let inference_config = read_inference_config( + "examples/promql/inference_config.yaml", + QueryLanguage::promql, + )?; + println!( + "Loaded inference config with {} query configs", + inference_config.query_configs.len() + ); + for qc in &inference_config.query_configs { + println!(" Query: '{}' -> {:?}", qc.query, qc.aggregations); + } + + let cleanup_policy = inference_config.cleanup_policy; + let streaming_config = Arc::new(read_streaming_config( + "examples/promql/streaming_config.yaml", + &inference_config, + )?); + println!( + "Loaded streaming config with {} aggregation configs", + streaming_config.get_all_aggregation_configs().len() + ); + + println!("\n=== Starting precompute engine (ingest={INGEST_PORT}, query={QUERY_PORT}) ==="); + + // Create store + let store: Arc = + Arc::new(SimpleMapStore::new_with_strategy( + streaming_config.clone(), + cleanup_policy, + LockStrategy::PerKey, + )); + + // Start query server + let query_engine = Arc::new(SimpleEngine::new( + store.clone(), + inference_config, + streaming_config.clone(), + SCRAPE_INTERVAL, + QueryLanguage::promql, + )); + let http_config = HttpServerConfig { + port: QUERY_PORT, + handle_http_requests: true, + adapter_config: AdapterConfig { + protocol: query_engine_rust::data_model::QueryProtocol::PrometheusHttp, + language: QueryLanguage::promql, + fallback: None, + }, + }; + let http_server = HttpServer::new(http_config, query_engine, store.clone()); + tokio::spawn(async move { + if let Err(e) = http_server.run().await { + eprintln!("Query server error: {e}"); + } + }); + + // Start precompute engine + let engine_config = PrecomputeEngineConfig { + num_workers: 2, + ingest_port: INGEST_PORT, + allowed_lateness_ms: 5000, + max_buffer_per_series: 10000, + flush_interval_ms: 200, + channel_buffer_size: 10000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let output_sink = Arc::new(StoreOutputSink::new(store.clone())); + let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink); + tokio::spawn(async move { + if let Err(e) = engine.run().await { + eprintln!("Precompute engine error: {e}"); + } + }); + + // Wait for servers to bind + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let client = reqwest::Client::new(); + + // ----------------------------------------------------------------------- + // Send samples across multiple 1-second tumbling windows. + // tumblingWindowSize=1 means windows are [0,1000), [1000,2000), etc. + // We need enough windows of data so the query engine can find results. + // ----------------------------------------------------------------------- + println!("\n=== Sending remote write samples ==="); + + // Send 20 windows worth of data (timestamps 0ms..20000ms = 0s..20s) + // Each window gets one sample. + for window in 0..20 { + let ts = window * 1000 + 500; // mid-window + let val = 10.0 + window as f64; + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", ts, val)]); + + let resp = client + .post(format!("http://localhost:{INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + + println!(" Sent t={ts}ms v={val} -> HTTP {}", resp.status().as_u16()); + } + + // Advance watermark well past to close all windows + println!("\n=== Advancing watermark to close all windows ==="); + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", 25000, 0.0)]); + let resp = client + .post(format!("http://localhost:{INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + println!(" Sent t=25000ms v=0 -> HTTP {}", resp.status().as_u16()); + + // Wait for flush + processing + println!("\n Waiting for flush..."); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // ----------------------------------------------------------------------- + // Query the PromQL endpoint + // The inference_config has: "quantile by (label_0) (0.99, fake_metric)" + // which maps to aggregation_id 1. + // ----------------------------------------------------------------------- + println!("\n=== Querying PromQL endpoint ==="); + + // Use the exact query pattern from inference_config + let queries_instant = vec![ + ( + "quantile by (label_0) (0.99, fake_metric)", + "10", + "Configured query at t=10", + ), + ( + "quantile by (label_0) (0.99, fake_metric)", + "15", + "Configured query at t=15", + ), + ( + "sum_over_time(fake_metric[1s])", + "10", + "Temporal: sum_over_time at t=10", + ), + ("sum(fake_metric)", "10", "Spatial: sum at t=10"), + ]; + + for (query, time, label) in &queries_instant { + println!("\n--- Instant query: {label} ---"); + let resp = client + .get(format!("http://localhost:{QUERY_PORT}/api/v1/query")) + .query(&[("query", *query), ("time", *time)]) + .send() + .await? + .text() + .await?; + print_json(&resp); + } + + // Range query + println!("\n--- Range query: quantile by (label_0) (0.99, fake_metric) t=5..20 step=1 ---"); + let resp = client + .get(format!("http://localhost:{QUERY_PORT}/api/v1/query_range")) + .query(&[ + ("query", "quantile by (label_0) (0.99, fake_metric)"), + ("start", "5"), + ("end", "20"), + ("step", "1"), + ]) + .send() + .await? + .text() + .await?; + print_json(&resp); + + // Runtime info + println!("\n--- Runtime info ---"); + let resp = client + .get(format!( + "http://localhost:{QUERY_PORT}/api/v1/status/runtimeinfo" + )) + .send() + .await? + .text() + .await?; + print_json(&resp); + + // ----------------------------------------------------------------------- + // RAW MODE TEST + // ----------------------------------------------------------------------- + println!("\n=== Starting raw-mode precompute engine (ingest={RAW_INGEST_PORT}) ==="); + + // The raw engine reuses the same store so we can query results directly. + // Pick aggregation_id = 1 to match the existing streaming config. + let raw_agg_id: u64 = 1; + let raw_engine_config = PrecomputeEngineConfig { + num_workers: 4, + ingest_port: RAW_INGEST_PORT, + allowed_lateness_ms: 5000, + max_buffer_per_series: 10000, + flush_interval_ms: 200, + channel_buffer_size: 10000, + pass_raw_samples: true, + raw_mode_aggregation_id: raw_agg_id, + late_data_policy: LateDataPolicy::Drop, + }; + let raw_sink = Arc::new(RawPassthroughSink::new(store.clone())); + let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink); + tokio::spawn(async move { + if let Err(e) = raw_engine.run().await { + eprintln!("Raw precompute engine error: {e}"); + } + }); + + // Wait for server to bind + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Send a few raw samples — no need to advance watermark. + println!("\n=== Sending raw-mode samples ==="); + let raw_timestamps = [100_000i64, 101_000, 102_000]; + let raw_values = [42.0f64, 43.0, 44.0]; + for (&ts, &val) in raw_timestamps.iter().zip(raw_values.iter()) { + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", ts, val)]); + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + println!( + " Sent raw t={ts}ms v={val} -> HTTP {}", + resp.status().as_u16() + ); + } + + // Short wait for processing (no watermark advancement needed) + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify raw samples appeared in the store + println!("\n=== Verifying raw samples in store ==="); + let results = store.query_precomputed_output("fake_metric", raw_agg_id, 100_000, 103_000)?; + let total_buckets: usize = results.values().map(|v| v.len()).sum(); + println!(" Found {total_buckets} buckets for aggregation_id={raw_agg_id} in [100000, 103000)"); + assert!( + total_buckets >= 3, + "Expected at least 3 raw samples in store, got {total_buckets}" + ); + + for (key, buckets) in &results { + for ((start, end), _acc) in buckets { + println!(" key={key:?} start={start} end={end}"); + } + } + println!(" Raw mode test PASSED"); + + // ----------------------------------------------------------------------- + // BATCH LATENCY TEST + // Send 1000 samples in a single HTTP request to measure realistic e2e + // latency. Uses the raw-mode engine to avoid window-close dependencies. + // ----------------------------------------------------------------------- + println!("\n=== Batch latency test: 1000 samples in one request ==="); + + // Build a single WriteRequest with 1000 TimeSeries entries spread across + // 10 series × 100 timestamps each, so routing fans out to workers. + let mut batch_timeseries = Vec::with_capacity(1000); + for series_idx in 0..10 { + let label_val = format!("batch_{series_idx}"); + for t in 0..100 { + let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample + let val = (series_idx * 100 + t) as f64; + batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val)); + } + } + let batch_body = build_remote_write_body(batch_timeseries); + println!( + " Payload size: {} bytes (snappy-compressed)", + batch_body.len() + ); + + let t0 = std::time::Instant::now(); + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(batch_body) + .send() + .await?; + let client_rtt = t0.elapsed(); + println!( + " HTTP response: {} in {:.3}ms", + resp.status().as_u16(), + client_rtt.as_secs_f64() * 1000.0, + ); + + // Wait for all workers to finish processing + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify samples landed in the store + let batch_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 200_000, 210_000)?; + let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum(); + println!(" Stored {batch_buckets} buckets from batch (expected 1000)"); + assert!( + batch_buckets >= 1000, + "Expected at least 1000 raw samples in store, got {batch_buckets}" + ); + println!(" Batch latency test PASSED"); + + // ----------------------------------------------------------------------- + // THROUGHPUT TEST + // Send many requests back-to-back and measure sustained throughput + // (samples/sec). Uses the raw-mode engine for a clean measurement. + // ----------------------------------------------------------------------- + let num_concurrent_senders = 8usize; + println!("\n=== Throughput test: 1000 requests × 10000 samples ({num_concurrent_senders} concurrent senders) ==="); + + let num_requests = 1000u64; + let samples_per_request = 10_000u64; + let total_samples = num_requests * samples_per_request; + + // Pre-build all request bodies in parallel using rayon-style chunking via tokio tasks. + // Each task builds its share of requests, then we flatten the results. + let num_build_tasks = num_concurrent_senders; + let requests_per_task = (num_requests as usize).div_ceil(num_build_tasks); + let mut build_handles = Vec::with_capacity(num_build_tasks); + for task_idx in 0..num_build_tasks { + let start = task_idx * requests_per_task; + let end = ((task_idx + 1) * requests_per_task).min(num_requests as usize); + build_handles.push(tokio::task::spawn_blocking(move || { + let mut chunk = Vec::with_capacity(end - start); + for req_idx in start..end { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("tp_{}", s % 50); // 50 distinct series + let ts = 300_000 + req_idx as i64 * 10_000 + s as i64; + timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64)); + } + chunk.push(build_remote_write_body(timeseries)); + } + chunk + })); + } + let mut bodies = Vec::with_capacity(num_requests as usize); + for handle in build_handles { + bodies.extend(handle.await?); + } + + let throughput_start = std::time::Instant::now(); + + // Send requests using multiple concurrent sender tasks + let mut body_chunks: Vec>> = vec![Vec::new(); num_concurrent_senders]; + for (i, body) in bodies.into_iter().enumerate() { + body_chunks[i % num_concurrent_senders].push(body); + } + let mut send_handles = Vec::new(); + for chunk in body_chunks { + let client = client.clone(); + let url = format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"); + send_handles.push(tokio::spawn(async move { + for body in chunk { + let resp = client + .post(&url) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await; + if let Ok(r) = resp { + if r.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" request failed: {}", r.status()); + } + } + } + })); + } + for handle in send_handles { + handle.await?; + } + + let send_elapsed = throughput_start.elapsed(); + println!( + " All {} requests sent in {:.1}ms", + num_requests, + send_elapsed.as_secs_f64() * 1000.0, + ); + + // Poll until workers drain or timeout after 60s + let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request; + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(120); + let mut tp_buckets: usize; + loop { + let tp_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 300_000, max_ts)?; + tp_buckets = tp_results.values().map(|v| v.len()).sum(); + if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + let total_elapsed = throughput_start.elapsed(); + + let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64(); + let e2e_throughput = tp_buckets as f64 / total_elapsed.as_secs_f64(); + println!(" Stored {tp_buckets}/{total_samples} samples"); + println!( + " Send throughput: {:.0} samples/sec ({:.1}ms for {total_samples} samples)", + send_throughput, + send_elapsed.as_secs_f64() * 1000.0, + ); + println!( + " E2E throughput: {:.0} samples/sec ({:.1}ms until all stored)", + e2e_throughput, + total_elapsed.as_secs_f64() * 1000.0, + ); + assert!( + tp_buckets as u64 >= total_samples, + "Expected at least {total_samples} samples in store, got {tp_buckets}" + ); + println!(" Throughput test PASSED"); + + // ----------------------------------------------------------------------- + // WINDOWED AGGREGATION THROUGHPUT BENCHMARKS + // Compare tumbling vs sliding window performance with the pane-based + // engine. Each benchmark spins up its own PrecomputeEngine with a + // NoopOutputSink (to isolate worker throughput from store I/O). + // ----------------------------------------------------------------------- + let bench_results = run_windowed_benchmarks(&client).await?; + println!("\n=== Windowed aggregation benchmark summary ==="); + println!( + " {:<30} {:>12} {:>12} {:>14}", + "Config", "Send (s/s)", "E2E (s/s)", "Latency (ms)" + ); + for r in &bench_results { + println!( + " {:<30} {:>12.0} {:>12.0} {:>14.1}", + r.label, r.send_throughput, r.e2e_throughput, r.batch_latency_ms + ); + } + + // ----------------------------------------------------------------------- + // SCALABILITY TEST + // Measure throughput as a function of worker count (1, 2, 4, 8, 16) + // to verify linear scaling with cores. Uses sliding 30s/10s Sum (W=3) + // with NoopOutputSink. + // ----------------------------------------------------------------------- + let scale_results = run_scalability_benchmark(&client).await?; + println!("\n=== Scalability benchmark summary (Sliding 30s/10s Sum, W=3) ==="); + println!( + " {:<10} {:>12} {:>12} {:>10}", + "Workers", "Send (s/s)", "E2E (s/s)", "Speedup" + ); + let baseline_e2e = scale_results + .first() + .map(|r| r.e2e_throughput) + .unwrap_or(1.0); + for r in &scale_results { + println!( + " {:<10} {:>12.0} {:>12.0} {:>10.2}x", + r.label, + r.send_throughput, + r.e2e_throughput, + r.e2e_throughput / baseline_e2e + ); + } + + println!("\n=== E2E test complete ==="); + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Windowed aggregation benchmarks +// --------------------------------------------------------------------------- + +struct BenchResult { + label: String, + send_throughput: f64, + e2e_throughput: f64, + batch_latency_ms: f64, +} + +struct BenchRunConfig { + label: String, + port: u16, + streaming_config: Arc, + num_workers: usize, + num_concurrent_senders: usize, + num_requests: u64, + samples_per_request: u64, + num_series: u64, +} + +/// Build an AggregationConfig for Sum with specified window parameters. +fn make_sum_agg_config( + agg_id: u64, + window_size_secs: u64, + slide_interval_secs: u64, +) -> AggregationConfig { + let window_type = if slide_interval_secs == 0 || slide_interval_secs == window_size_secs { + "tumbling" + } else { + "sliding" + }; + AggregationConfig::new( + agg_id, + "SingleSubpopulation".to_string(), + "Sum".to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_size_secs, + "bench_metric".to_string(), + "bench_metric".to_string(), + None, + None, + Some(window_size_secs), + Some(slide_interval_secs), + Some(window_type.to_string()), + None, + None, + ) +} + +/// Run a single windowed benchmark and return the results. +async fn run_single_bench( + client: &reqwest::Client, + config: BenchRunConfig, +) -> Result> { + let BenchRunConfig { + label, + port, + streaming_config, + num_workers, + num_concurrent_senders, + num_requests, + samples_per_request, + num_series, + } = config; + let total_samples = num_requests * samples_per_request; + + let noop_sink = Arc::new(NoopOutputSink::new()); + let engine_config = PrecomputeEngineConfig { + num_workers, + ingest_port: port, + allowed_lateness_ms: 5000, + max_buffer_per_series: 100_000, + flush_interval_ms: 100, + channel_buffer_size: 50_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone()); + tokio::spawn(async move { + if let Err(e) = engine.run().await { + eprintln!("Bench engine error: {e}"); + } + }); + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Pre-build request bodies. Timestamps are monotonically increasing + // across requests so windows close naturally as the watermark advances. + let mut bodies = Vec::with_capacity(num_requests as usize); + for req_idx in 0..num_requests { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("s_{}", s % num_series); + // Each request advances time by 1000ms (1 second) + let ts = (req_idx as i64) * 1000 + (s as i64 % 1000); + timeseries.push(make_sample("bench_metric", &series_label, ts, s as f64)); + } + bodies.push(build_remote_write_body(timeseries)); + } + + // --- Batch latency: single request --- + let latency_body = bodies[0].clone(); + let t0 = std::time::Instant::now(); + client + .post(format!("http://localhost:{port}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(latency_body) + .send() + .await?; + let batch_latency_ms = t0.elapsed().as_secs_f64() * 1000.0; + + // --- Throughput: all requests with concurrent senders --- + let throughput_start = std::time::Instant::now(); + + // Round-robin distribute request bodies across concurrent sender tasks + let mut body_chunks: Vec>> = vec![Vec::new(); num_concurrent_senders]; + for (i, body) in bodies.into_iter().enumerate() { + body_chunks[i % num_concurrent_senders].push(body); + } + let mut send_handles = Vec::new(); + for chunk in body_chunks { + let client = client.clone(); + let url = format!("http://localhost:{port}/api/v1/write"); + send_handles.push(tokio::spawn(async move { + for body in chunk { + let resp = client + .post(&url) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await; + if let Ok(r) = resp { + if r.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" request failed: {}", r.status()); + } + } + } + })); + } + for handle in send_handles { + handle.await?; + } + let send_elapsed = throughput_start.elapsed(); + + // Wait for workers to drain (poll emit_count on noop sink) + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + loop { + let emitted = noop_sink + .emit_count + .load(std::sync::atomic::Ordering::Relaxed); + if emitted > 0 || std::time::Instant::now() > drain_deadline { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + // Give workers a bit more time to finish in-flight work + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let total_elapsed = throughput_start.elapsed(); + + let emitted = noop_sink + .emit_count + .load(std::sync::atomic::Ordering::Relaxed); + let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64(); + let e2e_throughput = total_samples as f64 / total_elapsed.as_secs_f64(); + + println!(" {label}:"); + println!( + " Sent {total_samples} samples in {:.1}ms ({:.0} samples/sec)", + send_elapsed.as_secs_f64() * 1000.0, + send_throughput + ); + println!( + " E2E: {:.1}ms ({:.0} samples/sec), emitted {emitted} windows", + total_elapsed.as_secs_f64() * 1000.0, + e2e_throughput + ); + println!(" Batch latency: {batch_latency_ms:.1}ms"); + + Ok(BenchResult { + label, + send_throughput, + e2e_throughput, + batch_latency_ms, + }) +} + +async fn run_windowed_benchmarks( + client: &reqwest::Client, +) -> Result, Box> { + let num_requests = 200u64; + let samples_per_request = 5_000u64; + let num_series = 50u64; + + let configs: Vec<(&str, u16, u64, u64)> = vec![ + // (label, port, window_size_secs, slide_interval_secs) + ("Tumbling 10s Sum", 19100, 10, 0), + ("Sliding 30s/10s Sum", 19101, 30, 10), + ("Sliding 60s/10s Sum (W=6)", 19102, 60, 10), + ]; + + println!("\n=== Windowed aggregation benchmarks ({num_requests} req × {samples_per_request} samples, {num_series} series) ==="); + + let mut results = Vec::new(); + for (label, port, window_size, slide_interval) in configs { + let agg_config = make_sum_agg_config(100, window_size, slide_interval); + let mut agg_map = HashMap::new(); + agg_map.insert(100u64, agg_config); + let sc = Arc::new(StreamingConfig::new(agg_map)); + + let r = run_single_bench( + client, + BenchRunConfig { + label: label.to_string(), + port, + streaming_config: sc, + num_workers: 4, + num_concurrent_senders: 4, // concurrent senders to saturate workers + num_requests, + samples_per_request, + num_series, + }, + ) + .await?; + results.push(r); + } + + Ok(results) +} + +async fn run_scalability_benchmark( + client: &reqwest::Client, +) -> Result, Box> { + let num_requests = 200u64; + let samples_per_request = 5_000u64; + let num_series = 100u64; // more series to give workers enough parallel work + let worker_counts: Vec = vec![1, 2, 4, 8, 16]; + let base_port: u16 = 19200; + + println!( + "\n=== Scalability benchmark ({num_requests} req × {samples_per_request} samples, \ + {num_series} series, Sliding 30s/10s Sum) ===" + ); + + let mut results = Vec::new(); + for (i, &num_workers) in worker_counts.iter().enumerate() { + let port = base_port + i as u16; + let label = format!("{num_workers}"); + + let agg_config = make_sum_agg_config(200 + i as u64, 30, 10); + let mut agg_map = HashMap::new(); + agg_map.insert(200 + i as u64, agg_config); + let sc = Arc::new(StreamingConfig::new(agg_map)); + + let r = run_single_bench( + client, + BenchRunConfig { + label, + port, + streaming_config: sc, + num_workers, + num_concurrent_senders: num_workers, // concurrent senders match worker count + num_requests, + samples_per_request, + num_series, + }, + ) + .await?; + results.push(r); + } + + Ok(results) +} + +fn print_json(s: &str) { + match serde_json::from_str::(s) { + Ok(v) => println!("{}", serde_json::to_string_pretty(&v).unwrap()), + Err(_) => println!("{s}"), + } +} diff --git a/asap-query-engine/src/data_model/precomputed_output.rs b/asap-query-engine/src/data_model/precomputed_output.rs index c5ebab5..4c607b6 100644 --- a/asap-query-engine/src/data_model/precomputed_output.rs +++ b/asap-query-engine/src/data_model/precomputed_output.rs @@ -444,8 +444,12 @@ impl PrecomputedOutput { Ok(Box::new(accumulator)) } "MultipleSum" => { - let accumulator = MultipleSumAccumulator::deserialize_from_bytes(buffer) - .map_err(|e| format!("Failed to deserialize MultipleSumAccumulator: {e}"))?; + let accumulator = if streaming_engine == "flink" { + MultipleSumAccumulator::deserialize_from_bytes(buffer) + } else { + MultipleSumAccumulator::deserialize_from_bytes_arroyo(buffer) + } + .map_err(|e| format!("Failed to deserialize MultipleSumAccumulator: {e}"))?; Ok(Box::new(accumulator)) } "MultipleMinMax" => { diff --git a/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs b/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs index 428c9de..694064c 100644 --- a/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs +++ b/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs @@ -20,6 +20,117 @@ // pub timeseries: Vec, // } +use prost::Message; + +/// Protobuf payload root for Prometheus remote-write. +#[derive(Clone, PartialEq, Message)] +pub struct WriteRequest { + #[prost(message, repeated, tag = "1")] + pub timeseries: Vec, +} + +#[derive(Clone, PartialEq, Message)] +pub struct TimeSeries { + #[prost(message, repeated, tag = "1")] + pub labels: Vec