From 2bf9e3ed873abd43e0e40899cba807b0ec5bab35 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 4 Feb 2026 17:20:10 +0530 Subject: [PATCH 1/4] Sink optimizations - reduce message cloning Signed-off-by: Sreekanth --- rust/numaflow-core/src/sinker/actor.rs | 49 ++++++++++++++------------ 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/rust/numaflow-core/src/sinker/actor.rs b/rust/numaflow-core/src/sinker/actor.rs index 29eaae110f..0924db3935 100644 --- a/rust/numaflow-core/src/sinker/actor.rs +++ b/rust/numaflow-core/src/sinker/actor.rs @@ -63,7 +63,8 @@ where } // State to accumulate outcomes across retries - let mut messages_to_retry = messages.clone(); + // Take ownership directly instead of cloning + let mut messages_to_retry = messages; let mut fallback_messages = Vec::new(); let mut serving_messages = Vec::new(); let mut dropped_messages = Vec::new(); @@ -84,7 +85,7 @@ where let mut error_map = HashMap::new(); loop { - // send batch to sink + // send batch to sink (clone needed since we need messages for classification) let responses = self.sink.sink(messages_to_retry.clone()).await?; // Create a map of id to result @@ -93,35 +94,40 @@ where .map(|resp| (resp.id, resp.status)) .collect::>(); - // Classify messages based on responses - let mut failed_ids = Vec::new(); - - messages_to_retry.retain_mut(|msg| { - match result_map.remove(&msg.id.to_string()) { + // Classify messages based on responses using index-based iteration + // with swap_remove to avoid cloning when moving messages to result vectors + let mut i = 0; + while i < messages_to_retry.len() { + let msg_id = messages_to_retry[i].id.to_string(); + match result_map.remove(&msg_id) { Some(ResponseStatusFromSink::Success) => { - false // remove from retry list + // Remove from retry list, don't need the message anymore + messages_to_retry.swap_remove(i); + // Don't increment i, the swapped element needs to be checked } Some(ResponseStatusFromSink::Failed(err_msg)) => { - failed_ids.push(msg.id.to_string()); - *error_map.entry(err_msg.clone()).or_insert(0) += 1; - true // keep for retry + *error_map.entry(err_msg).or_insert(0) += 1; + i += 1; // keep for retry } Some(ResponseStatusFromSink::Fallback) => { - fallback_messages.push(msg.clone()); - false // remove from retry list + // Move message to fallback without cloning + fallback_messages.push(messages_to_retry.swap_remove(i)); } Some(ResponseStatusFromSink::Serve(serve_response)) => { + // Move message to serving without cloning + let mut msg = messages_to_retry.swap_remove(i); if let Some(serve_response) = serve_response { msg.value = serve_response.into(); } - serving_messages.push(msg.clone()); - false // remove from retry list + serving_messages.push(msg); } Some(ResponseStatusFromSink::OnSuccess(on_success_msg)) => { + // Move message to on_success without cloning + let msg = messages_to_retry.swap_remove(i); if let Some(on_success_msg) = on_success_msg { let on_success_md: Option = on_success_msg.metadata.map(|md| md.into()); - let new_md = match &mut msg.metadata { + let new_md = match &msg.metadata { // Following clones are required explicitly since Arc doesn't allow // interior mutability, so we cannot move the required fields out of Arc // without cloning, unless we can guarantee there is only a single reference to Arc @@ -144,18 +150,17 @@ where value: on_success_msg.value.into(), keys: on_success_msg.keys.into(), metadata: Some(Arc::new(new_md)), - ..msg.clone() + ..msg }; - on_success_messages.push(new_msg.clone()); + on_success_messages.push(new_msg); } else { // Send the original message if no payload was provided to the onSuccess sink - on_success_messages.push(msg.clone()); + on_success_messages.push(msg); } - false // remove from retry list } - None => unreachable!("should have response for all messages"), // remove if no response + None => unreachable!("should have response for all messages"), } - }); + } if messages_to_retry.is_empty() { // success path, all messages processed From bf8845c9d305ddd2e4d95f66ec14f7d9ff17264a Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 4 Feb 2026 19:23:17 +0530 Subject: [PATCH 2/4] Use bytes::Bytes in proto files to avoid Vec conversions Signed-off-by: Sreekanth --- rust/numaflow-core/src/mapper/map.rs | 2 +- rust/numaflow-core/src/message.rs | 2 +- .../reduce/reducer/aligned/user_defined.rs | 6 ++--- .../unaligned/user_defined/accumulator.rs | 2 +- .../reducer/unaligned/user_defined/session.rs | 2 +- rust/numaflow-core/src/reduce/wal.rs | 2 +- rust/numaflow-core/src/sinker/sink.rs | 3 ++- .../src/sinker/sink/serve/user_defined.rs | 2 +- .../src/sinker/sink/user_defined.rs | 2 +- rust/numaflow-core/src/source/user_defined.rs | 3 ++- rust/numaflow-core/src/transformer.rs | 9 +++++--- .../src/transformer/user_defined.rs | 2 +- .../numaflow-pb/src/clients/accumulator.v1.rs | 4 ++-- rust/numaflow-pb/src/clients/map.v1.rs | 8 +++---- rust/numaflow-pb/src/clients/mapstream.v1.rs | 8 +++---- rust/numaflow-pb/src/clients/reduce.v1.rs | 8 +++---- rust/numaflow-pb/src/clients/serving.v1.rs | 4 ++-- .../src/clients/sessionreduce.v1.rs | 8 +++---- rust/numaflow-pb/src/clients/sideinput.v1.rs | 4 ++-- rust/numaflow-pb/src/clients/sink.v1.rs | 12 +++++----- rust/numaflow-pb/src/clients/source.v1.rs | 8 +++---- .../src/clients/sourcetransformer.v1.rs | 8 +++---- rust/numaflow-pb/src/common/metadata.rs | 2 +- rust/numaflow-pb/src/main.rs | 23 +++++++++++++++++-- rust/numaflow-pb/src/objects/isb.rs | 4 ++-- .../src/app/store/datastore/user_defined.rs | 2 +- 26 files changed, 82 insertions(+), 58 deletions(-) diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 649d3f95d1..09e50cedd4 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -71,7 +71,7 @@ impl From for MapRequest { Self { request: Some(map::map_request::Request { keys: message.keys.to_vec(), - value: message.value.to_vec(), + value: message.value.clone(), event_time: Some(prost_timestamp_from_utc(message.event_time)), watermark: message.watermark.map(prost_timestamp_from_utc), headers: Arc::unwrap_or_clone(message.headers), diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index b42efb2d75..bc6d92f2dc 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -366,7 +366,7 @@ impl TryFrom for BytesMut { }), }), body: Some(numaflow_pb::objects::isb::Body { - payload: message.value.to_vec(), + payload: message.value.clone(), }), }; diff --git a/rust/numaflow-core/src/reduce/reducer/aligned/user_defined.rs b/rust/numaflow-core/src/reduce/reducer/aligned/user_defined.rs index 7f783184fb..605424c44e 100644 --- a/rust/numaflow-core/src/reduce/reducer/aligned/user_defined.rs +++ b/rust/numaflow-core/src/reduce/reducer/aligned/user_defined.rs @@ -18,7 +18,7 @@ impl From for reduce_request::Payload { fn from(msg: Message) -> Self { Self { keys: msg.keys.to_vec(), - value: msg.value.to_vec(), + value: msg.value.clone(), event_time: Some(prost_timestamp_from_utc(msg.event_time)), watermark: msg.watermark.map(prost_timestamp_from_utc), headers: Arc::unwrap_or_clone(msg.headers), @@ -31,7 +31,7 @@ impl From<&Message> for reduce_request::Payload { fn from(msg: &Message) -> Self { Self { keys: msg.keys.to_vec(), - value: msg.value.to_vec(), + value: msg.value.clone(), event_time: Some(prost_timestamp_from_utc(msg.event_time)), watermark: msg.watermark.map(prost_timestamp_from_utc), headers: (*msg.headers).clone(), @@ -78,7 +78,7 @@ impl From for ReduceRequest { ReduceRequest { payload: Some(reduce_request::Payload { keys: vec![], - value: vec![], + value: bytes::Bytes::new(), event_time: None, watermark: None, headers: Default::default(), diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs index b95b3fca24..241214560b 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/accumulator.rs @@ -20,7 +20,7 @@ impl From for accumulator::Payload { fn from(msg: Message) -> Self { Self { keys: msg.keys.to_vec(), - value: msg.value.to_vec(), + value: msg.value.clone(), event_time: Some(prost_timestamp_from_utc(msg.event_time)), watermark: msg.watermark.map(prost_timestamp_from_utc), id: msg.id.to_string(), diff --git a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs index dd6f740043..f32f8ba62e 100644 --- a/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs +++ b/rust/numaflow-core/src/reduce/reducer/unaligned/user_defined/session.rs @@ -22,7 +22,7 @@ impl From for session_reduce_request::Payload { fn from(msg: Message) -> Self { Self { keys: msg.keys.to_vec(), - value: msg.value.to_vec(), + value: msg.value.clone(), event_time: Some(prost_timestamp_from_utc(msg.event_time)), watermark: msg.watermark.map(prost_timestamp_from_utc), headers: Arc::unwrap_or_clone(msg.headers), diff --git a/rust/numaflow-core/src/reduce/wal.rs b/rust/numaflow-core/src/reduce/wal.rs index 9678174003..2ee83d379d 100644 --- a/rust/numaflow-core/src/reduce/wal.rs +++ b/rust/numaflow-core/src/reduce/wal.rs @@ -48,7 +48,7 @@ impl TryFrom for Bytes { metadata: message.metadata.map(|m| Arc::unwrap_or_clone(m).into()), }), body: Some(numaflow_pb::objects::isb::Body { - payload: message.value.to_vec(), + payload: message.value.clone(), }), }), read_offset: int_offset.offset, diff --git a/rust/numaflow-core/src/sinker/sink.rs b/rust/numaflow-core/src/sinker/sink.rs index eb6c70b945..b30d41e53c 100644 --- a/rust/numaflow-core/src/sinker/sink.rs +++ b/rust/numaflow-core/src/sinker/sink.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use crate::Result; use crate::config::pipeline::VERTEX_TYPE_SINK; use crate::config::{get_vertex_name, is_mono_vertex}; @@ -686,7 +687,7 @@ pub(crate) enum ResponseStatusFromSink { /// Write to FallBack Sink. Fallback, /// Write to serving store. - Serve(Option>), + Serve(Option), OnSuccess(Option), } diff --git a/rust/numaflow-core/src/sinker/sink/serve/user_defined.rs b/rust/numaflow-core/src/sinker/sink/serve/user_defined.rs index 9bef4556e1..849e4c4a6e 100644 --- a/rust/numaflow-core/src/sinker/sink/serve/user_defined.rs +++ b/rust/numaflow-core/src/sinker/sink/serve/user_defined.rs @@ -38,7 +38,7 @@ impl UserDefinedStore { id: payload.id, payloads: vec![Payload { origin: origin.clone(), - value: payload.value.to_vec(), + value: payload.value.clone(), }], }; client.put(request).await.map_err(|e| { diff --git a/rust/numaflow-core/src/sinker/sink/user_defined.rs b/rust/numaflow-core/src/sinker/sink/user_defined.rs index 4e8867c3d7..4f4d8eef5c 100644 --- a/rust/numaflow-core/src/sinker/sink/user_defined.rs +++ b/rust/numaflow-core/src/sinker/sink/user_defined.rs @@ -31,7 +31,7 @@ impl From for SinkRequest { Self { request: Some(numaflow_pb::clients::sink::sink_request::Request { keys: message.keys.to_vec(), - value: message.value.to_vec(), + value: message.value.clone(), event_time: Some(prost_timestamp_from_utc(message.event_time)), watermark: message.watermark.map(prost_timestamp_from_utc), id: message.id.to_string(), diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index ef5bca6909..5dd9244242 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -195,7 +195,8 @@ impl TryFrom for source::Offset { }) => Ok(source::Offset { offset: BASE64_STANDARD .decode(offset) - .expect("we control the encoding, so this should never fail"), + .expect("we control the encoding, so this should never fail") + .into(), partition_id: partition_idx as i32, }), Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())), diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index e4adbc8439..490b0e3997 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -195,6 +195,8 @@ impl Transformer { .inc_by(messages.len() as u64); } + let message_count = messages.len(); + let tasks: Vec<_> = messages .into_iter() .map(|read_msg| { @@ -209,9 +211,10 @@ impl Transformer { })?; let _permit = permit; + let offset = read_msg.offset.clone(); let transformed_messages = Transformer::transform( transform_handle, - read_msg.clone(), + read_msg, hard_shutdown_token.clone(), ) .await?; @@ -219,7 +222,7 @@ impl Transformer { // update the tracker with the number of responses for each message tracker .serving_update( - &read_msg.offset, + &offset, transformed_messages .iter() .map(|m| m.tags.clone()) @@ -232,7 +235,7 @@ impl Transformer { }) .collect(); - let mut transformed_messages = Vec::new(); + let mut transformed_messages = Vec::with_capacity(message_count); for task in tasks { match task.await { Ok(Ok(mut msgs)) => transformed_messages.append(&mut msgs), diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index c8b4036f49..6c31bbacc6 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -96,7 +96,7 @@ impl From for SourceTransformRequest { request: Some(sourcetransformer::source_transform_request::Request { id: message.offset.to_string(), keys: message.keys.to_vec(), - value: message.value.to_vec(), + value: message.value.clone(), event_time: Some(prost_timestamp_from_utc(message.event_time)), watermark: message.watermark.map(prost_timestamp_from_utc), headers: Arc::unwrap_or_clone(message.headers), diff --git a/rust/numaflow-pb/src/clients/accumulator.v1.rs b/rust/numaflow-pb/src/clients/accumulator.v1.rs index 47bfc6bb02..bbadd8077d 100644 --- a/rust/numaflow-pb/src/clients/accumulator.v1.rs +++ b/rust/numaflow-pb/src/clients/accumulator.v1.rs @@ -4,8 +4,8 @@ pub struct Payload { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] diff --git a/rust/numaflow-pb/src/clients/map.v1.rs b/rust/numaflow-pb/src/clients/map.v1.rs index 7b50496a9d..6a0ec31622 100644 --- a/rust/numaflow-pb/src/clients/map.v1.rs +++ b/rust/numaflow-pb/src/clients/map.v1.rs @@ -20,8 +20,8 @@ pub mod map_request { pub struct Request { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -70,8 +70,8 @@ pub mod map_response { pub struct Result { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(string, repeated, tag = "3")] pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Metadata is the metadata of the message diff --git a/rust/numaflow-pb/src/clients/mapstream.v1.rs b/rust/numaflow-pb/src/clients/mapstream.v1.rs index ab22597d73..34357e8ab4 100644 --- a/rust/numaflow-pb/src/clients/mapstream.v1.rs +++ b/rust/numaflow-pb/src/clients/mapstream.v1.rs @@ -6,8 +6,8 @@ pub struct MapStreamRequest { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -35,8 +35,8 @@ pub mod map_stream_response { pub struct Result { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(string, repeated, tag = "3")] pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Metadata is the metadata of the message diff --git a/rust/numaflow-pb/src/clients/reduce.v1.rs b/rust/numaflow-pb/src/clients/reduce.v1.rs index 477b530594..30d66e9834 100644 --- a/rust/numaflow-pb/src/clients/reduce.v1.rs +++ b/rust/numaflow-pb/src/clients/reduce.v1.rs @@ -65,8 +65,8 @@ pub mod reduce_request { pub struct Payload { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -111,8 +111,8 @@ pub mod reduce_response { pub struct Result { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(string, repeated, tag = "3")] pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Metadata is the metadata of the message diff --git a/rust/numaflow-pb/src/clients/serving.v1.rs b/rust/numaflow-pb/src/clients/serving.v1.rs index 962b97fb1d..a342957a3f 100644 --- a/rust/numaflow-pb/src/clients/serving.v1.rs +++ b/rust/numaflow-pb/src/clients/serving.v1.rs @@ -6,8 +6,8 @@ pub struct Payload { #[prost(string, tag = "1")] pub origin: ::prost::alloc::string::String, /// Value is the result of the computation. - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, } /// PutRequest is the request sent to the Store. #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/rust/numaflow-pb/src/clients/sessionreduce.v1.rs b/rust/numaflow-pb/src/clients/sessionreduce.v1.rs index 3d6b025a93..31e5a41176 100644 --- a/rust/numaflow-pb/src/clients/sessionreduce.v1.rs +++ b/rust/numaflow-pb/src/clients/sessionreduce.v1.rs @@ -84,8 +84,8 @@ pub mod session_reduce_request { pub struct Payload { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -119,8 +119,8 @@ pub mod session_reduce_response { pub struct Result { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(string, repeated, tag = "3")] pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Metadata is the metadata of the message diff --git a/rust/numaflow-pb/src/clients/sideinput.v1.rs b/rust/numaflow-pb/src/clients/sideinput.v1.rs index d669440075..efc56e9d64 100644 --- a/rust/numaflow-pb/src/clients/sideinput.v1.rs +++ b/rust/numaflow-pb/src/clients/sideinput.v1.rs @@ -5,8 +5,8 @@ #[derive(Clone, PartialEq, ::prost::Message)] pub struct SideInputResponse { /// value represents the latest value of the side input payload - #[prost(bytes = "vec", tag = "1")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "1")] + pub value: ::prost::bytes::Bytes, /// noBroadcast indicates whether the side input value should be broadcasted to all /// True if value should not be broadcasted /// False if value should be broadcasted diff --git a/rust/numaflow-pb/src/clients/sink.v1.rs b/rust/numaflow-pb/src/clients/sink.v1.rs index 931f63ff4e..5b24c7a51e 100644 --- a/rust/numaflow-pb/src/clients/sink.v1.rs +++ b/rust/numaflow-pb/src/clients/sink.v1.rs @@ -21,8 +21,8 @@ pub mod sink_request { pub struct Request { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -87,8 +87,8 @@ pub mod sink_response { /// err_msg is the error message, set it if success is set to false. #[prost(string, tag = "3")] pub err_msg: ::prost::alloc::string::String, - #[prost(bytes = "vec", optional, tag = "4")] - pub serve_response: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "bytes", optional, tag = "4")] + pub serve_response: ::core::option::Option<::prost::bytes::Bytes>, #[prost(message, optional, tag = "5")] pub on_success_msg: ::core::option::Option, } @@ -96,8 +96,8 @@ pub mod sink_response { pub mod result { #[derive(Clone, PartialEq, ::prost::Message)] pub struct Message { - #[prost(bytes = "vec", tag = "1")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "1")] + pub value: ::prost::bytes::Bytes, #[prost(string, repeated, tag = "2")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(message, optional, tag = "3")] diff --git a/rust/numaflow-pb/src/clients/source.v1.rs b/rust/numaflow-pb/src/clients/source.v1.rs index 3af65bc1a6..d6c131ee2a 100644 --- a/rust/numaflow-pb/src/clients/source.v1.rs +++ b/rust/numaflow-pb/src/clients/source.v1.rs @@ -47,8 +47,8 @@ pub mod read_response { #[derive(Clone, PartialEq, ::prost::Message)] pub struct Result { /// Required field holding the payload of the datum. - #[prost(bytes = "vec", tag = "1")] - pub payload: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "1")] + pub payload: ::prost::bytes::Bytes, /// Required field indicating the offset information of the datum. #[prost(message, optional, tag = "2")] pub offset: ::core::option::Option, @@ -285,8 +285,8 @@ pub struct Offset { /// We define Offset as a byte array because different input data sources can have different representations for Offset. /// The only way to generalize it is to define it as a byte array, /// Such that we can let the UDSource to de-serialize the offset using its own interpretation logics. - #[prost(bytes = "vec", tag = "1")] - pub offset: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "1")] + pub offset: ::prost::bytes::Bytes, /// Optional partition_id indicates which partition of the source the datum belongs to. /// It is useful for sources that have multiple partitions. e.g. Kafka. /// If the partition_id is not specified, it is assumed that the source has a single partition. diff --git a/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs b/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs index c7fb6243cc..a983d17419 100644 --- a/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs +++ b/rust/numaflow-pb/src/clients/sourcetransformer.v1.rs @@ -22,8 +22,8 @@ pub mod source_transform_request { pub struct Request { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(message, optional, tag = "4")] @@ -61,8 +61,8 @@ pub mod source_transform_response { pub struct Result { #[prost(string, repeated, tag = "1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(bytes = "vec", tag = "2")] - pub value: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "2")] + pub value: ::prost::bytes::Bytes, #[prost(message, optional, tag = "3")] pub event_time: ::core::option::Option<::prost_types::Timestamp>, #[prost(string, repeated, tag = "4")] diff --git a/rust/numaflow-pb/src/common/metadata.rs b/rust/numaflow-pb/src/common/metadata.rs index 3d8beccc4a..88ca618342 100644 --- a/rust/numaflow-pb/src/common/metadata.rs +++ b/rust/numaflow-pb/src/common/metadata.rs @@ -19,5 +19,5 @@ pub struct Metadata { pub struct KeyValueGroup { #[prost(map = "string, bytes", tag = "1")] pub key_value: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::vec::Vec>, + ::std::collections::HashMap<::prost::alloc::string::String, ::prost::bytes::Bytes>, } diff --git a/rust/numaflow-pb/src/main.rs b/rust/numaflow-pb/src/main.rs index 550ee3d2f9..4597d55b18 100644 --- a/rust/numaflow-pb/src/main.rs +++ b/rust/numaflow-pb/src/main.rs @@ -13,8 +13,12 @@ fn main() { } fn build_common() { - prost_build::Config::new() + let mut config = prost_build::Config::new(); + config .out_dir("src/common") + // Use bytes::Bytes instead of Vec for bytes fields to avoid copying + .bytes(&[".metadata"]); + config .compile_protos(&["proto/metadata.proto"], &["proto"]) .expect("failed to compile common protos"); } @@ -25,6 +29,17 @@ fn build_client() { .build_server(false) .out_dir("src/clients") .extern_path(".metadata", "crate::common::metadata") + // Use bytes::Bytes instead of Vec for bytes fields to avoid copying + .bytes(".sink.v1") + .bytes(".source.v1") + .bytes(".sourcetransformer.v1") + .bytes(".map.v1") + .bytes(".mapstream.v1") + .bytes(".reduce.v1") + .bytes(".sessionreduce.v1") + .bytes(".sideinput.v1") + .bytes(".serving.v1") + .bytes(".accumulator.v1") .compile_protos( &[ "proto/source/v1/source.proto", @@ -61,9 +76,13 @@ fn build_server() { } fn build_objects() { - prost_build::Config::new() + let mut config = prost_build::Config::new(); + config .out_dir("src/objects") .extern_path(".metadata", "crate::common::metadata") + // Use bytes::Bytes instead of Vec for bytes fields to avoid copying + .bytes(&[".isb", ".wal"]); + config .compile_protos( &[ "proto/isb/message.proto", diff --git a/rust/numaflow-pb/src/objects/isb.rs b/rust/numaflow-pb/src/objects/isb.rs index 738b423ea0..616f10ee26 100644 --- a/rust/numaflow-pb/src/objects/isb.rs +++ b/rust/numaflow-pb/src/objects/isb.rs @@ -58,8 +58,8 @@ pub struct MessageId { #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct Body { /// Payload is the actual data of the message - #[prost(bytes = "vec", tag = "1")] - pub payload: ::prost::alloc::vec::Vec, + #[prost(bytes = "bytes", tag = "1")] + pub payload: ::prost::bytes::Bytes, } /// Message is inter step message #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/rust/serving/src/app/store/datastore/user_defined.rs b/rust/serving/src/app/store/datastore/user_defined.rs index 9bcda2b69c..c8d8e50cf0 100644 --- a/rust/serving/src/app/store/datastore/user_defined.rs +++ b/rust/serving/src/app/store/datastore/user_defined.rs @@ -46,7 +46,7 @@ impl DataStore for UserDefinedStore { .await .map_err(|e| StoreError::StoreRead(format!("gRPC Get request failed: {e:?}")))?; let payloads = response.into_inner().payloads; - Ok(payloads.iter().map(|p| p.value.clone()).collect()) + Ok(payloads.iter().map(|p| p.value.to_vec()).collect()) } async fn stream_data( From 730385d39fcd0b8dee3d34c78f8f5bef147d193e Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 4 Feb 2026 19:31:31 +0530 Subject: [PATCH 3/4] Fix formatting Signed-off-by: Sreekanth --- rust/numaflow-core/src/sinker/sink.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/numaflow-core/src/sinker/sink.rs b/rust/numaflow-core/src/sinker/sink.rs index b30d41e53c..65abfda19f 100644 --- a/rust/numaflow-core/src/sinker/sink.rs +++ b/rust/numaflow-core/src/sinker/sink.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use crate::Result; use crate::config::pipeline::VERTEX_TYPE_SINK; use crate::config::{get_vertex_name, is_mono_vertex}; @@ -9,6 +8,7 @@ use crate::metrics::{ pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics, }; use crate::sinker::actor::{SinkActorMessage, SinkActorResponse}; +use bytes::Bytes; use numaflow_kafka::sink::KafkaSink; use numaflow_pb::clients::sink::Status::{Failure, Fallback, OnSuccess, Serve, Success}; use numaflow_pb::clients::sink::sink_client::SinkClient; From 67f95f44cd683450f35c9db9d91d906dcfa511e7 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 4 Feb 2026 20:36:58 +0530 Subject: [PATCH 4/4] Avoid spawns - single task concurrency Signed-off-by: Sreekanth --- rust/numaflow-core/src/transformer.rs | 83 +++++++++++++-------------- 1 file changed, 39 insertions(+), 44 deletions(-) diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index 490b0e3997..d51f0940ce 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -1,9 +1,9 @@ use bytes::Bytes; +use futures::stream::{self, StreamExt}; use numaflow_monitor::runtime; use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::{Semaphore, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use tonic::{Code, Status}; @@ -159,7 +159,7 @@ impl Transformer { let batch_start_time = tokio::time::Instant::now(); let transform_handle = self.sender.clone(); let tracker = self.tracker.clone(); - let semaphore = Arc::new(Semaphore::new(self.concurrency)); + let concurrency = self.concurrency; let mut labels = pipeline_metric_labels(VERTEX_TYPE_SOURCE).clone(); labels.push(( PIPELINE_PARTITION_NAME_LABEL.to_string(), @@ -197,49 +197,43 @@ impl Transformer { let message_count = messages.len(); - let tasks: Vec<_> = messages - .into_iter() - .map(|read_msg| { - let permit_fut = Arc::clone(&semaphore).acquire_owned(); - let transform_handle = transform_handle.clone(); - let tracker = tracker.clone(); - let hard_shutdown_token = hard_shutdown_token.clone(); - - tokio::spawn(async move { - let permit = permit_fut.await.map_err(|e| { - Error::Transformer(format!("failed to acquire semaphore: {e}")) - })?; - let _permit = permit; - - let offset = read_msg.offset.clone(); - let transformed_messages = Transformer::transform( - transform_handle, - read_msg, - hard_shutdown_token.clone(), + // Create futures for each message transformation (not spawned, just lazy futures) + let transform_futs = messages.into_iter().map(|read_msg| { + let transform_handle = transform_handle.clone(); + let tracker = tracker.clone(); + let hard_shutdown_token = hard_shutdown_token.clone(); + + async move { + let offset = read_msg.offset.clone(); + let transformed_messages = + Transformer::transform(transform_handle, read_msg, hard_shutdown_token).await?; + + // update the tracker with the number of responses for each message + tracker + .serving_update( + &offset, + transformed_messages + .iter() + .map(|m| m.tags.clone()) + .collect(), ) .await?; - // update the tracker with the number of responses for each message - tracker - .serving_update( - &offset, - transformed_messages - .iter() - .map(|m| m.tags.clone()) - .collect(), - ) - .await?; - - Ok::, Error>(transformed_messages) - }) - }) - .collect(); - - let mut transformed_messages = Vec::with_capacity(message_count); - for task in tasks { - match task.await { - Ok(Ok(mut msgs)) => transformed_messages.append(&mut msgs), - Ok(Err(e)) => { + Ok::, Error>(transformed_messages) + } + }); + + // Use buffer_unordered to limit concurrency without spawning tasks. + // This polls up to `concurrency` futures at a time, reducing scheduling overhead. + let mut stream = stream::iter(transform_futs).buffer_unordered(concurrency); + + // Pre-size with 2x capacity to reduce reallocation for transformer fan-out + let mut transformed_messages = Vec::with_capacity(message_count * 2); + + while let Some(result) = stream.next().await { + match result { + Ok(mut msgs) => transformed_messages.append(&mut msgs), + Err(e) => { // increment transform error metric for pipeline // error here indicates that there was some problem in transformation if !is_mono_vertex() { @@ -249,9 +243,9 @@ impl Transformer { .get_or_create(&labels) .inc(); } + // Early exit - remaining futures are dropped when stream goes out of scope return Err(e); } - Err(e) => return Err(Error::Transformer(format!("task join failed: {e}"))), } } // batch transformation was successful @@ -332,6 +326,7 @@ mod tests { use numaflow::shared::ServerExtras; use numaflow::sourcetransform; use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; + use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; use tokio::sync::oneshot;