
Always surface latest activity failure when a Workflow ends and an Activity is/was retrying #10354

@colan-dremio

Description


Is your feature request related to a problem? Please describe.
In most cases, Activities with retriable failures show their latest failure, for example while the Workflow is still running and the Activity is waiting out its retry backoff. This is very helpful for troubleshooting. In some cases, particularly Workflow cancellation, the latest failure is lost. Although a retriable failure is not itself the direct cause of the cancellation, it is still helpful to have that context serialized into the Workflow execution history for debugging. The failure context is available in worker logs, but not having to dig through logs is better for observability.

This "missing" observability issue may be specific to Activities that are in their retry backoff period at the time of cancellation. In contrast, cancelling an Activity that heartbeats and uses the WAIT_CANCELLATION_COMPLETED cancellation type persists the last failure as part of the Activity Task Started event.

For Activities in their retry backoff period at the time of cancellation, no Activity Task Started event is recorded at all. The docs are somewhat ambiguous here: they state that this event is persisted on terminal events, but they do not exhaustively list which events are terminal. Cancellation seems like it should be terminal, but it is not mentioned.
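To see why the sample test below almost always cancels during a backoff period, here is a rough, stdlib-only Java sketch of the retry schedule. It assumes the Temporal default retry policy values (1 second initial interval, backoff coefficient 2.0, maximum interval of 100 times the initial interval) combined with the test's MaximumAttempts of 10, and it assumes every attempt of `activityTwo` fails instantly with zero dispatch latency. `RetryBackoffSchedule` and its methods are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper modelling an exponential retry schedule.
 * Assumes every attempt fails instantly and dispatch latency is zero.
 */
final class RetryBackoffSchedule {

    /** Start time (seconds after the Activity is scheduled) of each attempt. */
    static List<Double> attemptStartTimes(
            double initialIntervalSeconds,
            double backoffCoefficient,
            double maximumIntervalSeconds,
            int maximumAttempts) {
        List<Double> starts = new ArrayList<>();
        double start = 0.0;
        double interval = initialIntervalSeconds;
        for (int attempt = 1; attempt <= maximumAttempts; attempt++) {
            starts.add(start);
            // The next attempt begins after the current backoff interval, capped at the maximum.
            start += Math.min(interval, maximumIntervalSeconds);
            interval *= backoffCoefficient;
        }
        return starts;
    }

    /**
     * True if time t falls strictly between two attempt starts. Because attempts are
     * assumed to fail instantly, that whole window is spent waiting in backoff.
     */
    static boolean isInBackoffAt(List<Double> starts, double t) {
        for (int i = 0; i + 1 < starts.size(); i++) {
            if (t > starts.get(i) && t < starts.get(i + 1)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Default-like policy with the sample test's MaximumAttempts of 10.
        List<Double> starts = attemptStartTimes(1.0, 2.0, 100.0, 10);
        System.out.println("Attempt start times (s): " + starts);
        System.out.println("In backoff at t=60s? " + isInBackoffAt(starts, 60.0));
    }
}
```

Under these assumptions attempts start at 0, 1, 3, 7, 15, 31, 63, ... seconds, so the Activity sits idle in backoff from roughly 31 s to 63 s and the 60-second internal timer fires inside that window. Real timings also include `activityOne` and task dispatch, which fits the issue describing the timing as nearly guaranteed rather than certain.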

Describe the solution you'd like
Make sure that the latest Activity failure is serialized into the Workflow execution history even on less traditional Workflow terminations (e.g. cancellation). In this case, I would still write an Activity Task Started event to the history, even though the Activity has failed retriably and is in its backoff wait (i.e. the next retry has not started yet).
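The difference between the reported behavior and the proposal can be illustrated with a deliberately simplified Java model. Everything in it is hypothetical: `BackoffCancellationModel`, its `Event` record, and the exact event ordering are assumptions for illustration (the event names are borrowed from the Temporal history), not Temporal server code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical toy model of the history written when a retrying Activity is cancelled. */
final class BackoffCancellationModel {

    /** Simplified history event: a type name plus the last failure it carries, if any. */
    record Event(String type, Optional<String> lastFailure) {}

    /**
     * History for an Activity cancelled while waiting in retry backoff.
     *
     * @param lastFailure failure message of the most recent attempt
     * @param recordStartedOnCancel false models the behavior reported in this issue,
     *     true models the proposal (assumed ordering: Started right before Canceled)
     */
    static List<Event> historyOnCancelDuringBackoff(String lastFailure, boolean recordStartedOnCancel) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("ActivityTaskScheduled", Optional.empty()));
        events.add(new Event("ActivityTaskCancelRequested", Optional.empty()));
        if (recordStartedOnCancel) {
            // Proposed: persist the latest retriable failure so it is visible in the history.
            events.add(new Event("ActivityTaskStarted", Optional.of(lastFailure)));
        }
        events.add(new Event("ActivityTaskCanceled", Optional.empty()));
        return events;
    }

    public static void main(String[] args) {
        String failure = "retriable failure";
        System.out.println("reported: " + historyOnCancelDuringBackoff(failure, false));
        System.out.println("proposed: " + historyOnCancelDuringBackoff(failure, true));
    }
}
```

The only change between the two modes is whether a Started event carrying the last failure is written before the cancellation is recorded, which is the observability gap this issue describes.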

Additional context
An example screenshot from an Activity with heartbeats and the WAIT_CANCELLATION_COMPLETED option. Note that the Activity Task Started event is persisted; this is the desired behavior for other types of cancellation as well.


Sample test
A failing Java unit test demonstrating that no Activity Task Started event is recorded on cancellation. Here, the timing makes it nearly certain that the cancellation arrives during a retry backoff period.

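import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.spy;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.ActivityTaskScheduledEventAttributes;
import io.temporal.api.history.v1.ActivityTaskStartedEventAttributes;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.failure.ApplicationFailure;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.testing.TestWorkflowExtension;
import io.temporal.worker.Worker;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class TestCancellation {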

  @WorkflowInterface
  public interface MyWorkflow {
    @WorkflowMethod void run();
  }

  public static class MyWorkflowImpl implements MyWorkflow {
    private static final Duration INTERNAL_WORKFLOW_TIMEOUT = Duration.ofSeconds(60);
    private final MyActivities myActivities = Workflow.newActivityStub(
        MyActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .setRetryOptions(RetryOptions.newBuilder()
                .setMaximumAttempts(10)
                .build())
            .build());

    // Uncomment me to see basic workflow working with all expected events
//    @Override
//    public void run() {
//      doRun();
//    }

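    @Override
    public void run() {
      // Scope that runs the actual Activity calls; cancelled by the internal timer below.
      CancellationScope mainScope = Workflow.newCancellationScope(this::doRun);

      // Separate scope for the internal timeout timer so it can be cancelled independently.
      CancellationScope timerScope =
          Workflow.newCancellationScope(
              () ->
                  Workflow.newTimer(INTERNAL_WORKFLOW_TIMEOUT)
                      .thenApply(
                          ignored -> {
                            mainScope.cancel("exceeded internal timeout");
                            return null;
                          }));
      // Only creates the timer (its Promise is not awaited), so this returns immediately.
      timerScope.run();

      try {
        // Blocks until doRun() completes or mainScope is cancelled by the timer.
        mainScope.run();
      } catch (Exception e) {
        // Stop the internal timer before propagating the failure.
        timerScope.cancel();
        throw e;
      }
    }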

    private void doRun() {
      myActivities.activityOne();
      myActivities.activityTwo();
    }
  }

  @ActivityInterface
  public interface MyActivities {
    @ActivityMethod void activityOne();
    @ActivityMethod void activityTwo();
  }

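  public static class MyActivitiesImpl implements MyActivities {
    @Override
    public void activityOne() {
      // No-op
    }

    @Override
    public void activityTwo() {
      // newFailure creates a retryable ApplicationFailure, so the Activity keeps
      // retrying until its scope is cancelled or it runs out of attempts.
      throw ApplicationFailure.newFailure("retriable failure", "type");
    }
  }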

  private final MyActivities myActivities = spy(MyActivitiesImpl.class);

  @RegisterExtension
  public static final TestWorkflowExtension TEST_WORKFLOW_EXTENSION =
      TestWorkflowExtension.newBuilder()
          .registerWorkflowImplementationTypes(MyWorkflowImpl.class)
          .setDoNotStart(true)
          .build();

  @Test
  void testBasic(
      TestWorkflowEnvironment testEnv, Worker worker, WorkflowOptions workflowOptions) {
    worker.registerActivitiesImplementations(myActivities);
    testEnv.start();

    MyWorkflow myWorkflow = testEnv.getWorkflowClient()
        .newWorkflowStub(MyWorkflow.class, workflowOptions);
    WorkflowExecution execution = WorkflowClient.start(myWorkflow::run);

    // Skip past the internal timeout so mainScope is cancelled while ActivityTwo is retrying.
    // The constant is private to MyWorkflowImpl, so it must be qualified here.
    testEnv.sleep(MyWorkflowImpl.INTERNAL_WORKFLOW_TIMEOUT.plus(Duration.ofSeconds(5)));

    assertThatThrownBy(() -> WorkflowStub.fromTyped(myWorkflow).getResult(Void.class))
        .isInstanceOf(WorkflowFailedException.class);

    WorkflowExecutionHistory history =
        testEnv.getWorkflowClient().fetchHistory(execution.getWorkflowId());

    // Reflects the current behavior: only ActivityOne has an Activity Task Started event
    assertThat(getActivityTypesThatWereScheduledAndStarted(history))
        .containsExactlyInAnyOrder("ActivityOne");
  }

  @Test
  void testCancellation(
      TestWorkflowEnvironment testEnv, Worker worker, WorkflowOptions workflowOptions) {
    worker.registerActivitiesImplementations(myActivities);
    testEnv.start();

    MyWorkflow myWorkflow = testEnv.getWorkflowClient()
        .newWorkflowStub(MyWorkflow.class, workflowOptions);
    WorkflowExecution execution = WorkflowClient.start(myWorkflow::run);

    // Skip past the internal timeout so mainScope is cancelled while ActivityTwo is retrying.
    testEnv.sleep(MyWorkflowImpl.INTERNAL_WORKFLOW_TIMEOUT.plus(Duration.ofSeconds(5)));

    assertThatThrownBy(() -> WorkflowStub.fromTyped(myWorkflow).getResult(Void.class))
        .isInstanceOf(WorkflowFailedException.class);

    WorkflowExecutionHistory history =
        testEnv.getWorkflowClient().fetchHistory(execution.getWorkflowId());

    // Test will fail here, not finding an Activity Task Started event for ActivityTwo
    // Having an Activity Task Started event for ActivityTwo with the latest failure would improve observability
    assertThat(getActivityTypesThatWereScheduledAndStarted(history))
        .containsExactlyInAnyOrder("ActivityOne", "ActivityTwo");
  }

  private List<String> getActivityTypesThatWereScheduledAndStarted(WorkflowExecutionHistory history) {
    // We're really just looking for Activity _started_ events here.
    // It looks more convoluted because a started event only carries the ID of its
    // _scheduled_ event, and the Activity type lives on that scheduled event.
    return history.getEvents().stream()
        .filter(HistoryEvent::hasActivityTaskStartedEventAttributes)
        .map(HistoryEvent::getActivityTaskStartedEventAttributes)
        .map(ActivityTaskStartedEventAttributes::getScheduledEventId)
        .map(scheduledEventId -> history.getEvents().stream()
            .filter(e -> scheduledEventId.equals(e.getEventId()))
            .findFirst()
            .orElseThrow())
        .map(HistoryEvent::getActivityTaskScheduledEventAttributes)
        .map(ActivityTaskScheduledEventAttributes::getActivityType)
        .map(ActivityType::getName)
        .toList();
  }
}
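The `getActivityTypesThatWereScheduledAndStarted` helper in the test rescans the full history once per started event. If that ever matters for long histories, the same join can be done with a one-pass index from scheduled event ID to Activity type. The sketch below is stdlib-only Java so it runs without the Temporal SDK; `StartedActivityTypes` and its `Event` record are hypothetical stand-ins for the protobuf `HistoryEvent`, and the event IDs in `main` are made up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StartedActivityTypes {

    /**
     * Hypothetical stand-in for HistoryEvent. activityType is only set on scheduled
     * events; scheduledEventId is only meaningful on started events.
     */
    record Event(long eventId, String kind, String activityType, long scheduledEventId) {}

    static List<String> startedActivityTypes(List<Event> history) {
        // Pass 1: index ActivityTaskScheduled events by their own event ID.
        Map<Long, String> typeByScheduledId = new HashMap<>();
        for (Event e : history) {
            if (e.kind().equals("ActivityTaskScheduled")) {
                typeByScheduledId.put(e.eventId(), e.activityType());
            }
        }
        // Pass 2: each ActivityTaskStarted event points back at its scheduled event.
        return history.stream()
                .filter(e -> e.kind().equals("ActivityTaskStarted"))
                .map(e -> {
                    String type = typeByScheduledId.get(e.scheduledEventId());
                    if (type == null) {
                        throw new IllegalStateException("No scheduled event " + e.scheduledEventId());
                    }
                    return type;
                })
                .toList();
    }

    public static void main(String[] args) {
        // Made-up history: ActivityTwo was scheduled but never started.
        List<Event> history = List.of(
                new Event(5, "ActivityTaskScheduled", "ActivityOne", 0),
                new Event(6, "ActivityTaskStarted", null, 5),
                new Event(7, "ActivityTaskCompleted", null, 5),
                new Event(11, "ActivityTaskScheduled", "ActivityTwo", 0));
        System.out.println(startedActivityTypes(history)); // prints [ActivityOne]
    }
}
```

With the real SDK types, the same idea would key the map on `HistoryEvent.getEventId()` for events where `hasActivityTaskScheduledEventAttributes()` is true.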

Metadata

Labels

enhancement (New feature or request)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
