Skip to content

Conversation

@BaldDemian
Copy link
Contributor

Rationale

A hand-written remote write request parser to replace prost, with the primary goal of achieving zero-allocation parsing.

Detailed Changes

  • The new remote_write directory contains the core implementation of the hand-written parser.

  • Add benchmarks.

  • Add a .proto file in pb_types.

  • Fix CI errors.

Test Plan

  • Unit tests can be found at the end of the pb_reader.rs.

  • A comprehensive equivalence test is available in equivalence_test.rs, which validates the correctness of the hand-written parser by comparing its output with that of the prost auto-generated parser.

@BaldDemian BaldDemian changed the title Add hand-written remote write request parser as a new crate feat: add hand-written Prometheus remote write request parser Aug 22, 2025
@github-actions github-actions bot added the feature New feature or request label Aug 22, 2025
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a hand-written Prometheus remote write request parser optimized for zero-allocation parsing to replace prost. The implementation leverages object pooling, custom data structures, and optional unsafe optimizations to achieve better performance while maintaining compatibility with the existing protobuf format.

Key changes:

  • Hand-written protobuf parser with zero-allocation parsing capabilities
  • Object pooling mechanism using deadpool for memory reuse
  • Optional unsafe optimization feature for zero-copy bytes splitting

Reviewed Changes

Copilot reviewed 24 out of 36 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/remote_write/src/pooled_parser.rs Main parser interface providing sync/async decode methods
src/remote_write/src/pb_reader.rs Core protobuf reading implementation with manual parsing logic
src/remote_write/src/pooled_types.rs Pooled data structures and object pool management
src/remote_write/src/repeated_field.rs Custom RepeatedField implementation for memory optimization
src/remote_write/tests/equivalence_test.rs Comprehensive test comparing hand-written parser with prost
src/pb_types/protos/remote_write.proto Protobuf schema definition for remote write requests
src/benchmarks/ Benchmark infrastructure and memory analysis tools

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines 47 to 52
impl PooledLabel {
pub fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
}
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PooledLabel struct has both a Clear trait implementation (line 40-45) and a duplicate clear method (line 48-52). This duplication is unnecessary since the Clear trait already provides the same functionality.

Suggested change
impl PooledLabel {
pub fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
}
// impl PooledLabel {
// pub fn clear(&mut self) {
// self.name.clear();
// self.value.clear();
// }
// }

Copilot uses AI. Check for mistakes.
Comment on lines 157 to 163
impl PooledTimeSeries {
pub fn clear(&mut self) {
self.labels.clear();
self.samples.clear();
self.exemplars.clear();
}
}
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PooledTimeSeries struct has both a Clear trait implementation (line 149-155) and a duplicate clear method (line 158-163). This duplication is unnecessary since the Clear trait already provides the same functionality.

Suggested change
impl PooledTimeSeries {
pub fn clear(&mut self) {
self.labels.clear();
self.samples.clear();
self.exemplars.clear();
}
}
// impl PooledTimeSeries {
// pub fn clear(&mut self) {
// self.labels.clear();
// self.samples.clear();
// self.exemplars.clear();
// }
// }

Copilot uses AI. Check for mistakes.
Comment on lines 178 to 183
impl PooledWriteRequest {
pub fn clear(&mut self) {
self.timeseries.clear();
self.metadata.clear();
}
}
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PooledWriteRequest struct has both a Clear trait implementation (line 171-176) and a duplicate clear method (line 179-183). This duplication is unnecessary since the Clear trait already provides the same functionality.

Suggested change
impl PooledWriteRequest {
pub fn clear(&mut self) {
self.timeseries.clear();
self.metadata.clear();
}
}
// Removed redundant inherent clear method; Clear trait implementation is sufficient.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@jiacai2050 jiacai2050 requested a review from Copilot October 24, 2025 08:01
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 24 out of 36 changed files in this pull request and generated 4 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

}

