Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl From<Message> for MapRequest {
Self {
request: Some(map::map_request::Request {
keys: message.keys.to_vec(),
value: message.value.to_vec(),
value: message.value.clone(),
event_time: Some(prost_timestamp_from_utc(message.event_time)),
watermark: message.watermark.map(prost_timestamp_from_utc),
headers: Arc::unwrap_or_clone(message.headers),
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl TryFrom<Message> for BytesMut {
}),
}),
body: Some(numaflow_pb::objects::isb::Body {
payload: message.value.to_vec(),
payload: message.value.clone(),
}),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl From<Message> for reduce_request::Payload {
fn from(msg: Message) -> Self {
Self {
keys: msg.keys.to_vec(),
value: msg.value.to_vec(),
value: msg.value.clone(),
event_time: Some(prost_timestamp_from_utc(msg.event_time)),
watermark: msg.watermark.map(prost_timestamp_from_utc),
headers: Arc::unwrap_or_clone(msg.headers),
Expand All @@ -31,7 +31,7 @@ impl From<&Message> for reduce_request::Payload {
fn from(msg: &Message) -> Self {
Self {
keys: msg.keys.to_vec(),
value: msg.value.to_vec(),
value: msg.value.clone(),
event_time: Some(prost_timestamp_from_utc(msg.event_time)),
watermark: msg.watermark.map(prost_timestamp_from_utc),
headers: (*msg.headers).clone(),
Expand Down Expand Up @@ -78,7 +78,7 @@ impl From<AlignedWindowMessage> for ReduceRequest {
ReduceRequest {
payload: Some(reduce_request::Payload {
keys: vec![],
value: vec![],
value: bytes::Bytes::new(),
event_time: None,
watermark: None,
headers: Default::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl From<Message> for accumulator::Payload {
fn from(msg: Message) -> Self {
Self {
keys: msg.keys.to_vec(),
value: msg.value.to_vec(),
value: msg.value.clone(),
event_time: Some(prost_timestamp_from_utc(msg.event_time)),
watermark: msg.watermark.map(prost_timestamp_from_utc),
id: msg.id.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl From<Message> for session_reduce_request::Payload {
fn from(msg: Message) -> Self {
Self {
keys: msg.keys.to_vec(),
value: msg.value.to_vec(),
value: msg.value.clone(),
event_time: Some(prost_timestamp_from_utc(msg.event_time)),
watermark: msg.watermark.map(prost_timestamp_from_utc),
headers: Arc::unwrap_or_clone(msg.headers),
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/reduce/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl TryFrom<WalMessage> for Bytes {
metadata: message.metadata.map(|m| Arc::unwrap_or_clone(m).into()),
}),
body: Some(numaflow_pb::objects::isb::Body {
payload: message.value.to_vec(),
payload: message.value.clone(),
}),
}),
read_offset: int_offset.offset,
Expand Down
49 changes: 27 additions & 22 deletions rust/numaflow-core/src/sinker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ where
}

// State to accumulate outcomes across retries
let mut messages_to_retry = messages.clone();
// Take ownership directly instead of cloning
let mut messages_to_retry = messages;
let mut fallback_messages = Vec::new();
let mut serving_messages = Vec::new();
let mut dropped_messages = Vec::new();
Expand All @@ -84,7 +85,7 @@ where
let mut error_map = HashMap::new();

loop {
// send batch to sink
// send batch to sink (clone needed since we need messages for classification)
let responses = self.sink.sink(messages_to_retry.clone()).await?;

// Create a map of id to result
Expand All @@ -93,35 +94,40 @@ where
.map(|resp| (resp.id, resp.status))
.collect::<HashMap<_, _>>();

// Classify messages based on responses
let mut failed_ids = Vec::new();

messages_to_retry.retain_mut(|msg| {
match result_map.remove(&msg.id.to_string()) {
// Classify messages based on responses using index-based iteration
// with swap_remove to avoid cloning when moving messages to result vectors
let mut i = 0;
while i < messages_to_retry.len() {
let msg_id = messages_to_retry[i].id.to_string();
match result_map.remove(&msg_id) {
Some(ResponseStatusFromSink::Success) => {
false // remove from retry list
// Remove from retry list, don't need the message anymore
messages_to_retry.swap_remove(i);
// Don't increment i, the swapped element needs to be checked
}
Some(ResponseStatusFromSink::Failed(err_msg)) => {
failed_ids.push(msg.id.to_string());
*error_map.entry(err_msg.clone()).or_insert(0) += 1;
true // keep for retry
*error_map.entry(err_msg).or_insert(0) += 1;
i += 1; // keep for retry
}
Some(ResponseStatusFromSink::Fallback) => {
fallback_messages.push(msg.clone());
false // remove from retry list
// Move message to fallback without cloning
fallback_messages.push(messages_to_retry.swap_remove(i));
}
Some(ResponseStatusFromSink::Serve(serve_response)) => {
// Move message to serving without cloning
let mut msg = messages_to_retry.swap_remove(i);
if let Some(serve_response) = serve_response {
msg.value = serve_response.into();
}
serving_messages.push(msg.clone());
false // remove from retry list
serving_messages.push(msg);
}
Some(ResponseStatusFromSink::OnSuccess(on_success_msg)) => {
// Move message to on_success without cloning
let msg = messages_to_retry.swap_remove(i);
if let Some(on_success_msg) = on_success_msg {
let on_success_md: Option<Metadata> =
on_success_msg.metadata.map(|md| md.into());
let new_md = match &mut msg.metadata {
let new_md = match &msg.metadata {
// Following clones are required explicitly since Arc doesn't allow
// interior mutability, so we cannot move the required fields out of Arc
// without cloning, unless we can guarantee there is only a single reference to Arc
Expand All @@ -144,18 +150,17 @@ where
value: on_success_msg.value.into(),
keys: on_success_msg.keys.into(),
metadata: Some(Arc::new(new_md)),
..msg.clone()
..msg
};
on_success_messages.push(new_msg.clone());
on_success_messages.push(new_msg);
} else {
// Send the original message if no payload was provided to the onSuccess sink
on_success_messages.push(msg.clone());
on_success_messages.push(msg);
}
false // remove from retry list
}
None => unreachable!("should have response for all messages"), // remove if no response
None => unreachable!("should have response for all messages"),
}
});
}

if messages_to_retry.is_empty() {
// success path, all messages processed
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/sinker/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::metrics::{
pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics,
};
use crate::sinker::actor::{SinkActorMessage, SinkActorResponse};
use bytes::Bytes;
use numaflow_kafka::sink::KafkaSink;
use numaflow_pb::clients::sink::Status::{Failure, Fallback, OnSuccess, Serve, Success};
use numaflow_pb::clients::sink::sink_client::SinkClient;
Expand Down Expand Up @@ -686,7 +687,7 @@ pub(crate) enum ResponseStatusFromSink {
/// Write to FallBack Sink.
Fallback,
/// Write to serving store.
Serve(Option<Vec<u8>>),
Serve(Option<Bytes>),
OnSuccess(Option<sink_response::result::Message>),
}

Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/sinker/sink/serve/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl UserDefinedStore {
id: payload.id,
payloads: vec![Payload {
origin: origin.clone(),
value: payload.value.to_vec(),
value: payload.value.clone(),
}],
};
client.put(request).await.map_err(|e| {
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/sinker/sink/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl From<Message> for SinkRequest {
Self {
request: Some(numaflow_pb::clients::sink::sink_request::Request {
keys: message.keys.to_vec(),
value: message.value.to_vec(),
value: message.value.clone(),
event_time: Some(prost_timestamp_from_utc(message.event_time)),
watermark: message.watermark.map(prost_timestamp_from_utc),
id: message.id.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/source/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ impl TryFrom<Offset> for source::Offset {
}) => Ok(source::Offset {
offset: BASE64_STANDARD
.decode(offset)
.expect("we control the encoding, so this should never fail"),
.expect("we control the encoding, so this should never fail")
.into(),
partition_id: partition_idx as i32,
}),
Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())),
Expand Down
84 changes: 41 additions & 43 deletions rust/numaflow-core/src/transformer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use numaflow_monitor::runtime;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Semaphore, mpsc, oneshot};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tonic::{Code, Status};
Expand Down Expand Up @@ -159,7 +159,7 @@ impl Transformer {
let batch_start_time = tokio::time::Instant::now();
let transform_handle = self.sender.clone();
let tracker = self.tracker.clone();
let semaphore = Arc::new(Semaphore::new(self.concurrency));
let concurrency = self.concurrency;
let mut labels = pipeline_metric_labels(VERTEX_TYPE_SOURCE).clone();
labels.push((
PIPELINE_PARTITION_NAME_LABEL.to_string(),
Expand Down Expand Up @@ -195,48 +195,45 @@ impl Transformer {
.inc_by(messages.len() as u64);
}

let tasks: Vec<_> = messages
.into_iter()
.map(|read_msg| {
let permit_fut = Arc::clone(&semaphore).acquire_owned();
let transform_handle = transform_handle.clone();
let tracker = tracker.clone();
let hard_shutdown_token = hard_shutdown_token.clone();

tokio::spawn(async move {
let permit = permit_fut.await.map_err(|e| {
Error::Transformer(format!("failed to acquire semaphore: {e}"))
})?;
let _permit = permit;

let transformed_messages = Transformer::transform(
transform_handle,
read_msg.clone(),
hard_shutdown_token.clone(),
let message_count = messages.len();

// Create futures for each message transformation (not spawned, just lazy futures)
let transform_futs = messages.into_iter().map(|read_msg| {
let transform_handle = transform_handle.clone();
let tracker = tracker.clone();
let hard_shutdown_token = hard_shutdown_token.clone();

async move {
let offset = read_msg.offset.clone();
let transformed_messages =
Transformer::transform(transform_handle, read_msg, hard_shutdown_token).await?;

// update the tracker with the number of responses for each message
tracker
.serving_update(
&offset,
transformed_messages
.iter()
.map(|m| m.tags.clone())
.collect(),
)
.await?;

// update the tracker with the number of responses for each message
tracker
.serving_update(
&read_msg.offset,
transformed_messages
.iter()
.map(|m| m.tags.clone())
.collect(),
)
.await?;

Ok::<Vec<Message>, Error>(transformed_messages)
})
})
.collect();

let mut transformed_messages = Vec::new();
for task in tasks {
match task.await {
Ok(Ok(mut msgs)) => transformed_messages.append(&mut msgs),
Ok(Err(e)) => {
Ok::<Vec<Message>, Error>(transformed_messages)
}
});

// Use buffer_unordered to limit concurrency without spawning tasks.
// This polls up to `concurrency` futures at a time, reducing scheduling overhead.
let mut stream = stream::iter(transform_futs).buffer_unordered(concurrency);

// Pre-size with 2x capacity to reduce reallocation for transformer fan-out
let mut transformed_messages = Vec::with_capacity(message_count * 2);

while let Some(result) = stream.next().await {
match result {
Ok(mut msgs) => transformed_messages.append(&mut msgs),
Err(e) => {
// increment transform error metric for pipeline
// error here indicates that there was some problem in transformation
if !is_mono_vertex() {
Expand All @@ -246,9 +243,9 @@ impl Transformer {
.get_or_create(&labels)
.inc();
}
// Early exit - remaining futures are dropped when stream goes out of scope
return Err(e);
}
Err(e) => return Err(Error::Transformer(format!("task join failed: {e}"))),
}
}
// batch transformation was successful
Expand Down Expand Up @@ -329,6 +326,7 @@ mod tests {
use numaflow::shared::ServerExtras;
use numaflow::sourcetransform;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::oneshot;
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/transformer/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl From<Message> for SourceTransformRequest {
request: Some(sourcetransformer::source_transform_request::Request {
id: message.offset.to_string(),
keys: message.keys.to_vec(),
value: message.value.to_vec(),
value: message.value.clone(),
event_time: Some(prost_timestamp_from_utc(message.event_time)),
watermark: message.watermark.map(prost_timestamp_from_utc),
headers: Arc::unwrap_or_clone(message.headers),
Expand Down
4 changes: 2 additions & 2 deletions rust/numaflow-pb/src/clients/accumulator.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pub struct Payload {
#[prost(string, repeated, tag = "1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(bytes = "vec", tag = "2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "2")]
pub value: ::prost::bytes::Bytes,
#[prost(message, optional, tag = "3")]
pub event_time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(message, optional, tag = "4")]
Expand Down
8 changes: 4 additions & 4 deletions rust/numaflow-pb/src/clients/map.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub mod map_request {
pub struct Request {
#[prost(string, repeated, tag = "1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(bytes = "vec", tag = "2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "2")]
pub value: ::prost::bytes::Bytes,
#[prost(message, optional, tag = "3")]
pub event_time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(message, optional, tag = "4")]
Expand Down Expand Up @@ -70,8 +70,8 @@ pub mod map_response {
pub struct Result {
#[prost(string, repeated, tag = "1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(bytes = "vec", tag = "2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "bytes", tag = "2")]
pub value: ::prost::bytes::Bytes,
#[prost(string, repeated, tag = "3")]
pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// Metadata is the metadata of the message
Expand Down
Loading
Loading