diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ea01d87e09..ee55523d46 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,6 +61,7 @@ jobs: run: | sudo apt update sudo apt install --yes protobuf-compiler + cargo install pb-rs - name: Install check binaries run: | cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked @@ -82,6 +83,7 @@ jobs: run: | sudo apt update sudo apt install --yes protobuf-compiler + cargo install pb-rs - uses: Swatinem/rust-cache@v2 - name: Run Unit Tests run: | diff --git a/.gitignore b/.gitignore index f3b4b08ab3..21aab843d3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ integration_tests/dist_query/output .project .tools bin +!src/benchmarks/src/bin coverage.txt tini diff --git a/Cargo.lock b/Cargo.lock index 42ca6c4888..a97698eaba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -546,7 +546,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.4", ] [[package]] @@ -633,9 +633,19 @@ dependencies = [ "columnar_storage", "common", "criterion", + "deadpool", + "num_cpus", "pb_types", "prost", + "protobuf", + "protobuf-codegen", + "quick-protobuf", + "remote_write", "serde", + "serde_json", + "tikv-jemalloc-ctl", + "tikv-jemallocator", + "tokio", "toml", "tracing", "tracing-subscriber", @@ -1410,7 +1420,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "regex-syntax", + "regex-syntax 0.8.4", ] [[package]] @@ -1524,6 +1534,24 @@ dependencies = [ "strum", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "deranged" version = "0.3.11" @@ -1842,6 +1870,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "http" version = "0.2.12" @@ -2152,11 +2189,11 @@ dependencies = [ [[package]] name = "matchers" -version = "0.2.0" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -2226,11 +2263,12 @@ checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" [[package]] name = "nu-ansi-term" -version = "0.50.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" dependencies = [ - "windows-sys 0.52.0", + "overload", + "winapi", ] [[package]] @@ -2383,6 +2421,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "overload" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2672,6 +2716,66 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror", +] + +[[package]] +name = "protobuf-codegen" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace" +dependencies = [ + "anyhow", + "once_cell", + "protobuf", + "protobuf-parse", + "regex", + "tempfile", + "thiserror", +] + +[[package]] +name = "protobuf-parse" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" +dependencies = [ + "anyhow", + "indexmap", + "log", + "protobuf", + "protobuf-support", + "tempfile", + "thiserror", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror", +] + +[[package]] +name = "quick-protobuf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f" +dependencies = [ + "byteorder", +] + [[package]] name = "quote" version = "1.0.37" @@ -2748,8 +2852,17 @@ checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2760,7 +2873,7 @@ checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.4", ] [[package]] @@ -2769,12 +2882,32 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "remote_write" +version = "2.2.0-alpha" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "deadpool", + "once_cell", + "pb_types", + "prost", + "tokio", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3178,6 +3311,37 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] 
+ +[[package]] +name = "tikv-jemalloc-sys" +version = "0.5.4+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.37" @@ -3323,9 +3487,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", @@ -3335,9 +3499,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -3346,9 +3510,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -3367,14 +3531,14 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", "nu-ansi-term", "once_cell", - "regex-automata", + "regex", "sharded-slab", "smallvec", "thread_local", @@ -3552,6 +3716,34 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.9" @@ -3561,6 +3753,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 35e3a56b99..b4bab12b6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,60 +15,64 @@ # specific language governing 
permissions and limitations # under the License. -[workspace.package] -version = "2.2.0-alpha" -authors = ["Apache HoraeDB(incubating) "] -edition = "2021" -license = "Apache-2.0" -repository = "https://github.com/apache/horaedb" -homepage = "https://horaedb.apache.org/" -description = "A high-performance, distributed, cloud native time-series database." - [workspace] resolver = "2" members = [ "src/benchmarks", - "src/columnar_storage" -, + "src/columnar_storage", "src/common", "src/metric_engine", "src/pb_types", + "src/remote_write", "src/server" ] +[workspace.package] +version = "2.2.0-alpha" +authors = ["Apache HoraeDB(incubating) "] +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/apache/horaedb" +homepage = "https://horaedb.apache.org/" +description = "A high-performance, distributed, cloud native time-series database." + [workspace.dependencies] anyhow = { version = "1.0" } -seahash = { version = "4" } -metric_engine = { path = "src/metric_engine" } +arrow = { version = "53", features = ["prettyprint"] } +arrow-schema = "53" +async-scoped = { version = "0.9.0", features = ["use-tokio"] } +async-stream = "0.3" +async-trait = "0.1" +byteorder = "1" +bytes = "1" +bytesize = "1" +clap = "4" columnar_storage = { path = "src/columnar_storage" } common = { path = "src/common" } -thiserror = "1" -bytes = "1" -byteorder = "1" +criterion = "0.5" datafusion = "43" -parquet = { version = "53" } +deadpool = "0.10" +futures = "0.3" +itertools = "0.3" +lazy_static = "1" +metric_engine = { path = "src/metric_engine" } object_store = { version = "0.11" } +once_cell = "1" +parquet = { version = "53" } pb_types = { path = "src/pb_types" } prost = { version = "0.13" } -arrow = { version = "53", features = ["prettyprint"] } -bytesize = "1" -clap = "4" -arrow-schema = "53" -tokio = { version = "1", features = ["full"] } -async-trait = "0.1" -async-stream = "0.3" -futures = "0.3" +remote_write = { path = "src/remote_write" } +seahash = { version = "4" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" temp-dir = "0.1" -itertools = "0.3" -lazy_static = "1" +test-log = "0.2" +thiserror = "1" +tokio = { version = "1", features = ["full"] } +toml = "0.8" tracing = "0.1" tracing-subscriber = "0.3" -async-scoped = { version = "0.9.0", features = ["use-tokio"] } -test-log = "0.2" uuid = "1" -criterion = "0.5" -serde = { version = "1.0", features = ["derive"] } -toml = "0.8" # This profile optimizes for good runtime performance. [profile.release] diff --git a/LICENSE b/LICENSE index 866183b703..c31a89355c 100644 --- a/LICENSE +++ b/LICENSE @@ -237,6 +237,8 @@ The following components are provided under the MIT License. See project link fo The text of each license is also included in licenses/LICENSE-[project].txt * consistent(https://github.com/buraksezer/consistent) +* prom-write-request-bench(https://github.com/v0y4g3r/prom-write-request-bench) Files horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go, -horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go are modified from consistent. \ No newline at end of file +horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go are modified from consistent. +File src/remote_write/src/repeated_field.rs is modified from prom-write-request-bench.
\ No newline at end of file diff --git a/docs/assets/remote-write-concurrent-performance.png b/docs/assets/remote-write-concurrent-performance.png new file mode 100644 index 0000000000..15d185a11c Binary files /dev/null and b/docs/assets/remote-write-concurrent-performance.png differ diff --git a/docs/assets/remote-write-memory-performance.png b/docs/assets/remote-write-memory-performance.png new file mode 100644 index 0000000000..c58598207d Binary files /dev/null and b/docs/assets/remote-write-memory-performance.png differ diff --git a/docs/assets/remote-write-sequential-performance.png b/docs/assets/remote-write-sequential-performance.png new file mode 100644 index 0000000000..a401c285ca Binary files /dev/null and b/docs/assets/remote-write-sequential-performance.png differ diff --git a/licenserc.toml b/licenserc.toml index 9bc60b6391..54129a8f9f 100644 --- a/licenserc.toml +++ b/licenserc.toml @@ -20,4 +20,6 @@ headerPath = "Apache-2.0-ASF.txt" excludes = [ # Forked "src/common/src/size_ext.rs", + "src/remote_write/src/repeated_field.rs", + "src/pb_types/protos/remote_write.proto", ] diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml index a6c017bf07..de738d0123 100644 --- a/src/benchmarks/Cargo.toml +++ b/src/benchmarks/Cargo.toml @@ -25,19 +25,44 @@ repository.workspace = true homepage.workspace = true description.workspace = true +[[bin]] +name = "parser_mem" +path = "src/bin/parser_mem.rs" + +[[bin]] +name = "pool_stats" +path = "src/bin/pool_stats.rs" + +[features] +unsafe-split = ["remote_write/unsafe-split"] + [dependencies] bytes = { workspace = true } columnar_storage = { workspace = true } common = { workspace = true } +deadpool = { workspace = true } pb_types = { workspace = true } prost = { workspace = true } +protobuf = "3.7" +quick-protobuf = "0.8" +remote_write = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } +tikv-jemalloc-ctl = "0.5" +tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" + +[build-dependencies] +protobuf-codegen = "3.7" + [dev-dependencies] criterion = { workspace = true } +num_cpus = "1.16" [[bench]] name = "bench" diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs index 5ce94444e8..b932a44101 100644 --- a/src/benchmarks/benches/bench.rs +++ b/src/benchmarks/benches/bench.rs @@ -22,6 +22,7 @@ use std::{cell::RefCell, sync::Once}; use benchmarks::{ config::{self, BenchConfig}, encoding_bench::EncodingBench, + remote_write_bench::RemoteWriteBench, }; use criterion::*; @@ -56,10 +57,114 @@ fn bench_manifest_encoding(c: &mut Criterion) { group.finish(); } +fn bench_remote_write(c: &mut Criterion) { + let config = init_bench(); + + let sequential_scales = config.remote_write.sequential_scales.clone(); + let concurrent_scales = config.remote_write.concurrent_scales.clone(); + let bench = RefCell::new(RemoteWriteBench::new(config.remote_write)); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(num_cpus::get()) + .enable_all() + .build() + .unwrap(); + + // Sequential parse bench. 
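+    // Each parser decodes the same raw workload bytes `scale` times per iteration.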
+ let mut group = c.benchmark_group("remote_write_sequential"); + + for &n in &sequential_scales { + group.bench_with_input( + BenchmarkId::new("prost", n), + &(&bench, n), + |b, (bench, scale)| { + let bench = bench.borrow(); + b.iter(|| bench.prost_parser_sequential(*scale).unwrap()) + }, + ); + + group.bench_with_input( + BenchmarkId::new("pooled", n), + &(&bench, &rt, n), + |b, (bench, rt, scale)| { + let bench = bench.borrow(); + b.iter(|| rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap()) + }, + ); + + group.bench_with_input( + BenchmarkId::new("quick_protobuf", n), + &(&bench, n), + |b, (bench, scale)| { + let bench = bench.borrow(); + b.iter(|| bench.quick_protobuf_parser_sequential(*scale).unwrap()) + }, + ); + + group.bench_with_input( + BenchmarkId::new("rust_protobuf", n), + &(&bench, n), + |b, (bench, scale)| { + let bench = bench.borrow(); + b.iter(|| bench.rust_protobuf_parser_sequential(*scale).unwrap()) + }, + ); + } + group.finish(); + + // Concurrent parse bench. + let mut group = c.benchmark_group("remote_write_concurrent"); + + for &scale in &concurrent_scales { + group.bench_with_input( + BenchmarkId::new("prost", scale), + &(&bench, &rt, scale), + |b, (bench, rt, scale)| { + let bench = bench.borrow(); + b.iter(|| rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap()) + }, + ); + + group.bench_with_input( + BenchmarkId::new("pooled", scale), + &(&bench, &rt, scale), + |b, (bench, rt, scale)| { + let bench = bench.borrow(); + b.iter(|| rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap()) + }, + ); + + group.bench_with_input( + BenchmarkId::new("quick_protobuf", scale), + &(&bench, &rt, scale), + |b, (bench, rt, scale)| { + let bench = bench.borrow(); + b.iter(|| { + rt.block_on(bench.quick_protobuf_parser_concurrent(*scale)) + .unwrap() + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("rust_protobuf", scale), + &(&bench, &rt, scale), + |b, (bench, rt, scale)| { + let bench = bench.borrow(); + b.iter(|| { + rt.block_on(bench.rust_protobuf_parser_concurrent(*scale)) + .unwrap() + }) + }, + ); + } + group.finish(); +} + criterion_group!( name = benches; config = Criterion::default(); - targets = bench_manifest_encoding, + targets = bench_manifest_encoding, bench_remote_write, ); criterion_main!(benches); diff --git a/src/benchmarks/build.rs b/src/benchmarks/build.rs new file mode 100644 index 0000000000..a2f9593165 --- /dev/null +++ b/src/benchmarks/build.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{env, fs, path::PathBuf, process::Command}; + +fn main() { + // Similar to prost, we generate rust-protobuf and quick-protobuf code to + // OUT_DIR instead of the src directory. 
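+    // This keeps the generated sources out of the repository; they are pulled
+    // into the crate via include! in src/lib.rs.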
+ let proto_path = "../pb_types/protos/remote_write.proto"; + let include_path = "../pb_types/protos"; + let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir_path = PathBuf::from(&out_dir); + + // Generate rust-protobuf code to OUT_DIR. + protobuf_codegen::Codegen::new() + .pure() + .out_dir(&out_dir) + .input(proto_path) + .include(include_path) + .run() + .expect("rust-protobuf code generation failed"); + + // Rename rust-protobuf generated file to avoid potential conflicts. + let src_file = out_dir_path.join("remote_write.rs"); + let dst_file = out_dir_path.join("rust_protobuf_remote_write.rs"); + fs::rename(&src_file, &dst_file).expect("rust-protobuf file rename failed"); + + // Generate quick-protobuf code to OUT_DIR using pb-rs command line tool. + let quick_protobuf_file = out_dir_path.join("quick_protobuf_remote_write.rs"); + let output = Command::new("pb-rs") + .args([ + "-I", + include_path, + "-o", + quick_protobuf_file.to_str().unwrap(), + "-s", + proto_path, + ]) + .output() + .expect("pb-rs command execution failed"); + + if !output.status.success() { + panic!( + "pb-rs command execution failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + // Fix package namespace conflicts and inner attributes using sed. + let output = Command::new("sed") + .args([ + "-i", + "-e", + "s/remote_write:://g", + "-e", + r"s/#!\[/#[/g", + "-e", + r"s/^\/\/! /\/\/ /g", + "-e", + "s/pb_types::mod_MetricMetadata::MetricType/mod_MetricMetadata::MetricType/g", + quick_protobuf_file.to_str().unwrap(), + ]) + .output() + .expect("sed command execution failed"); + if !output.status.success() { + eprintln!( + "warning: sed patching quick-protobuf output failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + // Fix inner attributes in rust-protobuf generated file. + let output = Command::new("sed") + .args([ + "-i", + "-e", + r"s/#!\[/#[/g", + "-e", + r"s/^\/\/! /\/\/ /g", + dst_file.to_str().unwrap(), + ]) + .output() + .expect("sed command execution failed"); + if !output.status.success() { + eprintln!( + "warning: sed patching rust-protobuf output failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + println!("cargo:rerun-if-changed={}", proto_path); +} diff --git a/src/benchmarks/config.toml b/src/benchmarks/config.toml index 1859f30310..00d270060d 100644 --- a/src/benchmarks/config.toml +++ b/src/benchmarks/config.toml @@ -20,3 +20,8 @@ record_count = 1000 append_count = 100 bench_measurement_time = "5s" bench_sample_size = 10 + +[remote_write] +workload_file = "../remote_write/tests/workloads/1709380533560664458.data" +sequential_scales = [1, 5, 10, 20, 50, 100] +concurrent_scales = [1, 5, 10, 20, 50, 100] diff --git a/src/benchmarks/remote_write_memory_bench.py b/src/benchmarks/remote_write_memory_bench.py new file mode 100644 index 0000000000..42a2e50293 --- /dev/null +++ b/src/benchmarks/remote_write_memory_bench.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import subprocess +import json +import sys +import os +from typing import Dict, Any +import argparse + +try: + from tabulate import tabulate +except ImportError: + print("Error: tabulate library not found.") + print("Please install it with: pip3 install tabulate") + sys.exit(1) + + +class MemoryBenchmark: + def __init__(self, scale, mode, use_unsafe=False): + self.project_path = "." + self.data_path = "../remote_write/tests/workloads/1709380533560664458.data" + self.scale = scale + self.mode = mode + self.use_unsafe = use_unsafe + self.parsers = [ + "pooled", + "prost", + "rust-protobuf", + "quick-protobuf", + ] + + def build_binary(self) -> bool: + features_msg = " with unsafe-split" if self.use_unsafe else "" + print(f"Building binary{features_msg}...") + try: + bin_name = "parser_mem" + build_cmd = ["cargo", "build", "--release", "--bin", bin_name] + if self.use_unsafe: + build_cmd.extend(["--features", "unsafe-split"]) + result = subprocess.run( + build_cmd, + cwd=self.project_path, + check=False, + ) + if result.returncode != 0: + print("Failed to build binary") + return False + return True + except Exception as e: + print(f"Failed to build binary: {e}") + return False + + def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) -> Dict[str, Any]: + binary_path = f"../../target/release/{bin_name}" + + cmd = [binary_path, mode, str(scale), parser] + + try: + result = subprocess.run( + cmd, + cwd=self.project_path, + capture_output=True, + text=True, + timeout=300, # 5 minute timeout + ) + + if result.returncode != 0: + print(f"Error running {parser}: {result.stderr}") + return {} + + return json.loads(result.stdout.strip()) + + except subprocess.TimeoutExpired: + print(f"Timeout running {parser} {mode} {scale}") + return {} + except json.JSONDecodeError as e: + print(f"Failed to parse JSON from {parser}: {e}") + print(f"Raw output: {result.stdout}") + return {} + except FileNotFoundError: + print(f"Binary not found: {binary_path}") + print(f"Please run: cargo build --release --bins") + return {} + except Exception as e: + print(f"Exception running {parser}: {e}") + return {} + + def run_benchmarks(self) -> Dict[str, Dict[str, Any]]: + results = {} + successful_count = 0 + total_count = len(self.parsers) + + print(f"\nRunning benchmarks for {total_count} parsers...") + + for i, parser in enumerate(self.parsers, 1): + print(f"\n[{i}/{total_count}] Testing {parser}...") + print(f"Running {self.mode} mode with scale {self.scale}...") + result = self.run_parser( + parser, self.mode, self.scale, "parser_mem") + + if result: + result["parser"] = parser + results[parser] = result + successful_count += 1 + print(f"Success") + else: + print(f"Failed") + + print( + f"\nCompleted: {successful_count}/{total_count} parsers succeeded") + + if successful_count == total_count: + print("All parsers succeeded - generating report...") + return results + else: + print("Some parsers failed - skipping report generation") + return {} + + def analyze_results(self, results: Dict[str, Dict[str, Any]]): + if not results: + print("\nNo results to analyze - all parsers failed or were 
skipped") + return + + print(f"\n{'='*80}") + print("MEMORY BENCHMARK RESULTS") + print(f"{'='*80}") + print(f"Mode: {self.mode.upper()}, Scale: {self.scale}") + print() + + headers = [ + "Parser", + "ThreadAlloc", + "ThreadDealloc", + "Allocated", + "Active", + "Metadata", + "Mapped", + "Resident", + "Retained", + ] + + table_data = [] + for parser in self.parsers: + if parser in results: + result = results[parser] + memory = result.get("memory", {}) + row = [ + parser, + f"{memory.get('thread_allocated_diff', 0):,}", + f"{memory.get('thread_deallocated_diff', 0):,}", + f"{memory.get('allocated', 0):,}", + f"{memory.get('active', 0):,}", + f"{memory.get('metadata', 0):,}", + f"{memory.get('mapped', 0):,}", + f"{memory.get('resident', 0):,}", + f"{memory.get('retained', 0):,}", + ] + table_data.append(row) + + print("SUMMARY TABLE (All values in bytes)") + print( + tabulate( + table_data, + headers=headers, + tablefmt="grid", + stralign="right", + numalign="right", + ) + ) + + +def main(): + parser = argparse.ArgumentParser( + description="Memory benchmark for protobuf parsers" + ) + parser.add_argument( + "--unsafe", action="store_true", help="Enable unsafe-split feature" + ) + parser.add_argument( + "--mode", + choices=["sequential", "concurrent"], + default="sequential", + help="Test mode to run (default: sequential)", + ) + parser.add_argument( + "--scale", + type=int, + default=10, + help="Scale value for benchmark (default: 10)", + ) + + args = parser.parse_args() + + if args.scale <= 0: + print(f"Invalid scale value '{args.scale}', scale must be positive") + sys.exit(1) + + data_path = "../remote_write/tests/workloads/1709380533560664458.data" + if not os.path.exists(data_path): + print(f"Test data file not found at {data_path}") + print("Please ensure test data exists before running benchmarks") + sys.exit(1) + + benchmark = MemoryBenchmark( + scale=args.scale, mode=args.mode, use_unsafe=args.unsafe + ) + + if not benchmark.build_binary(): + sys.exit(1) + + print(f"\nRunning memory benchmarks...") + print(f"Mode: {args.mode}") + print(f"Scale: {args.scale}") + print(f"Unsafe optimization: {'enabled' if args.unsafe else 'disabled'}") + + results = benchmark.run_benchmarks() + benchmark.analyze_results(results) + + +if __name__ == "__main__": + main() diff --git a/src/benchmarks/src/bin/parser_mem.rs b/src/benchmarks/src/bin/parser_mem.rs new file mode 100644 index 0000000000..0028cd5750 --- /dev/null +++ b/src/benchmarks/src/bin/parser_mem.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use benchmarks::util::{MemoryBenchConfig, MemoryStats}; +use pb_types::WriteRequest as ProstWriteRequest; +use prost::Message; +use protobuf::Message as ProtobufMessage; +use quick_protobuf::{BytesReader, MessageRead}; +use remote_write::pooled_parser::PooledParser; +use tikv_jemallocator::Jemalloc; + +#[global_allocator] +static ALLOC: Jemalloc = Jemalloc; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let config = MemoryBenchConfig::from_args(); + let args: Vec<String> = std::env::args().collect(); + let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled"); + + let start_stats = MemoryStats::collect()?; + + match config.mode.as_str() { + "sequential" => match parser { + "pooled" => { + let parser = PooledParser; + for _ in 0..config.scale { + let _ = parser.decode_async(config.test_data.clone()).await?; + } + } + "prost" => { + for _ in 0..config.scale { + ProstWriteRequest::decode(config.test_data.clone())?; + } + } + "rust-protobuf" => { + for _ in 0..config.scale { + let _ = benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes( + &config.test_data, + )?; + } + } + "quick-protobuf" => { + for _ in 0..config.scale { + let mut reader = BytesReader::from_bytes(&config.test_data); + let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader( + &mut reader, + &config.test_data, + )?; + } + } + other => panic!("unknown parser: {}", other), + }, + "concurrent" => match parser { + "pooled" => { + let mut handles = Vec::new(); + for _ in 0..config.scale { + let data_clone = config.test_data.clone(); + let handle = tokio::spawn(async move { + let parser = PooledParser; + let _ = parser.decode_async(data_clone).await; + }); + handles.push(handle); + } + for handle in handles { + handle.await?; + } + } + "prost" => { + let mut handles = Vec::new(); + for _ in 0..config.scale { + let data_clone = config.test_data.clone(); + let handle = tokio::spawn(async move { + let _ = ProstWriteRequest::decode(data_clone); + }); + handles.push(handle); + } + for handle in handles { + handle.await?; + } + } + "rust-protobuf" => { + let mut handles = Vec::new(); + for _ in 0..config.scale { + let data_clone = config.test_data.clone(); + let handle = tokio::spawn(async move { + let _ = + benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes( + &data_clone, + ); + }); + handles.push(handle); + } + for handle in handles { + handle.await?; + } + } + "quick-protobuf" => { + let mut handles = Vec::new(); + for _ in 0..config.scale { + let data_clone = config.test_data.clone(); + let handle = tokio::spawn(async move { + let mut reader = BytesReader::from_bytes(&data_clone); + let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader( + &mut reader, + &data_clone, + ); + }); + handles.push(handle); + } + for handle in handles { + handle.await?; + } + } + other => panic!("unknown parser: {}", other), + }, + _ => panic!("invalid mode"), + } + + let end_stats = MemoryStats::collect()?; + let memory_diff = start_stats.diff(&end_stats); + config.output_json(&memory_diff); + Ok(()) +} diff --git a/src/benchmarks/src/bin/pool_stats.rs b/src/benchmarks/src/bin/pool_stats.rs new file mode 100644 index 0000000000..48115075b3 --- /dev/null +++ b/src/benchmarks/src/bin/pool_stats.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Evaluate the efficiency of the deadpool-backed object pool. + +use std::fs; + +use bytes::Bytes; +use remote_write::{pooled_parser::PooledParser, pooled_types::POOL}; +use tikv_jemallocator::Jemalloc; +use tokio::task::JoinHandle; + +#[global_allocator] +static ALLOC: Jemalloc = Jemalloc; + +async fn run_concurrent_parsing(scale: usize) -> deadpool::Status { + let data = fs::read("../remote_write/tests/workloads/1709380533560664458.data") .expect("test data load failed"); + let data = Bytes::from(data); + + let handles: Vec<JoinHandle<()>> = (0..scale) + .map(|_| { + let data = data.clone(); + tokio::spawn(async move { + let parser = PooledParser; + let _ = parser + .decode_async(data.clone()) + .await + .expect("parse failed"); + }) + }) + .collect(); + + for handle in handles { + handle.await.expect("task completion failed"); + } + + POOL.status() +} + +#[tokio::main] +async fn main() { + let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500]; + + println!( + "{:<8} {:<10} {:<10} {:<10} {:<10}", + "Scale", "MaxSize", "PoolSize", "Available", "Waiting" + ); + println!("{}", "=".repeat(50)); + + for &scale in &scale_values { + let status = run_concurrent_parsing(scale).await; + + println!( + "{:<8} {:<10} {:<10} {:<10} {:<10}", + scale, status.max_size, status.size, status.available, status.waiting + ); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + println!("=== Final Pool Status ==="); + let final_status = POOL.status(); + println!("Max Pool Size: {}", final_status.max_size); + println!("Current Pool Size: {}", final_status.size); + println!("Available Objects: {}", final_status.available); + println!("Waiting Requests: {}", final_status.waiting); +} diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs index 27577f258c..199ef62765 100644 --- a/src/benchmarks/src/config.rs +++ b/src/benchmarks/src/config.rs @@ -27,6 +27,7 @@ const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH"; #[derive(Debug, Deserialize)] pub struct BenchConfig { pub manifest: ManifestConfig, + pub remote_write: RemoteWriteConfig, } pub fn config_from_env() -> BenchConfig { @@ -48,3 +49,10 @@ pub struct ManifestConfig { pub bench_measurement_time: ReadableDuration, pub bench_sample_size: usize, } + +#[derive(Deserialize, Debug, Clone)] +pub struct RemoteWriteConfig { + pub workload_file: String, + pub sequential_scales: Vec<usize>, + pub concurrent_scales: Vec<usize>, +} diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs index 180d1b8a38..20d2564f54 100644 --- a/src/benchmarks/src/lib.rs +++ b/src/benchmarks/src/lib.rs @@ -19,4 +19,17 @@ pub mod config; pub mod encoding_bench; -mod util; +pub mod remote_write_bench; +pub mod util; + +#[allow(clippy::all)] +#[allow(warnings)] +pub mod quick_protobuf_remote_write { + include!(concat!(env!("OUT_DIR"), "/quick_protobuf_remote_write.rs")); +} + +#[allow(clippy::all)] +#[allow(warnings)] +pub mod rust_protobuf_remote_write { +
include!(concat!(env!("OUT_DIR"), "/rust_protobuf_remote_write.rs")); +} diff --git a/src/benchmarks/src/remote_write_bench.rs b/src/benchmarks/src/remote_write_bench.rs new file mode 100644 index 0000000000..b5372da5b6 --- /dev/null +++ b/src/benchmarks/src/remote_write_bench.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Remote write parser bench. + +use std::{fs, path::PathBuf}; + +use bytes::Bytes; +use pb_types::WriteRequest as ProstWriteRequest; +use prost::Message; +use protobuf::Message as ProtobufMessage; +use quick_protobuf::{BytesReader, MessageRead}; +use remote_write::pooled_parser::PooledParser; +use tokio::task::JoinHandle; + +use crate::{ + config::RemoteWriteConfig, + quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest, + rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest, +}; + +pub struct RemoteWriteBench { + raw_data: Vec<u8>, +} + +impl RemoteWriteBench { + pub fn new(config: RemoteWriteConfig) -> Self { + let mut workload_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + workload_path.push(&config.workload_file); + + let raw_data = fs::read(&workload_path) + .unwrap_or_else(|_| panic!("failed to read workload file: {:?}", workload_path)); + + Self { raw_data } + } + + // prost parser sequential bench. + pub fn prost_parser_sequential(&self, scale: usize) -> Result<(), String> { + for _ in 0..scale { + let data = Bytes::from(self.raw_data.clone()); + ProstWriteRequest::decode(data) + .map_err(|e| format!("prost sequential parse failed: {}", e))?; + } + Ok(()) + } + + // Hand-written pooled parser sequential bench. + pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(), String> { + let parser = PooledParser; + for _ in 0..scale { + let data = Bytes::from(self.raw_data.clone()); + let _ = parser + .decode_async(data.clone()) + .await + .map_err(|e| format!("pooled sequential parse failed: {:?}", e))?; + } + Ok(()) + } + + // quick-protobuf parser sequential bench. + pub fn quick_protobuf_parser_sequential(&self, scale: usize) -> Result<(), String> { + for _ in 0..scale { + let mut reader = BytesReader::from_bytes(&self.raw_data); + QuickProtobufWriteRequest::from_reader(&mut reader, &self.raw_data) + .map_err(|e| format!("quick-protobuf sequential parse failed: {}", e))?; + } + Ok(()) + } + + // rust-protobuf parser sequential bench. + pub fn rust_protobuf_parser_sequential(&self, scale: usize) -> Result<(), String> { + for _ in 0..scale { + RustProtobufWriteRequest::parse_from_bytes(&self.raw_data) + .map_err(|e| format!("rust-protobuf sequential parse failed: {}", e))?; + } + Ok(()) + } + + // prost parser concurrent bench.
+ pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale) + .map(|_| { + let raw_data = self.raw_data.clone(); + tokio::spawn(async move { + let data = Bytes::from(raw_data); + ProstWriteRequest::decode(data) + .map_err(|e| format!("prost concurrent parse failed: {}", e))?; + Ok(()) + }) + }) + .collect(); + + for join_handle in join_handles { + join_handle.await.unwrap()?; + } + Ok(()) + } + + // Hand-written pooled parser concurrent bench. + pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let parser = PooledParser; + let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale) + .map(|_| { + let parser = parser.clone(); + let raw_data = self.raw_data.clone(); + tokio::spawn(async move { + let data = Bytes::from(raw_data); + let _ = parser + .decode_async(data.clone()) + .await + .map_err(|e| format!("pooled concurrent parse failed: {:?}", e))?; + Ok(()) + }) + }) + .collect(); + + for join_handle in join_handles { + join_handle.await.unwrap()?; + } + Ok(()) + } + + // quick-protobuf parser concurrent bench. + pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale) + .map(|_| { + let data = self.raw_data.clone(); + tokio::spawn(async move { + let mut reader = BytesReader::from_bytes(&data); + QuickProtobufWriteRequest::from_reader(&mut reader, &data) + .map_err(|e| format!("quick-protobuf concurrent parse failed: {}", e))?; + Ok(()) + }) + }) + .collect(); + + for join_handle in join_handles { + join_handle.await.unwrap()?; + } + Ok(()) + } + + // rust-protobuf parser concurrent bench. + pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), String> { + let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale) + .map(|_| { + let data = self.raw_data.clone(); + tokio::spawn(async move { + RustProtobufWriteRequest::parse_from_bytes(&data) + .map_err(|e| format!("rust-protobuf concurrent parse failed: {}", e))?; + Ok(()) + }) + }) + .collect(); + + for join_handle in join_handles { + join_handle.await.unwrap()?; + } + Ok(()) + } +} diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs index 623b5b0679..4d74f78558 100644 --- a/src/benchmarks/src/util.rs +++ b/src/benchmarks/src/util.rs @@ -18,15 +18,20 @@ //! Utilities for benchmarks. use std::{ + env, fmt::{self, Write}, + fs, str::FromStr, time::Duration, }; +use bytes::Bytes; use serde::{ de::{self, Visitor}, Deserialize, Deserializer, Serialize, Serializer, }; +use serde_json::json; +use tikv_jemalloc_ctl::{epoch, stats, thread}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)] pub struct ReadableDuration(pub Duration); @@ -168,3 +173,123 @@ impl<'de> Deserialize<'de> for ReadableDuration { deserializer.deserialize_str(DurVisitor) } } + +// Memory bench utilities.
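+//
+// `MemoryStats` snapshots the jemalloc counters exposed by tikv-jemalloc-ctl
+// (advancing the epoch first so the readings are fresh), and `diff` computes
+// the per-run deltas that the parser_mem binary reports as JSON.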
+#[derive(Debug, Clone)] +pub struct MemoryStats { + pub thread_allocated: u64, + pub thread_deallocated: u64, + pub allocated: u64, + pub active: u64, + pub metadata: u64, + pub mapped: u64, + pub resident: u64, + pub retained: u64, +} + +#[derive(Debug, Clone)] +pub struct MemoryStatsDiff { + pub thread_allocated_diff: i64, + pub thread_deallocated_diff: i64, + pub allocated: i64, + pub active: i64, + pub metadata: i64, + pub mapped: i64, + pub resident: i64, + pub retained: i64, +} + +impl MemoryStats { + pub fn collect() -> Result<Self, String> { + epoch::advance().map_err(|e| format!("failed to advance jemalloc epoch: {}", e))?; + + Ok(MemoryStats { + thread_allocated: thread::allocatedp::read() + .map_err(|e| format!("failed to read thread.allocatedp: {}", e))? + .get(), + thread_deallocated: thread::deallocatedp::read() + .map_err(|e| format!("failed to read thread.deallocatedp: {}", e))? + .get(), + allocated: stats::allocated::read() + .map_err(|e| format!("failed to read allocated: {}", e))? + .try_into() + .unwrap(), + active: stats::active::read() + .map_err(|e| format!("failed to read active: {}", e))? + .try_into() + .unwrap(), + metadata: stats::metadata::read() + .map_err(|e| format!("failed to read metadata: {}", e))? + .try_into() + .unwrap(), + mapped: stats::mapped::read() + .map_err(|e| format!("failed to read mapped: {}", e))? + .try_into() + .unwrap(), + resident: stats::resident::read() + .map_err(|e| format!("failed to read resident: {}", e))? + .try_into() + .unwrap(), + retained: stats::retained::read() + .map_err(|e| format!("failed to read retained: {}", e))? + .try_into() + .unwrap(), + }) + } + + pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff { + MemoryStatsDiff { + thread_allocated_diff: other.thread_allocated as i64 - self.thread_allocated as i64, + thread_deallocated_diff: other.thread_deallocated as i64 + - self.thread_deallocated as i64, + allocated: other.allocated as i64 - self.allocated as i64, + active: other.active as i64 - self.active as i64, + metadata: other.metadata as i64 - self.metadata as i64, + mapped: other.mapped as i64 - self.mapped as i64, + resident: other.resident as i64 - self.resident as i64, + retained: other.retained as i64 - self.retained as i64, + } + } +} + +pub struct MemoryBenchConfig { + pub test_data: Bytes, + pub scale: usize, + pub mode: String, +} + +impl MemoryBenchConfig { + pub fn from_args() -> Self { + let args: Vec<String> = env::args().collect(); + let mode = args[1].clone(); + let scale: usize = args[2].parse().expect("invalid scale"); + let test_data = Bytes::from( + fs::read("../remote_write/tests/workloads/1709380533560664458.data") + .expect("test data load failed"), + ); + + MemoryBenchConfig { + test_data, + scale, + mode, + } + } + + pub fn output_json(&self, memory_diff: &MemoryStatsDiff) { + let result = json!({ + "mode": self.mode, + "scale": self.scale, + "memory": { + "thread_allocated_diff": memory_diff.thread_allocated_diff, + "thread_deallocated_diff": memory_diff.thread_deallocated_diff, + "allocated": memory_diff.allocated, + "active": memory_diff.active, + "metadata": memory_diff.metadata, + "mapped": memory_diff.mapped, + "resident": memory_diff.resident, + "retained": memory_diff.retained + } + }); + println!("{}", result); + } +} diff --git a/src/pb_types/build.rs b/src/pb_types/build.rs index 7eb68464b0..1dc9a4ffdb 100644 --- a/src/pb_types/build.rs +++ b/src/pb_types/build.rs @@ -18,6 +18,9 @@ use std::io::Result; fn main() -> Result<()> { - prost_build::compile_protos(&["protos/sst.proto"],
&["protos/"])?; + prost_build::compile_protos( + &["protos/sst.proto", "protos/remote_write.proto"], + &["protos/"], + )?; Ok(()) } diff --git a/src/pb_types/protos/remote_write.proto b/src/pb_types/protos/remote_write.proto new file mode 100644 index 0000000000..110ac67bcc --- /dev/null +++ b/src/pb_types/protos/remote_write.proto @@ -0,0 +1,77 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is is modified from +// https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto +// and https://github.com/prometheus/prometheus/blob/main/prompb/types.proto + +syntax = "proto3"; +package pb_types.remote_write; + +message WriteRequest { + repeated TimeSeries timeseries = 1; + // Cortex uses this field to determine the source of the write request. + // We reserve it to avoid any compatibility issues. + reserved 2; + repeated MetricMetadata metadata = 3; +} + +message MetricMetadata { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + // Represents the metric type, these match the set from Prometheus. + // Refer to pkg/textparse/interface.go for details. + MetricType type = 1; + string metric_family_name = 2; + string help = 4; + string unit = 5; +} + +// TimeSeries represents samples and labels for a single time series. +message TimeSeries { + // For a timeseries to be valid, and for the samples and exemplars + // to be ingested by the remote system properly, the labels field is required. + repeated Label labels = 1; + repeated Sample samples = 2; + repeated Exemplar exemplars = 3; +} + +message Label { + string name = 1; + string value = 2; +} + +message Sample { + double value = 1; + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 2; +} + +message Exemplar { + // Optional, can be empty. + repeated Label labels = 1; + double value = 2; + // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 3; +} \ No newline at end of file diff --git a/src/pb_types/src/lib.rs b/src/pb_types/src/lib.rs index bfa215b02c..727a1d8fa2 100644 --- a/src/pb_types/src/lib.rs +++ b/src/pb_types/src/lib.rs @@ -19,4 +19,9 @@ mod pb_types { include!(concat!(env!("OUT_DIR"), "/pb_types.sst.rs")); } +mod prost_remote_write { + include!(concat!(env!("OUT_DIR"), "/pb_types.remote_write.rs")); +} + pub use pb_types::*; +pub use prost_remote_write::*; diff --git a/src/remote_write/Cargo.toml b/src/remote_write/Cargo.toml new file mode 100644 index 0000000000..344308c4ed --- /dev/null +++ b/src/remote_write/Cargo.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "remote_write" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description.workspace = true + +[features] +unsafe-split = [] + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +deadpool = { workspace = true } +once_cell = { workspace = true } +pb_types = { workspace = true } +prost = { workspace = true } +tokio = { workspace = true } diff --git a/src/remote_write/README.md b/src/remote_write/README.md new file mode 100644 index 0000000000..f41a00c5b6 --- /dev/null +++ b/src/remote_write/README.md @@ -0,0 +1,291 @@ +# Remote Write Parser + +A hand-written [Prometheus Remote Write Request (V1)](https://prometheus.io/docs/specs/prw/remote_write_spec/) parser optimized for zero-allocation parsing. It receives protobuf data as `Bytes` and returns a parsed `WriteRequest` instance. + +## Implementation + +Key optimization techniques: + +- Object pooling backed by deadpool. + +- `RepeatedField` data structures. + +- Zero-copy bytes split backed by unsafe. + +- Manual loop unrolling and function inline optimization. + +## Performance + +This section presents a comprehensive performance analysis of the hand-written pooled parser compared to other popular Rust protobuf libraries ([prost](https://github.com/tokio-rs/prost), [rust-protobuf](https://github.com/stepancheg/rust-protobuf), [quick-protobuf](https://github.com/tafia/quick-protobuf)) and the [easyproto](https://github.com/VictoriaMetrics/easyproto) (Go) implementation. All tests were conducted on Ubuntu 22.04.4 LTS x86_64 with an AMD EPYC 7742 (8) @ 2.249GHz CPU and 16GB RAM.
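+
+To make the pooling idea above concrete, the sketch below shows the deadpool-backed
+reuse pattern in isolation. The `WriteRequest` here is a simplified stand-in for the
+crate's actual `pooled_types`, so treat it as an illustration rather than the real
+parser internals:
+
+```rust
+use deadpool::unmanaged::Pool;
+
+// Hypothetical, simplified stand-in for the pooled request type.
+#[derive(Default)]
+struct WriteRequest {
+    timeseries: Vec<u64>,
+}
+
+#[tokio::main]
+async fn main() {
+    // Pre-populate a small unmanaged pool. The guard returned by `get()`
+    // hands the object back to the pool on drop, so later parses reuse the
+    // buffers that earlier parses already grew.
+    let pool = Pool::from(vec![WriteRequest::default(), WriteRequest::default()]);
+
+    for _ in 0..4 {
+        let mut req = pool.get().await.unwrap();
+        req.timeseries.clear(); // reset contents, keep capacity
+        req.timeseries.extend([1, 2, 3]); // "parse" into the pooled object
+    } // guard dropped here -> object returns to the pool
+}
+```
+
+In the real crate the pooled object is filled by `PooledParser::decode_async`, and the
+`pool_stats` binary described below reports how the shared pool behaves under
+concurrent load.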
+ +### Prerequisites + +Install the required dependencies: + +```shell +cargo install pb-rs +pip3 install tabulate matplotlib +``` + +### CPU Time + +#### Steps + +Navigate to the benchmarks directory: + +```shell +cd src/benchmarks +``` + +Run the standard benchmarks: + +```shell +BENCH_CONFIG_PATH=config.toml cargo bench --bench bench remote_write +``` + +Or enable unsafe optimization for better performance: + +```shell +BENCH_CONFIG_PATH=config.toml cargo bench --features unsafe-split --bench bench remote_write +``` + +We also benchmarked against the [easyproto](https://github.com/VictoriaMetrics/easyproto) library for comparison: + +```shell +git clone https://github.com/VictoriaMetrics/VictoriaMetrics.git +cd VictoriaMetrics +git checkout d083ff790a203ecda1cbbd527c792ef19c159f91 +cd lib/prompb +vim prom_decode_bench_test.go +``` + +and add the following code (adjusting the path to 1709380533560664458.data as needed): + +```go +package prompb + +import ( + "fmt" + "os" + "sync" + "testing" +) + +type Decoder interface { + Parse(data []byte) error + Reset() + Clone() Decoder +} + +type PooledDecoder struct { + pool *sync.Pool +} + +func NewPooledDecoder() *PooledDecoder { + pool := &sync.Pool{ + New: func() interface{} { + return &WriteRequest{} + }, + } + return &PooledDecoder{pool: pool} +} + +func (d *PooledDecoder) Parse(data []byte) error { + wr := d.pool.Get().(*WriteRequest) + defer d.pool.Put(wr) + wr.Reset() + return wr.UnmarshalProtobuf(data) +} + +func (d *PooledDecoder) Reset() { + // Pool handles reset internally. +} + +func (d *PooledDecoder) Clone() Decoder { + return d +} + +type NoPoolDecoder struct { + wr *WriteRequest +} + +func NewNoPoolDecoder() *NoPoolDecoder { + return &NoPoolDecoder{ + wr: &WriteRequest{}, + } +} + +func (d *NoPoolDecoder) Parse(data []byte) error { + d.wr.Reset() + return d.wr.UnmarshalProtobuf(data) +} + +func (d *NoPoolDecoder) Reset() { + d.wr.Reset() +} + +func (d *NoPoolDecoder) Clone() Decoder { + return NewNoPoolDecoder() +} + +func getTestDataPath() ([]byte, error) { + return os.ReadFile("1709380533560664458.data") +} + +// Sequential benchmark. +func benchDecoderSequential(decoder Decoder, data []byte, n int) error { + for i := 0; i < n; i++ { + decoder.Reset() + if err := decoder.Parse(data); err != nil { + return err + } + } + return nil +} + +// Concurrent benchmark. +func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error { + results := make(chan error, workers) + + // Spawn workers (similar to tokio::spawn in Rust). + for w := 0; w < workers; w++ { + go func() { + clonedDecoder := decoder.Clone() + clonedDecoder.Reset() + err := clonedDecoder.Parse(data) + results <- err + }() + } + + // Wait for all workers to complete (similar to join_handle.await).
+ for w := 0; w < workers; w++ { + if err := <-results; err != nil { + return err + } + } + return nil +} + +func BenchmarkSequentialParse(b *testing.B) { + data, err := getTestDataPath() + if err != nil { + b.Skipf("test data file not found: %v", err) + } + + decoders := map[string]Decoder{ + "pooled": NewPooledDecoder(), + "nopool": NewNoPoolDecoder(), + } + + iterations := []int{1, 5, 10, 20, 100} + + for decoderName, decoder := range decoders { + for _, n := range iterations { + b.Run(fmt.Sprintf("%s/%d", decoderName, n), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := benchDecoderSequential(decoder, data, n); err != nil { + b.Fatalf("failed to parse: %v", err) + } + } + }) + } + } +} + +func BenchmarkConcurrentParse(b *testing.B) { + data, err := getTestDataPath() + if err != nil { + b.Skipf("test data file not found: %v", err) + } + + decoders := map[string]Decoder{ + "pooled": NewPooledDecoder(), + "nopool": NewNoPoolDecoder(), + } + + workers := []int{1, 5, 10, 20, 100} + + for decoderName, decoder := range decoders { + for _, w := range workers { + b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := benchDecoderConcurrent(decoder, data, w); err != nil { + b.Fatalf("failed to parse: %v", err) + } + } + }) + } + } +} +``` + +Execute the Go benchmarks: + +```shell +go test -bench=. -run=^$ +``` + +#### Results + +Test results are as follows: + +![Sequential Performance](../../docs/assets/remote-write-sequential-performance.png) + +In sequential parsing scenarios, the hand-written pooled parsers (with and without unsafe optimization) achieve the best performance across all scales compared to other Rust parsers. The unsafe optimization provides nearly 50% performance improvement. + +![Concurrent Performance](../../docs/assets/remote-write-concurrent-performance.png) + +**Note**: Due to the nature of concurrent execution, concurrent parsing benchmark results may vary (sometimes dramatically) across different runs. However, we can still draw an overall conclusion. + +In concurrent parsing scenarios, from an overall perspective, the hand-written pooled parsers (with and without unsafe optimization) still achieve the best performance compared to other Rust parsers. The unsafe optimization continues to provide performance improvements. + +### Memory Allocation + +Navigate to the benchmarks directory: + +```shell +cd src/benchmarks +``` + +Run memory allocation benchmarks: + +```shell +python3 remote_write_memory_bench.py --mode sequential --scale 10 +``` + +Or enable unsafe optimization: + +```shell +python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe +``` + +**Note**: Sequential and concurrent mode results are similar due to the enforced `#[tokio::main(flavor = "current_thread")]` configuration. This constraint is necessary because Jemalloc's `thread::allocatedp` and `thread::deallocatedp` statistics can only track single-threaded allocations accurately. + +We focus on the [allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html) value to verify our zero-allocation parsing efforts, since it represents the number of bytes that **have ever been** allocated by the thread. + +The results are as follows: + +![Memory](../../docs/assets/remote-write-memory-performance.png) + +The hand-written pooled parser allocates minimal memory compared to other Rust parsers. 
+
+### Object Pool Efficiency
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Analyze pool utilization:
+
+```shell
+cargo run --bin pool_stats --release
+```
+
+In our testing, a pool of only 8 objects is sufficient to handle 500 concurrent parsing operations.
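+
+As a usage sketch (`PooledParser::decode_async` comes from this patch; the surrounding `main` and the empty payload are purely illustrative):
+
+```rust
+use bytes::Bytes;
+use remote_write::pooled_parser::PooledParser;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let parser = PooledParser::default();
+    // An empty payload decodes to an empty WriteRequest; a real caller would
+    // pass the body of a Prometheus remote write request here.
+    let body = Bytes::new();
+    // The returned guard derefs to a `WriteRequest`; dropping it hands the
+    // object back to the pool instead of freeing it.
+    let request = parser.decode_async(body).await?;
+    println!("timeseries parsed: {}", request.timeseries.len());
+    Ok(())
+}
+```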
+
+## Acknowledgements
+
+The two test data files in `src/remote_write/tests/workloads` are taken from [prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench/tree/main/assets).
diff --git a/src/remote_write/src/lib.rs b/src/remote_write/src/lib.rs
new file mode 100644
index 0000000000..7b5b157351
--- /dev/null
+++ b/src/remote_write/src/lib.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod pb_reader;
+pub mod pooled_parser;
+pub mod pooled_types;
+mod repeated_field;
diff --git a/src/remote_write/src/pb_reader.rs b/src/remote_write/src/pb_reader.rs
new file mode 100644
index 0000000000..9545d27e4e
--- /dev/null
+++ b/src/remote_write/src/pb_reader.rs
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::{ensure, Result};
+use bytes::{Buf, Bytes};
+
+use crate::pooled_types::{
+    Exemplar, Label, MetricMetadata, MetricType, Sample, TimeSeries, WriteRequest,
+};
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum WireType {
+    Varint = 0,
+    SixtyFourBit = 1,
+    LengthDelimited = 2,
+}
+
+const FIELD_NUM_TIMESERIES: u32 = 1;
+const FIELD_NUM_METADATA: u32 = 3;
+const FIELD_NUM_LABELS: u32 = 1;
+const FIELD_NUM_SAMPLES: u32 = 2;
+const FIELD_NUM_EXEMPLARS: u32 = 3;
+const FIELD_NUM_LABEL_NAME: u32 = 1;
+const FIELD_NUM_LABEL_VALUE: u32 = 2;
+const FIELD_NUM_SAMPLE_VALUE: u32 = 1;
+const FIELD_NUM_SAMPLE_TIMESTAMP: u32 = 2;
+const FIELD_NUM_EXEMPLAR_LABELS: u32 = 1;
+const FIELD_NUM_EXEMPLAR_VALUE: u32 = 2;
+const FIELD_NUM_EXEMPLAR_TIMESTAMP: u32 = 3;
+const FIELD_NUM_METADATA_TYPE: u32 = 1;
+const FIELD_NUM_METADATA_FAMILY_NAME: u32 = 2;
+const FIELD_NUM_METADATA_HELP: u32 = 4;
+const FIELD_NUM_METADATA_UNIT: u32 = 5;
+
+// Taken from https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+unsafe fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes {
+    if len == data.remaining() {
+        std::mem::replace(data, Bytes::new())
+    } else {
+        let ret = unsafe { split_to_unsafe(data, len) };
+        data.advance(len);
+        ret
+    }
+}
+
+// Taken from https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+pub unsafe fn split_to_unsafe(buf: &Bytes, end: usize) -> Bytes {
+    let len = buf.len();
+    assert!(
+        end <= len,
+        "range end out of bounds: {:?} <= {:?}",
+        end,
+        len,
+    );
+
+    if end == 0 {
+        return Bytes::new();
+    }
+
+    let ptr = buf.as_ptr();
+    // `Bytes::drop` does nothing when it's built via `from_static`.
+    use std::slice;
+    Bytes::from_static(unsafe { slice::from_raw_parts(ptr, end) })
+}
+
+pub struct ProtobufReader {
+    data: Bytes,
+}
+
+impl ProtobufReader {
+    pub fn new(data: Bytes) -> Self {
+        Self { data }
+    }
+
+    pub fn remaining(&self) -> usize {
+        self.data.remaining()
+    }
+
+    /// Read a varint from the buffer.
+    ///
+    /// Similar to [quick-protobuf](https://github.com/tafia/quick-protobuf), unroll the loop in
+    /// [the official Go implementation](https://cs.opensource.google/go/go/+/refs/tags/go1.24.5:src/encoding/binary/varint.go;l=68)
+    /// for better performance.
+    #[inline(always)]
+    pub fn read_varint(&mut self) -> Result<u64> {
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        // First byte.
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(b as u64);
+        }
+        let mut x = (b & 0x7f) as u64;
+        // Second byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 7));
+        }
+        x |= ((b & 0x7f) as u64) << 7;
+        // Third byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 14));
+        }
+        x |= ((b & 0x7f) as u64) << 14;
+        // Fourth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 21));
+        }
+        x |= ((b & 0x7f) as u64) << 21;
+        // Fifth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 28));
+        }
+        x |= ((b & 0x7f) as u64) << 28;
+        // Sixth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 35));
+        }
+        x |= ((b & 0x7f) as u64) << 35;
+        // Seventh byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 42));
+        }
+        x |= ((b & 0x7f) as u64) << 42;
+        // Eighth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 49));
+        }
+        x |= ((b & 0x7f) as u64) << 49;
+        // Ninth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 56));
+        }
+        x |= ((b & 0x7f) as u64) << 56;
+        // Tenth byte (final byte, must terminate).
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        ensure!(b < 0x80, "varint overflow");
+        ensure!(b <= 1, "varint overflow");
+        Ok(x | ((b as u64) << 63))
+    }
+
+    /// Read a double from the buffer.
+    #[inline(always)]
+    pub fn read_double(&mut self) -> Result<f64> {
+        ensure!(self.data.remaining() >= 8, "not enough bytes for double");
+        // In Protobuf, double is encoded as 64-bit.
+        let bits = self.data.get_u64_le();
+        Ok(f64::from_bits(bits))
+    }
+
+    /// Read a 64-bit integer from the buffer.
+    #[inline(always)]
+    pub fn read_int64(&mut self) -> Result<i64> {
+        // In Protobuf, int64 is encoded as varint.
+        self.read_varint().map(|v| v as i64)
+    }
+
+    /// Read a string from the buffer.
+    pub fn read_string(&mut self) -> Result<Bytes> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for string");
+        // In Protobuf, string is encoded as length-delimited UTF-8 bytes.
+        #[cfg(feature = "unsafe-split")]
+        let bytes = unsafe { copy_to_bytes(&mut self.data, len) };
+        #[cfg(not(feature = "unsafe-split"))]
+        let bytes = self.data.split_to(len);
+        // Leave the responsibility of validating UTF-8 to the caller,
+        // which is the practice of both [easyproto](https://github.com/VictoriaMetrics/easyproto)
+        // and [prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench).
+        Ok(bytes)
+    }
+
+    /// Read a tag from the buffer.
+    #[inline(always)]
+    pub fn read_tag(&mut self) -> Result<(u32, WireType)> {
+        // In Protobuf, tag is encoded as varint.
+        // tag = (field_number << 3) | wire_type.
+        let tag = self.read_varint()?;
+        let field_number = tag >> 3;
+        let wt_val = (tag & 0x07) as u8;
+        ensure!(wt_val <= 2, "unsupported wire type: {}", wt_val);
+        let wire_type = match wt_val {
+            0 => WireType::Varint,
+            1 => WireType::SixtyFourBit,
+            2 => WireType::LengthDelimited,
+            _ => unreachable!(),
+        };
+        Ok((field_number as u32, wire_type))
+    }
+
+    /// Read timeseries from the buffer.
+    #[inline(always)]
+    pub fn read_timeseries(&mut self, timeseries: &mut TimeSeries) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for timeseries"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_LABELS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "labels")?;
+                    let label_ref = timeseries.labels.push_default();
+                    self.read_label(label_ref)?;
+                }
+                FIELD_NUM_SAMPLES => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "samples")?;
+                    let sample_ref = timeseries.samples.push_default();
+                    self.read_sample(sample_ref)?;
+                }
+                FIELD_NUM_EXEMPLARS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "exemplars")?;
+                    let exemplar_ref = timeseries.exemplars.push_default();
+                    self.read_exemplar(exemplar_ref)?;
+                }
+                _ => {
+                    // Skip unknown fields instead of returning an error.
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read label from the buffer.
+    #[inline(always)]
+    pub fn read_label(&mut self, label: &mut Label) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for label");
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_LABEL_NAME => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "label name")?;
+                    label.name = self.read_string()?;
+                }
+                FIELD_NUM_LABEL_VALUE => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "label value")?;
+                    label.value = self.read_string()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read sample from the buffer.
+    #[inline(always)]
+    pub fn read_sample(&mut self, sample: &mut Sample) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for sample");
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_SAMPLE_VALUE => {
+                    validate_wire_type(wire_type, WireType::SixtyFourBit, "sample value")?;
+                    sample.value = self.read_double()?;
+                }
+                FIELD_NUM_SAMPLE_TIMESTAMP => {
+                    validate_wire_type(wire_type, WireType::Varint, "sample timestamp")?;
+                    sample.timestamp = self.read_int64()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read exemplar from the buffer.
+    #[inline(always)]
+    pub fn read_exemplar(&mut self, exemplar: &mut Exemplar) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for exemplar"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_EXEMPLAR_LABELS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "exemplar labels")?;
+                    let label_ref = exemplar.labels.push_default();
+                    self.read_label(label_ref)?;
+                }
+                FIELD_NUM_EXEMPLAR_VALUE => {
+                    validate_wire_type(wire_type, WireType::SixtyFourBit, "exemplar value")?;
+                    exemplar.value = self.read_double()?;
+                }
+                FIELD_NUM_EXEMPLAR_TIMESTAMP => {
+                    validate_wire_type(wire_type, WireType::Varint, "exemplar timestamp")?;
+                    exemplar.timestamp = self.read_int64()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read metric metadata from the buffer.
+    #[inline(always)]
+    pub fn read_metric_metadata(&mut self, metadata: &mut MetricMetadata) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for metadata"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_METADATA_TYPE => {
+                    validate_wire_type(wire_type, WireType::Varint, "metadata type")?;
+                    let type_value = self.read_varint()? as i32;
+                    metadata.metric_type = match type_value {
+                        0 => MetricType::Unknown,
+                        1 => MetricType::Counter,
+                        2 => MetricType::Gauge,
+                        3 => MetricType::Histogram,
+                        4 => MetricType::GaugeHistogram,
+                        5 => MetricType::Summary,
+                        6 => MetricType::Info,
+                        7 => MetricType::StateSet,
+                        _ => MetricType::Unknown,
+                    };
+                }
+                FIELD_NUM_METADATA_FAMILY_NAME => {
+                    validate_wire_type(
+                        wire_type,
+                        WireType::LengthDelimited,
+                        "metadata family name",
+                    )?;
+                    metadata.metric_family_name = self.read_string()?;
+                }
+                FIELD_NUM_METADATA_HELP => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "metadata help")?;
+                    metadata.help = self.read_string()?;
+                }
+                FIELD_NUM_METADATA_UNIT => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, "metadata unit")?;
+                    metadata.unit = self.read_string()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Skip an unknown field based on its wire type.
+    #[inline(always)]
+    pub fn skip_field(&mut self, wire_type: WireType) -> Result<()> {
+        match wire_type {
+            WireType::Varint => {
+                // For varint, read and discard the value.
+                self.read_varint()?;
+                Ok(())
+            }
+            WireType::SixtyFourBit => {
+                // For 64-bit, skip 8 bytes.
+                ensure!(
+                    self.data.remaining() >= 8,
+                    "not enough bytes to skip 64-bit field"
+                );
+                self.data.advance(8);
+                Ok(())
+            }
+            WireType::LengthDelimited => {
+                // For length-delimited, read length then skip that many bytes.
+                let len = self.read_varint()? as usize;
+                ensure!(
+                    self.data.remaining() >= len,
+                    "not enough bytes to skip length-delimited field"
+                );
+                self.data.advance(len);
+                Ok(())
+            }
+        }
+    }
+}
+
+#[inline(always)]
+fn validate_wire_type(actual: WireType, expected: WireType, field_name: &str) -> Result<()> {
+    ensure!(
+        actual == expected,
+        "expected wire type {:?} for {}, but found wire type {:?}",
+        expected,
+        field_name,
+        actual
+    );
+    Ok(())
+}
+
+/// Fill a [`WriteRequest`] instance with data from the buffer.
+pub fn read_write_request(data: Bytes, request: &mut WriteRequest) -> Result<()> {
+    let mut reader = ProtobufReader::new(data);
+    while reader.remaining() > 0 {
+        let (field_number, wire_type) = reader.read_tag()?;
+        match field_number {
+            FIELD_NUM_TIMESERIES => {
+                validate_wire_type(wire_type, WireType::LengthDelimited, "timeseries")?;
+                let timeseries_ref = request.timeseries.push_default();
+                reader.read_timeseries(timeseries_ref)?;
+            }
+            FIELD_NUM_METADATA => {
+                validate_wire_type(wire_type, WireType::LengthDelimited, "metadata")?;
+                let metadata_ref = request.metadata.push_default();
+                reader.read_metric_metadata(metadata_ref)?;
+            }
+            _ => {
+                // Skip unknown fields instead of returning an error.
+                reader.skip_field(wire_type)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_read_varint_single_byte() {
+        let data = &[0x42];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_varint().unwrap(), 66);
+    }
+
+    #[test]
+    fn test_read_varint_multi_byte() {
+        let data = &[0x96, 0x01];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_varint().unwrap(), 150);
+    }
+
+    #[test]
+    fn test_read_double() {
+        let data = &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF0, 0x3F];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_double().unwrap(), 1.0);
+    }
+
+    #[test]
+    fn test_read_string() {
+        let data = &[0x05, b'h', b'e', b'l', b'l', b'o'];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_string().unwrap(), "hello");
+    }
+
+    #[test]
+    fn test_parse_write_request() {
+        use pb_types::{
+            Exemplar, Label, MetricMetadata, Sample, TimeSeries, WriteRequest as PbWriteRequest,
+        };
+        use prost::Message;
+
+        let write_request = PbWriteRequest {
+            timeseries: vec![TimeSeries {
+                labels: vec![Label {
+                    name: "metric_name".to_string(),
+                    value: "test_value".to_string(),
+                }],
+                samples: vec![Sample {
+                    value: 42.5,
+                    timestamp: 1234567890,
+                }],
+                exemplars: vec![Exemplar {
+                    labels: vec![Label {
+                        name: "trace_id".to_string(),
+                        value: "abc123".to_string(),
+                    }],
+                    value: 50.0,
+                    timestamp: 1234567891,
+                }],
+            }],
+            metadata: vec![MetricMetadata {
+                r#type: 1,
+                metric_family_name: "test_metric".to_string(),
+                help: "Test metric description".to_string(),
+                unit: "bytes".to_string(),
+            }],
+        };
+
+        let encoded = write_request.encode_to_vec();
+        let data = Bytes::from(encoded);
+        let mut pooled_request = crate::pooled_types::WriteRequest::default();
+        read_write_request(data, &mut pooled_request).unwrap();
+
+        assert_eq!(pooled_request.timeseries.len(), 1);
+        let ts = &pooled_request.timeseries[0];
+        assert_eq!(ts.labels.len(), 1);
+        let label = &ts.labels[0];
+        assert_eq!(label.name, "metric_name");
+        assert_eq!(label.value, "test_value");
+        assert_eq!(ts.samples.len(), 1);
+        let sample = &ts.samples[0];
+        assert_eq!(sample.value, 42.5);
+        assert_eq!(sample.timestamp, 1234567890);
+        assert_eq!(ts.exemplars.len(), 1);
+        let exemplar = &ts.exemplars[0];
+        assert_eq!(exemplar.value, 50.0);
+        assert_eq!(exemplar.timestamp, 1234567891);
+        assert_eq!(exemplar.labels.len(), 1);
+        let exemplar_label = &exemplar.labels[0];
+        assert_eq!(exemplar_label.name, "trace_id");
+        assert_eq!(exemplar_label.value, "abc123");
+        assert_eq!(pooled_request.metadata.len(), 1);
+        let metadata = &pooled_request.metadata[0];
+        assert_eq!(metadata.metric_type, MetricType::Counter);
+        assert_eq!(metadata.metric_family_name, "test_metric");
+        assert_eq!(metadata.help, "Test metric description");
+        assert_eq!(metadata.unit, "bytes");
+    }
+}
diff --git a/src/remote_write/src/pooled_parser.rs b/src/remote_write/src/pooled_parser.rs
new file mode 100644
index 0000000000..8b2c62fb1c
--- /dev/null
+++ b/src/remote_write/src/pooled_parser.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pooled parser for Prometheus remote write requests.
+//!
+//! This module parses the protobuf `string` type as Rust `Bytes` instances
+//! instead of `String` instances to avoid allocation, and it **does not**
+//! perform UTF-8 validation when parsing. Therefore, it is up to the caller to
+//! decide how to make use of the parsed `Bytes` and whether to apply UTF-8
+//! validation.
+
+use anyhow::Result;
+use bytes::Bytes;
+
+use crate::{
+    pb_reader::read_write_request,
+    pooled_types::{WriteRequest, WriteRequestManager, POOL},
+    repeated_field::Clear,
+};
+
+#[derive(Debug, Clone)]
+pub struct PooledParser;
+
+impl PooledParser {
+    fn new() -> Self {
+        Self
+    }
+
+    /// Decode a [`WriteRequest`] from the buffer and return it.
+    pub fn decode(&self, buf: Bytes) -> Result<WriteRequest> {
+        // Cannot get a WriteRequest instance from the pool in sync functions.
+        let mut request = WriteRequest::default();
+        read_write_request(buf, &mut request)?;
+        Ok(request)
+    }
+
+    /// Decode a [`WriteRequest`] from the buffer and return a pooled object.
+    ///
+    /// This method will reuse a [`WriteRequest`] instance from the object
+    /// pool. After the returned object is dropped, it will be returned to the
+    /// pool.
+    pub async fn decode_async(
+        &self,
+        buf: Bytes,
+    ) -> Result<deadpool::managed::Object<WriteRequestManager>> {
+        let mut pooled_request = POOL
+            .get()
+            .await
+            .map_err(|e| anyhow::anyhow!("failed to get object from pool: {e:?}"))?;
+        pooled_request.clear();
+        read_write_request(buf, &mut pooled_request)?;
+        Ok(pooled_request)
+    }
+}
+
+impl Default for PooledParser {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/src/remote_write/src/pooled_types.rs b/src/remote_write/src/pooled_types.rs
new file mode 100644
index 0000000000..427eb60788
--- /dev/null
+++ b/src/remote_write/src/pooled_types.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use deadpool::managed::{Manager, Metrics, Pool, RecycleResult};
+use once_cell::sync::Lazy;
+
+use crate::repeated_field::{Clear, RepeatedField};
+
+#[derive(Debug, Clone)]
+pub struct Label {
+    pub name: Bytes,
+    pub value: Bytes,
+}
+
+impl Default for Label {
+    fn default() -> Self {
+        Self {
+            name: Bytes::new(),
+            value: Bytes::new(),
+        }
+    }
+}
+
+impl Clear for Label {
+    fn clear(&mut self) {
+        self.name.clear();
+        self.value.clear();
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Sample {
+    pub value: f64,
+    pub timestamp: i64,
+}
+
+impl Default for Sample {
+    fn default() -> Self {
+        Self {
+            value: 0.0,
+            timestamp: 0,
+        }
+    }
+}
+
+impl Clear for Sample {
+    fn clear(&mut self) {
+        self.value = 0.0;
+        self.timestamp = 0;
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Exemplar {
+    pub labels: RepeatedField