impl<T> Clear for Option<T> {
fn clear(&mut self) {
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Clear trait documentation mentions 'make it equivalent to newly created object' but the implementation for RepeatedField at line 82 only sets len to 0 without clearing the actual data in the backing Vec. While this is correct for the pooling use case, the trait documentation should clarify that 'clear' means resetting to a logically empty state, not necessarily deallocating or zeroing memory.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the comments as suggested by copilot.

pub fn skip_field(&mut self, wire_type: u8) -> Result<()> {
match wire_type {
WIRE_TYPE_VARINT => {
// For varint, read and discard the value..
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected double period '..' to single period '.'.

Suggested change
// For varint, read and discard the value..
// For varint, read and discard the value.

Copilot uses AI. Check for mistakes.
}

// Fix package namespace conflicts and inner attributes using sed.
let _ = Command::new("sed")
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build script silently ignores errors from sed commands by using let _. If sed fails, the generated code may be incorrect but the build continues. Consider propagating these errors or at least logging warnings when sed commands fail.

Copilot uses AI. Check for mistakes.
# specific language governing permissions and limitations
# under the License.

#!/usr/bin/env python3
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shebang line appears after the license header rather than as the first line of the file. For proper script execution, the shebang should be the very first line. Move line 18 to line 1.

Copilot uses AI. Check for mistakes.
static ALLOC: Jemalloc = Jemalloc;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All those files under bin have similar structure, could you refactor then into one file and use params to choose which method to decode the data?

//! decide how to make use of the parsed `Bytes` and whether to apply UTF-8
//! validation.
//!
//! ## Basic Usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer we remove all comments below L26 , the API is not stable now, and very likely the comments become out of date when we refactor the API.

/// This method will allocate a new [`PooledWriteRequest`] instance since it
/// is unable to use the object pool in sync functions.
///
/// # Example
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete comment below, the reason is the same as before.

/// # Ok(())
/// # }
/// ```
pub fn decode(&self, buf: Bytes) -> Result<PooledWriteRequest, String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer we use anyhow as the Error type for this crate, it has more features like backtrace.

/// # Ok(())
/// # }
/// ```
pub async fn decode_async(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this async functions, the encode/decode is a CPU-bound work, use async has little benefits.

use crate::repeated_field::{Clear, RepeatedField};

#[derive(Debug, Clone)]
pub struct PooledLabel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name this Label, same to other structures in this file.

We shouldn't add the Pooled prefix just because it's used in Pool, it could be used in a non-pool case.

}

impl<T> Clear for Option<T> {
fn clear(&mut self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the comments as suggested by copilot.

metadata: Vec::new(),
};

for pooled_ts in pooled_request.timeseries.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for pooled_ts in pooled_request.timeseries.iter() {
for pooled_ts in &pooled_request.timeseries {

The same changes apply to for loops below also.


This crate parses the protobuf `string` type as Rust `Bytes` instances instead of `String` instances to avoid allocation, and it **does not** perform UTF-8 validation when parsing. Also, it **does not** check the semantics of the parsed data, such as [Labels](https://prometheus.io/docs/specs/prw/remote_write_spec/#labels). Therefore, it is up to the caller to decide how to make use of the parsed `Bytes` and whether to apply extra validation or error handling.

### Basic Usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this section

PooledTimeSeries, PooledWriteRequest,
};

const WIRE_TYPE_VARINT: u8 = 0;
Copy link
Contributor

@jiacai2050 jiacai2050 Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer define this wire type as enum like this

#[repr(u8)]
enum WriteType {
  ... 
}

const FIELD_NUM_METADATA_UNIT: u32 = 5;

// Taken from https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs under Apache License 2.0.
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete those dead code functions.

/// for better performance.
#[inline(always)]
pub fn read_varint(&mut self) -> Result<u64> {
if !self.data.has_remaining() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After you define Error as anyhow, you can simplify it like

ensure!(data.has_remaining(), "no enough byte for varbint")

@jiacai2050 jiacai2050 merged commit 9cec563 into apache:main Oct 28, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants