From 264d3bdce8c34a7906fe76e29bbc83f371ad5238 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 22 Sep 2025 17:18:37 +0530 Subject: [PATCH 1/5] feat: nack support for udsource Signed-off-by: Yashash H L --- .../examples/source/simple/SimpleSource.java | 6 +++ .../numaflow/accumulator/Service.java | 11 +--- .../numaflow/batchmapper/Service.java | 11 +--- .../numaflow/mapper/MapSupervisorActor.java | 8 +-- .../mapstreamer/MapStreamSupervisorActor.java | 10 ++-- .../io/numaproj/numaflow/reducer/Service.java | 11 ++-- .../numaflow/reducestreamer/Service.java | 12 ++--- .../numaflow/sessionreducer/Service.java | 12 ++--- .../numaflow/shared/ExceptionUtils.java | 21 ++++++++ .../io/numaproj/numaflow/sinker/Service.java | 9 +--- .../numaflow/sourcer/NackRequest.java | 14 +++++ .../numaflow/sourcer/NackRequestImpl.java | 18 +++++++ .../io/numaproj/numaflow/sourcer/Service.java | 51 +++++++++++++------ .../io/numaproj/numaflow/sourcer/Sourcer.java | 9 +++- .../TransformSupervisorActor.java | 8 +-- src/main/proto/source/v1/source.proto | 24 ++++++++- .../numaflow/mapstreamer/ServiceErrTest.java | 4 +- .../numaflow/sourcer/ServerErrTest.java | 5 ++ .../numaproj/numaflow/sourcer/ServerTest.java | 37 +++++++++++++- 19 files changed, 188 insertions(+), 93 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java create mode 100644 src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index faf1003b..56ac2006 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -3,6 +3,7 @@ import com.google.common.primitives.Longs; import io.numaproj.numaflow.sourcer.AckRequest; import io.numaproj.numaflow.sourcer.Message; +import 
io.numaproj.numaflow.sourcer.NackRequest; import io.numaproj.numaflow.sourcer.Offset; import io.numaproj.numaflow.sourcer.OutputObserver; import io.numaproj.numaflow.sourcer.ReadRequest; @@ -79,6 +80,11 @@ public void ack(AckRequest request) { } } + @Override + public void nack(NackRequest request) { + + } + @Override public long getPending() { // number of messages not acknowledged yet diff --git a/src/main/java/io/numaproj/numaflow/accumulator/Service.java b/src/main/java/io/numaproj/numaflow/accumulator/Service.java index 0fb04008..5d33e8e6 100644 --- a/src/main/java/io/numaproj/numaflow/accumulator/Service.java +++ b/src/main/java/io/numaproj/numaflow/accumulator/Service.java @@ -38,16 +38,9 @@ static void handleFailure( try { failureFuture.get(); } catch (Exception e) { - e.printStackTrace(); + log.error("Exception occurred while performing accumulator operation", e); // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage( - ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } }).start(); diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 4be3b812..98e2f792 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -103,16 +103,9 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { } } catch (Exception e) { log.error("Encountered an error in batch map onNext", e); - shutdownSignal.completeExceptionally(e); - // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + shutdownSignal.completeExceptionally(e); } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index e6fe2109..05b4e312 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -112,13 +112,7 @@ private void handleFailure(Exception e) { // only send the very first exception to the client // one exception should trigger a container restart // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeMapperCount--; diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java index f0aa83b7..39409ddf 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java @@ -8,8 +8,10 @@ import akka.actor.SupervisorStrategy; import akka.japi.pf.DeciderBuilder; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.extern.slf4j.Slf4j; import java.util.Optional; @@ -110,13 +112,11 @@ public Receive createReceive() { } private void handleFailure(Exception e) { - getContext().getSystem().log().error("Encountered error in mapStreamFn", e); + getContext().getSystem().log().error("Encountered error in mapStreamFn {}", e); if (userException == null) { userException = e; - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeMapStreamersCount--; } diff --git a/src/main/java/io/numaproj/numaflow/reducer/Service.java b/src/main/java/io/numaproj/numaflow/reducer/Service.java index 2da2625a..4845a45e 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Service.java @@ -41,14 +41,9 @@ static void handleFailure( try { failureFuture.get(); } 
catch (Exception e) { - e.printStackTrace(); - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + log.error("Exception occurred while performing reduce operation", e); + // Build gRPC Status + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } }).start(); diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java index 5647c721..f5ae4721 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java @@ -42,15 +42,9 @@ static void handleFailure( try { failureFuture.get(); } catch (Exception e) { - e.printStackTrace(); - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage( - ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + log.error("Exception occurred while performing reduce streaming operation", e); + // Build gRPC Status + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } }).start(); diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Service.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Service.java index 30f3b0d7..b42febf2 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Service.java @@ -42,15 +42,9 @@ static void handleFailure( try { failureFuture.get(); } catch (Exception e) { - e.printStackTrace(); - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage( - ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + log.error("Exception occurred while performing session reduce operation", e); + // Build gRPC Status + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } }).start(); diff --git a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java index 0fec064c..4ef8bba3 100644 --- a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java @@ -1,5 +1,10 @@ package io.numaproj.numaflow.shared; +import com.google.protobuf.Any; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; +import io.grpc.Status; + import java.io.PrintWriter; import java.io.StringWriter; import java.util.Objects; @@ -41,4 +46,20 @@ public static String getExceptionErrorString() { + Objects.requireNonNullElse(CONTAINER_NAME, "unknown-container") + ")"; } + + /** + * Builds rpc status from the user's exception. + * + * @param exception encountered in user's code. + * @return the status constructed using the exception. + */ + public static com.google.rpc.Status buildStatusFromUserException(Exception exception) { + return com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (exception.getMessage() != null ? 
exception.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(exception)) + .build())) + .build(); + } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 2a79c941..e4cea580 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -107,14 +107,7 @@ public void onNext(SinkOuterClass.SinkRequest request) { } catch (Exception e) { log.error("Encountered error in sinkFn onNext", e); // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " - + (e.getMessage() != null ? e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); shutdownSignal.completeExceptionally(e); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java b/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java new file mode 100644 index 00000000..a7c23fd0 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java @@ -0,0 +1,14 @@ +package io.numaproj.numaflow.sourcer; + + +import java.util.List; + +/** + * NackRequest request for negatively acknowledging messages. + */ +public interface NackRequest { + /** + * @return the list of offsets to be negatively acknowledged. 
+ */ + List getOffsets(); +} diff --git a/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java new file mode 100644 index 00000000..29a423cd --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java @@ -0,0 +1,18 @@ +package io.numaproj.numaflow.sourcer; + +import lombok.AllArgsConstructor; + +import java.util.List; + +/** + * NackRequestImpl is the implementation of NackRequest. + */ +@AllArgsConstructor +class NackRequestImpl implements NackRequest { + private final List offsets; + + @Override + public List getOffsets() { + return this.offsets; + } +} diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 48e8d300..9627d75f 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -1,9 +1,6 @@ package io.numaproj.numaflow.sourcer; -import com.google.protobuf.Any; import com.google.protobuf.Empty; -import com.google.rpc.Code; -import com.google.rpc.DebugInfo; import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; @@ -18,6 +15,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import static io.numaproj.numaflow.source.v1.SourceGrpc.getNackFnMethod; import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod; /** @@ -84,16 +82,10 @@ public void onNext(SourceOuterClass.ReadRequest request) { responseObserver.onNext(response); } catch (Exception e) { log.error("Encountered error in readFn onNext", e); - shutdownSignal.completeExceptionally(e); // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + shutdownSignal.completeExceptionally(e); } } @@ -164,22 +156,20 @@ public void onNext(SourceOuterClass.AckRequest request) { responseObserver.onNext(response); } catch (Exception e) { log.error("Encountered error in ackFn onNext", e); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); } } @Override public void onError(Throwable t) { log.error("Encountered error in ackFn onNext", t); - shutdownSignal.completeExceptionally(t); responseObserver.onError(Status.INTERNAL .withDescription(t.getMessage()) .withCause(t) .asException()); + shutdownSignal.completeExceptionally(t); } @Override @@ -189,6 +179,35 @@ public void onCompleted() { }; } + @Override + public void nackFn( + SourceOuterClass.NackRequest nackRequest, + StreamObserver responseObserver + ) { + if (this.sourcer == null) { + io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall( + getNackFnMethod(), + responseObserver); + return; + } + + try { + List offsets = new ArrayList<>(nackRequest.getRequest().getOffsetsCount()); + for (SourceOuterClass.Offset offset : nackRequest.getRequest().getOffsetsList()) { + offsets.add(new Offset( + offset.getOffset().toByteArray(), + offset.getPartitionId())); + } + NackRequestImpl nackRequestImpl = new NackRequestImpl(offsets); + this.sourcer.nack(nackRequestImpl); + } catch (Exception e) { + log.error("Encountered error in nackFn", e); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); + 
responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + shutdownSignal.completeExceptionally(e); + } + } + /** * pendingFn is the endpoint for getting the number of pending messages from the sourcer. * diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java index 42791547..3cf58307 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java @@ -35,10 +35,17 @@ public static List defaultPartitions() { /** * method will be used for acknowledging messages from source. * - * @param request the request contains the offset to be acknowledged + * @param request the request contains the offsets to be acknowledged */ public abstract void ack(AckRequest request); + /** + * method will be used for negatively acknowledging messages from source. + * + * @param request the request contains the offsets to be negatively acknowledged. + */ + public abstract void nack(NackRequest request); + /** * method will be used for getting the number of pending messages from source. * when the return value is negative, it indicates the pending information is not available. diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index 3c8a1cfe..fac41fb0 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -151,13 +151,7 @@ private void handleFailure(Exception e) { // one exception should trigger a container restart // Build gRPC Status - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); + com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeTransformersCount--; diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index 3cb4ce8c..10a7e080 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -23,6 +23,10 @@ service Source { // Clients sends n requests and expects n responses. rpc AckFn(stream AckRequest) returns (stream AckResponse); + // NackFn negatively acknowledges a batch of offsets. Invoked during a critical error in the mono vertex or pipeline. + // Unlike AckFn, it is not a streaming RPC because it is only invoked when there is a critical error (error path). + rpc NackFn(NackRequest) returns (NackResponse); + // PendingFn returns the number of pending records at the user defined source. rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); @@ -112,7 +116,7 @@ message ReadResponse { */ message AckRequest { message Request { - // Required field holding the offsets to be acked + // Required field holding the offsets to be acked repeated Offset offsets = 1; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. @@ -141,6 +145,24 @@ message AckResponse { optional Handshake handshake = 2; } +message NackRequest { + message Request { + // Required field holding the offsets to be nacked + repeated Offset offsets = 1; + } + // Required field holding the request. The list will be ordered and will have the same order as the original Read response. + Request request = 1; +} + +message NackResponse { + message Result { + // Required field indicating the nack request is successful. 
+ google.protobuf.Empty success = 1; + } + // Required field holding the result. + Result result = 1; +} + /* * ReadyResponse is the health check result for user defined source. */ diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceErrTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceErrTest.java index fa3572a1..9ca57fc1 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceErrTest.java @@ -122,9 +122,7 @@ public void testMapStreamerFailure() { responseObserver.done.get(); fail("Expected exception not thrown"); } catch (Exception e) { - assertEquals( - "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", - e.getMessage()); + assert(e.getMessage().contains("UDF_EXECUTION_ERROR")); } } diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java index 57b3c6ea..796cd02e 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java @@ -153,6 +153,11 @@ public void ack(AckRequest request) { } + @Override + public void nack(NackRequest request) { + throw new RuntimeException("unknown exception"); + } + @Override public List getPartitions() { return Sourcer.defaultPartitions(); diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index 2e1796ed..ca1aa8b7 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -80,7 +80,7 @@ public void TestSourcer() { .build()) .build(); List ackRequests = new ArrayList<>(); - + List offsets = new ArrayList<>(); StreamObserver readRequestObserver = stub.readFn(new StreamObserver<>() { int count = 0; boolean handshake = false; @@ -99,6 +99,7 @@ public void onNext(SourceOuterClass.ReadResponse 
readResponse) { } count++; SourceOuterClass.Offset offset = readResponse.getResult().getOffset(); + offsets.add(offset); SourceOuterClass.AckRequest.Request ackRequest = SourceOuterClass.AckRequest .newBuilder() .getRequest() @@ -199,6 +200,20 @@ public void onCompleted() { } }); + stub.nackFn(SourceOuterClass.NackRequest.newBuilder().setRequest(SourceOuterClass.NackRequest.Request.newBuilder().addAllOffsets(offsets).build()).build(), new StreamObserver<>() { + @Override + public void onNext(SourceOuterClass.NackResponse nackResponse) { + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }); + readRequestObserver.onCompleted(); ackRequestObserver.onCompleted(); } @@ -207,6 +222,7 @@ private static class TestSourcer extends Sourcer { List messages = new ArrayList<>(); AtomicInteger readIndex = new AtomicInteger(0); Map yetToBeAcked = new ConcurrentHashMap<>(); + Map nacked = new ConcurrentHashMap<>(); public TestSourcer() { Instant eventTime = Instant.ofEpochMilli(1000L); @@ -222,6 +238,15 @@ public TestSourcer() { @Override public void read(ReadRequest request, OutputObserver observer) { + if (!nacked.isEmpty()) { + for (int i = 0; i < nacked.size(); i++) { + observer.send(messages.get(readIndex.get())); + yetToBeAcked.put(readIndex.get(), true); + readIndex.incrementAndGet(); + } + nacked.clear(); + } + if (readIndex.get() >= messages.size()) { return; } @@ -248,6 +273,16 @@ public void ack(AckRequest request) { } } + @Override + public void nack(NackRequest request) { + for (Offset offset : request.getOffsets()) { + Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt(); + yetToBeAcked.remove(decoded_offset); + nacked.put(decoded_offset, true); + readIndex.decrementAndGet(); + } + } + @Override public long getPending() { return messages.size() - readIndex.get(); From 5e6927f38b3cbda5e13e86a4e6603ce305271ed9 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 3 Oct 2025 17:47:49 +0530 
Subject: [PATCH 2/5] tests Signed-off-by: Yashash H L --- .../io/numaproj/numaflow/sourcer/Service.java | 7 ++++ .../sourcer/NackOutputStreamObserver.java | 36 +++++++++++++++++++ .../numaflow/sourcer/ServerErrTest.java | 33 ++++++++++++++++- 3 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 src/test/java/io/numaproj/numaflow/sourcer/NackOutputStreamObserver.java diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 9627d75f..2dc80e89 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -200,6 +200,13 @@ public void nackFn( } NackRequestImpl nackRequestImpl = new NackRequestImpl(offsets); this.sourcer.nack(nackRequestImpl); + SourceOuterClass.NackResponse nackResponse = SourceOuterClass.NackResponse + .newBuilder() + .setResult(SourceOuterClass.NackResponse.Result.newBuilder().setSuccess( + Empty.newBuilder().build())) + .build(); + responseObserver.onNext(nackResponse); + responseObserver.onCompleted(); } catch (Exception e) { log.error("Encountered error in nackFn", e); com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); diff --git a/src/test/java/io/numaproj/numaflow/sourcer/NackOutputStreamObserver.java b/src/test/java/io/numaproj/numaflow/sourcer/NackOutputStreamObserver.java new file mode 100644 index 00000000..de188842 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sourcer/NackOutputStreamObserver.java @@ -0,0 +1,36 @@ +package io.numaproj.numaflow.sourcer; + + +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.source.v1.SourceOuterClass; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +public class NackOutputStreamObserver implements StreamObserver { + private final List nackResponses = new ArrayList<>(); + public AtomicReference completed = new AtomicReference<>(false); + 
public Throwable t; + + public List getNackResponse() { + return nackResponses; + } + + @Override + public void onNext(SourceOuterClass.NackResponse datum) { + nackResponses.add(datum); + } + + @Override + public void onError(Throwable throwable) { + t = throwable; + this.completed.set(true); + } + + @Override + public void onCompleted() { + this.completed.set(true); + } +} + diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java index 796cd02e..f2cc9a5d 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ServerErrTest { @@ -67,7 +68,7 @@ public void TestSourcerErr() { .build()) .build(); - StreamObserver readRequestObserver = stub.readFn(new StreamObserver() { + StreamObserver readRequestObserver = stub.readFn(new StreamObserver<>() { @Override public void onNext(SourceOuterClass.ReadResponse readResponse) { // Handle onNext @@ -140,6 +141,36 @@ public void sourceWithoutReadHandshake() { outputStreamObserver.t.getMessage()); } + @Test + public void testNackError() { + var stub = SourceGrpc.newStub(inProcessChannel); + + // Create an output stream observer + NackOutputStreamObserver outputStreamObserver = new NackOutputStreamObserver(); + + // Create a nack request with some offsets + SourceOuterClass.Offset offset = SourceOuterClass.Offset.newBuilder() + .setOffset(com.google.protobuf.ByteString.copyFrom(new byte[]{0, 0, 0, 1})) + .setPartitionId(0) + .build(); + + SourceOuterClass.NackRequest nackRequest = SourceOuterClass.NackRequest.newBuilder() + .setRequest(SourceOuterClass.NackRequest.Request.newBuilder() + .addOffsets(offset) + .build()) + .build(); + + // Invoke nackFn which should throw an exception + 
stub.nackFn(nackRequest, outputStreamObserver); + + // Wait for the server to process the request + while (!outputStreamObserver.completed.get()) ; + + // Check if an error was received + assertNotNull(outputStreamObserver.t); + assertTrue(outputStreamObserver.t.getMessage().contains("unknown exception")); + } + private static class TestSourcerErr extends Sourcer { @Override From 298bc25963c47c951cd334174a2eb0082d05bb71 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 3 Oct 2025 18:09:44 +0530 Subject: [PATCH 3/5] update simple source example Signed-off-by: Yashash H L --- .../examples/source/simple/SimpleSource.java | 69 +++++++++++++------ 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index 56ac2006..fb0b7321 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.examples.source.simple; -import com.google.common.primitives.Longs; import io.numaproj.numaflow.sourcer.AckRequest; import io.numaproj.numaflow.sourcer.Message; import io.numaproj.numaflow.sourcer.NackRequest; @@ -11,12 +10,14 @@ import io.numaproj.numaflow.sourcer.Sourcer; import lombok.extern.slf4j.Slf4j; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * SimpleSource is a simple implementation of Sourcer. 
@@ -27,8 +28,9 @@ @Slf4j public class SimpleSource extends Sourcer { - private final Map messages = new ConcurrentHashMap<>(); - private long readIndex = 0; + private final Map yetToBeAcked = new ConcurrentHashMap<>(); + Map nacked = new ConcurrentHashMap<>(); + private final AtomicInteger readIndex = new AtomicInteger(0); public static void main(String[] args) throws Exception { Server server = new Server(new SimpleSource()); @@ -43,56 +45,79 @@ public static void main(String[] args) throws Exception { @Override public void read(ReadRequest request, OutputObserver observer) { long startTime = System.currentTimeMillis(); - if (messages.entrySet().size() > 0) { + + // if there are messages which got nacked, we should read them first. + if (!nacked.isEmpty()) { + for (int i = 0; i < nacked.size(); i++) { + Integer index = readIndex.incrementAndGet(); + yetToBeAcked.put(index, true); + observer.send(constructMessage(index)); + } + nacked.clear(); + } + + if (!yetToBeAcked.isEmpty()) { // if there are messages not acknowledged, return return; } + Integer index = readIndex.incrementAndGet(); + for (int i = 0; i < request.getCount(); i++) { if (System.currentTimeMillis() - startTime > request.getTimeout().toMillis()) { return; } - - Map headers = new HashMap<>(); - headers.put("x-txn-id", UUID.randomUUID().toString()); - - // create a message with increasing offset - Offset offset = new Offset(Longs.toByteArray(readIndex)); - Message message = new Message( - Long.toString(readIndex).getBytes(), - offset, - Instant.now(), - headers); // send the message to the observer - observer.send(message); + observer.send(constructMessage(index)); // keep track of the messages read and not acknowledged - messages.put(readIndex, true); - readIndex += 1; + yetToBeAcked.put(index, true); + readIndex.incrementAndGet(); } } @Override public void ack(AckRequest request) { for (Offset offset : request.getOffsets()) { - Long decoded_offset = Longs.fromByteArray(offset.getValue()); + Integer 
decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt(); // remove the acknowledged messages from the map - messages.remove(decoded_offset); + yetToBeAcked.remove(decoded_offset); } } @Override public void nack(NackRequest request) { - + // put them to nacked offsets so that they will be retried immediately. + for (Offset offset : request.getOffsets()) { + Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt(); + yetToBeAcked.remove(decoded_offset); + nacked.put(decoded_offset, true); + readIndex.decrementAndGet(); + } } @Override public long getPending() { // number of messages not acknowledged yet - return messages.size(); + return yetToBeAcked.size(); } @Override public List getPartitions() { return Sourcer.defaultPartitions(); } + + private Message constructMessage(Integer readIndex) { + Map headers = new HashMap<>(); + headers.put("x-txn-id", UUID.randomUUID().toString()); + + // create a message with increasing offset + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(readIndex); + Offset offset = new Offset(buffer.array()); + return new Message( + Integer.toString(readIndex).getBytes(), + offset, + Instant.now(), + headers); + } } From 0774cc37491f92394c84c7b6c6ad808507b5752f Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 3 Oct 2025 20:41:11 +0530 Subject: [PATCH 4/5] address review comments Signed-off-by: Yashash H L --- .../examples/source/simple/SimpleSource.java | 1 - .../io/numaproj/numaflow/sourcer/Service.java | 2 +- src/main/proto/source/v1/source.proto | 4 +- .../numaproj/numaflow/sourcer/ServerTest.java | 55 ++++++++++++++++++- 4 files changed, 57 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index fb0b7321..2148de21 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ 
b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -71,7 +71,6 @@ public void read(ReadRequest request, OutputObserver observer) { observer.send(constructMessage(index)); // keep track of the messages read and not acknowledged yetToBeAcked.put(index, true); - readIndex.incrementAndGet(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 2dc80e89..d327a7a0 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -198,7 +198,7 @@ public void nackFn( offset.getOffset().toByteArray(), offset.getPartitionId())); } - NackRequestImpl nackRequestImpl = new NackRequestImpl(offsets); + NackRequest nackRequestImpl = new NackRequestImpl(offsets); this.sourcer.nack(nackRequestImpl); SourceOuterClass.NackResponse nackResponse = SourceOuterClass.NackResponse .newBuilder() diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index 10a7e080..a1fbdfeb 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -116,7 +116,7 @@ message ReadResponse { */ message AckRequest { message Request { - // Required field holding the offset to be acked + // Required field holding the offsets to be acked repeated Offset offsets = 1; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. @@ -147,7 +147,7 @@ message AckResponse { message NackRequest { message Request { - // Required field holding the offset to be nacked + // Required field holding the offsets to be nacked repeated Offset offsets = 1; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. 
diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index ca1aa8b7..14c29b46 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -200,9 +200,17 @@ public void onCompleted() { } }); - stub.nackFn(SourceOuterClass.NackRequest.newBuilder().setRequest(SourceOuterClass.NackRequest.Request.newBuilder().addAllOffsets(offsets).build()).build(), new StreamObserver<>() { + // Nack the last 5 messages (indices 5-9) + List nackedOffsets = offsets.subList(5, 10); + stub.nackFn(SourceOuterClass.NackRequest.newBuilder() + .setRequest(SourceOuterClass.NackRequest.Request.newBuilder() + .addAllOffsets(nackedOffsets) + .build()) + .build(), new StreamObserver<>() { @Override public void onNext(SourceOuterClass.NackResponse nackResponse) { + // Verify nack was successful + assertTrue(nackResponse.hasResult()); } @Override @@ -214,6 +222,51 @@ public void onCompleted() { } }); + // After nacking, read again to verify we get the nacked messages back + List rereadOffsets = new ArrayList<>(); + StreamObserver rereadRequestObserver = stub.readFn(new StreamObserver<>() { + int count = 0; + boolean handshake = false; + + @Override + public void onNext(SourceOuterClass.ReadResponse readResponse) { + // Handle handshake response + if (readResponse.hasHandshake() && readResponse.getHandshake().getSot()) { + handshake = true; + return; + } + if (readResponse.getStatus().getEot()) { + return; + } + count++; + // Decode the offset to verify it's one of the nacked messages + SourceOuterClass.Offset offset = readResponse.getResult().getOffset(); + Integer decodedOffset = ByteBuffer.wrap(offset.getOffset().toByteArray()).getInt(); + rereadOffsets.add(decodedOffset); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + // We should have read 5 nacked messages + 
assertEquals(5, count); + assertTrue(handshake); + // Verify the offsets are the ones we nacked (5-9) + for (int i = 0; i < 5; i++) { + assertEquals(Integer.valueOf(5 + i), rereadOffsets.get(i)); + } + } + }); + + // Send handshake for the re-read + rereadRequestObserver.onNext(handshakeRequest); + // Request to read the nacked messages + rereadRequestObserver.onNext(request); + rereadRequestObserver.onCompleted(); + readRequestObserver.onCompleted(); ackRequestObserver.onCompleted(); } From 79452956293f9cc669502548deaae650650fa293 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 3 Oct 2025 21:09:44 +0530 Subject: [PATCH 5/5] move increment and get Signed-off-by: Yashash H L --- .../numaflow/examples/source/simple/SimpleSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index 2148de21..44a108bc 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -61,12 +61,12 @@ public void read(ReadRequest request, OutputObserver observer) { return; } - Integer index = readIndex.incrementAndGet(); - for (int i = 0; i < request.getCount(); i++) { if (System.currentTimeMillis() - startTime > request.getTimeout().toMillis()) { return; } + + Integer index = readIndex.incrementAndGet(); // send the message to the observer observer.send(constructMessage(index)); // keep track of the messages read and not acknowledged