Motivation
Async Execution
Flink Agents currently supports async execution for Python Actions. With execute_async, Actions can submit functions to a thread pool when handling high-latency I/O, allowing the operator to yield execution. This enables non-blocking progress within the same key sequence and parallel execution across multiple Actions. The current Python async execution API is as follows:
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Tuple


class RunnerContext(ABC):
    @abstractmethod
    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function.

        Access to memory is prohibited within the function.

        Parameters
        ----------
        func : Callable
            The function to execute asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """
Through this API, an Action is divided into multiple code blocks (i.e., a single function call wrapped by execute or execute_async); the code block becomes the minimum granularity for Action execution and scheduling.
Per-Action State Consistency
Per-Action State Consistency works by writing Memory changes after an Action completes to the ActionStateStore. When a job fails over and recovers from a checkpoint, it can replay data from the ActionStateStore to avoid re-executing Actions.
The Problem
The minimum granularity of these two mechanisms is inconsistent: recovery and deduplication operate at the "Action" level, while execution and scheduling operate at the "code block" level. When an async code block has completed but the Action hasn't finished and a failover occurs, that code block will be re-executed upon recovery, causing:
Duplicate external calls (potential side effects)
Unnecessary waiting and costs
To address this, we want to reduce the granularity of consistency and recovery to the "code block" level. This introduces fine-grained durable execution: record and persist the return result of each call to execute_async (and synchronous execute); during job recovery, if the same call is encountered (same function and arguments, same call order), directly return the stored result to avoid re-execution. This mechanism requires calls to be deterministic (same inputs and order) and that the function body doesn't access Memory, ensuring results can be cached and replayed.
Public Interface
Update RunnerContext Interface
This section enhances the existing execute_async and introduces the execute interface for RunnerContext: when a job recovers from a checkpoint and encounters the "same call" as before the failure (same function and arguments, same call order, same key and sequence number), it directly returns the historical result to avoid re-execution.
The core difference between the two interfaces is the execution mode:
execute_async is an asynchronous method that submits the function to a thread pool and yields execution, allowing the operator to process other events while waiting.
execute is a synchronous method that runs the function directly in the current thread, blocking the operator until it finishes. The call returns the function's result directly.
class FlinkRunnerContext(RunnerContext):
    @override
    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function.

        Access to memory is prohibited within the function.

        The result of the function will be stored and returned when the same
        execute_async call is made again during job recovery. The arguments
        and the result must be serializable.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute_async call with the same arguments and
        in the same order during job recovery. Otherwise, the behavior is
        undefined.

        Parameters
        ----------
        func : Callable
            The function to execute asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """

    @override
    def execute(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Synchronously execute the provided function.

        Access to memory is prohibited within the function.

        The result of the function will be stored and returned when the same
        execute call is made again during job recovery. The arguments and the
        result must be serializable.

        The function is executed synchronously in the current thread, blocking
        the operator until completion.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute call with the same arguments and in
        the same order during job recovery. Otherwise, the behavior is
        undefined.

        Parameters
        ----------
        func : Callable
            The function to execute.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """
Key Points
execute and execute_async have different execution modes, but identical persistence semantics: results from both calls are cached, and during recovery, they are matched by call order and the cached result is returned directly.
Calls must be deterministic: given the same input, the call order and arguments of execute / execute_async must be consistent. If inconsistency is detected during recovery, the framework clears the cache and re-executes.
Upon first completion, the return value (or exception) is persisted; when hit during recovery, it's returned directly (or the same exception is re-raised), without re-executing the function body.
Accessing Memory is prohibited within the function body; Memory changes are persisted by the framework after the Action completes.
Currently, code block level automatic retry is not provided; users need to handle it in their defined function body. Future plans include providing framework-level retry mechanisms (see Future Work).
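Since the framework does not retry code blocks, any retry has to live inside the function handed to execute / execute_async. The following is a minimal sketch of one way to do that. The helper with_retries and its parameters (max_attempts, base_delay, sleep) are hypothetical names introduced for illustration and are not part of Flink Agents. functools.wraps is used so that the wrapper keeps the wrapped function's module and qualified name, which matters if functionId is derived from them.

```python
import functools
import time
from typing import Any, Callable


def with_retries(
    func: Callable[..., Any],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[..., Any]:
    """Hypothetical helper: retry func with exponential backoff."""

    @functools.wraps(func)  # keep __module__ / __qualname__ of the wrapped function
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_attempts:
                    # Out of attempts: let the last exception propagate to the caller.
                    raise
                sleep(base_delay * 2 ** (attempt - 1))

    return wrapper
```

Under these assumptions, an Action could call ctx.execute_async(with_retries(fetch_user_profile), user_id). Only the final outcome (the successful result or the last exception) leaves the wrapper, so that is what would be persisted for the code block.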
Example Explanation
The following example demonstrates code block partitioning and recovery behavior:
@action(InputEvent)
@staticmethod
async def process(event: Event, ctx: RunnerContext):
    user_id = event.input

    # ════════════════════════════════════════════════════════════════════
    # Code Block 1: functionId="fetch_user_profile", argsDigest=hash(user_id)
    # - First execution: call fetch_user_profile → persist result to callRecords[0]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution, return cached result
    # ════════════════════════════════════════════════════════════════════
    def fetch_user_profile(uid: str) -> dict:
        print(f"Fetching profile for {uid}")  # Will not execute if recovery hits cache
        return requests.get(f"https://api.example.com/users/{uid}").json()

    profile = await ctx.execute_async(fetch_user_profile, user_id)

    # The following code is outside the code block, will be re-executed on recovery
    print(f"Got profile: {profile['name']}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Code Block 2: functionId="compute_score", argsDigest=hash(profile)
    # - First execution: call compute_score → persist result to callRecords[1]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution, return cached result
    # ════════════════════════════════════════════════════════════════════
    def compute_score(p: dict) -> int:
        print(f"Computing score for {p['name']}")  # Will not execute if recovery hits cache
        return p['activity_count'] * 10 + p['follower_count']

    score = ctx.execute(compute_score, profile)

    # The following code is outside the code block, will be re-executed on recovery
    print(f"Computed score: {score}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Memory changes: not part of code block, will be re-executed on recovery
    # - First execution: write to Memory → persisted by framework after Action completes
    # - On recovery: framework first replays Memory to the state at Action start, then re-executes the following code
    # ════════════════════════════════════════════════════════════════════
    ctx.short_term_memory.set("last_user", user_id)  # Will be re-executed on recovery
    ctx.short_term_memory.set("score", score)  # Will be re-executed on recovery
    ctx.send_event(OutputEvent(output=score))
Summary:
Function calls wrapped by execute / execute_async are code blocks; results are persisted, and on recovery, if hit, execution is skipped
Code outside code blocks (including Memory changes) will be re-executed on recovery
Implementation Details
State Model
This section defines the storage structure for function calls in ActionState.
Introduce CallRecord to record the execution result of function calls.
// NEW: embedded record for fine-grained calls
public static final class CallRecord {
    private final String functionId;   // module+qualname or Java signature
    private final String argsDigest;   // stable digest of serialized args
    private byte[] resultPayload;      // serialized return value (nullable)
    private byte[] exceptionPayload;   // serialized exception info (nullable)
}
During recovery, success/failure is determined by whether exceptionPayload is null: if exceptionPayload != null, throw the exception; otherwise, return resultPayload.
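The proposal does not specify how functionId and argsDigest are computed on the Python side. As a rough sketch, assuming the arguments are JSON-serializable and that SHA-256 over a canonical JSON encoding is an acceptable "stable digest" (both are assumptions, and the names function_id and args_digest are hypothetical), the two fields could be derived like this:

```python
import hashlib
import json
from typing import Any, Callable, Dict, Tuple


def function_id(func: Callable[..., Any]) -> str:
    """Identify a function by module + qualified name."""
    return f"{func.__module__}.{func.__qualname__}"


def args_digest(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """Digest the call arguments (assumed JSON-serializable)."""
    # sort_keys makes the encoding independent of dict insertion order,
    # so equal arguments always produce the same digest.
    canonical = json.dumps(
        {"args": list(args), "kwargs": kwargs},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

A digest keeps CallRecord small and lets validation compare calls without deserializing the original arguments. The trade-off is that the digest must be stable across process restarts, which is why Python's built-in hash() (randomized per process for strings) would not be a suitable choice here.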
Add callRecords and completed fields to ActionState.
Persistence Timing
To persist results after each code block completes, the timing of ActionState writes needs to be adjusted:

| Timing | Content Written | completed | callRecords |
| --- | --- | --- | --- |
| After each code block completes | callRecords update | false | Retained |
| After Action completes | memoryUpdates + outputEvents | true | Cleared |
When code block completes: Write ActionState containing the new CallRecord to ActionStateStore (e.g., Kafka).
When Action completes: Set completed = true and write the final Memory changes and Output Events together. Also clear callRecords, since the Action is complete and recovery will skip the entire Action, so code block level recovery information is no longer needed. This reduces storage space and serialization overhead.
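The two write points can be sketched as follows. This is an illustration under assumptions: InMemoryActionStateStore is a hypothetical stand-in for the real ActionStateStore, the field names are Python-style renderings of those in the proposal, and the helper functions on_code_block_complete and on_action_complete are invented for this example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

StateKey = Tuple[str, int, str]  # (key, sequenceNumber, actionName)


@dataclass
class ActionState:
    call_records: List[Any] = field(default_factory=list)
    memory_updates: List[Any] = field(default_factory=list)
    output_events: List[Any] = field(default_factory=list)
    completed: bool = False


class InMemoryActionStateStore:
    """Hypothetical stand-in for the real ActionStateStore (e.g., Kafka)."""

    def __init__(self) -> None:
        self._states: Dict[StateKey, ActionState] = {}

    def get(self, state_key: StateKey) -> Optional[ActionState]:
        return self._states.get(state_key)

    def put(self, state_key: StateKey, state: ActionState) -> None:
        self._states[state_key] = state


def on_code_block_complete(store: InMemoryActionStateStore, state_key: StateKey, record: Any) -> None:
    """After each code block: append the new record, keep completed = False."""
    state = store.get(state_key) or ActionState()
    state.call_records.append(record)
    store.put(state_key, state)


def on_action_complete(
    store: InMemoryActionStateStore,
    state_key: StateKey,
    memory_updates: List[Any],
    output_events: List[Any],
) -> None:
    """After the Action: write final results, mark completed, drop call records."""
    state = store.get(state_key) or ActionState()
    state.memory_updates = list(memory_updates)
    state.output_events = list(output_events)
    state.completed = True
    state.call_records.clear()  # no longer needed once the whole Action is skipped
    store.put(state_key, state)
```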
Recovery Logic
The current implementation decides whether an Action needs execution by checking whether its ActionState exists. Because the new design also writes ActionState after each code block completes, existence alone no longer implies completion, so the check must be changed:
ActionState actionState = actionStateStore.get(key, seqNum, action, event);
if (actionState != null && actionState.isCompleted()) {
    // Action completed, skip execution, directly replay Memory and OutputEvents
} else {
    // Action not completed or ActionState doesn't exist
    // Re-execute Action, use callRecords to skip completed code blocks
}
ActionState exists and completed = true: Action completed, skip execution, directly replay Memory and OutputEvents.
ActionState exists but completed = false: Action execution was interrupted (failover), re-execute Action. During execution, if an existing CallRecord is encountered, validate and return the cached result, skipping actual execution.
ActionState doesn't exist: First execution, normal flow.
Write and Recovery Matching
Write: When a call completes (success or failure), record the corresponding CallRecord and persist resultPayload or exceptionPayload.
Hit: During recovery, query the corresponding ActionState by (key, sequenceNumber, actionName), and get the CallRecord by the current call's order position (the Nth submission corresponds to the Nth record); after a hit, first validate that functionId and argsDigest match:
  Validation fails: Determined as non-deterministic call order, clear this CallRecord and subsequent records, output WARN log, then re-execute
  Validation passes: Check if exceptionPayload is null
    exceptionPayload == null: Directly return the decoded resultPayload
    exceptionPayload != null: Decode and throw the equivalent exception
Miss: Execute normally and add a new record upon completion.
Error handling: Deserialization failures or content inconsistencies should fail fast, outputting diagnostic information including the call sequence number and validation digest.
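The write, hit, and miss rules above can be sketched as a small in-process executor. This is a simplified illustration, not the proposed implementation: ReplayingExecutor is a hypothetical class, records hold live Python objects instead of serialized payloads, persistence to the ActionStateStore is omitted, and the digest is a placeholder based on repr.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class CallRecord:
    function_id: str
    args_digest: str
    result: Any = None
    exception: Optional[BaseException] = None


class ReplayingExecutor:
    """Hypothetical sketch of order-based matching for a single Action run."""

    def __init__(self, records: List[CallRecord]) -> None:
        self.records = records  # recovered from ActionState; empty on first run
        self.call_index = 0

    def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        fid = f"{func.__module__}.{func.__qualname__}"
        digest = repr((args, sorted(kwargs.items())))  # placeholder digest
        index = self.call_index
        self.call_index += 1

        if index < len(self.records):  # the Nth call maps to the Nth record
            record = self.records[index]
            if record.function_id == fid and record.args_digest == digest:
                if record.exception is not None:
                    raise record.exception  # replay the stored failure
                return record.result  # replay the stored result
            logger.warning("Non-deterministic call at index %d; discarding records", index)
            del self.records[index:]  # clear this record and all later ones

        # Miss (or failed validation): run the function and record the outcome.
        try:
            record = CallRecord(fid, digest, result=func(*args, **kwargs))
        except Exception as exc:
            record = CallRecord(fid, digest, exception=exc)
        self.records.append(record)
        if record.exception is not None:
            raise record.exception
        return record.result
```

Constructing a second ReplayingExecutor from the records of a first run simulates recovery: calls that match by position, function, and arguments return without invoking the function again.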
Failure Semantics
Decoding failures
When decoding resultPayload/exceptionPayload fails, fail fast immediately, outputting information including (key, sequenceNumber, actionName) and the call sequence number for diagnosis.
At-least-once semantics
The current design tries to avoid duplicate side effects but cannot guarantee it. For example, if a code block completes execution but the CallRecord hasn't been persisted to ActionStateStore when a failover occurs, that code block will be re-executed, potentially causing duplicate side effects (such as duplicate LLM calls, additional costs, etc.).
For operations with side effects, users should ensure operations are idempotent or can tolerate occasional duplicate execution.
The framework provides a unique Call ID: it automatically generates a unique identifier for each execute / execute_async call (based on key + sequenceNumber + actionName + callIndex), which users can pass to external systems as an idempotency key. Example:
async def make_payment(event: Event, ctx: RunnerContext):
    def charge(call_id: str, amount: int) -> dict:
        # External system implements idempotency via call_id
        return payment_api.charge(idempotency_key=call_id, amount=amount)

    # Framework automatically injects call_id as the first argument
    result = await ctx.execute_async(charge, amount=100)
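How the Call ID is derived from its four components is not specified. One possible sketch, assuming a SHA-256 hash over the joined components (the function make_call_id, the separator, and the FakePaymentApi class are all hypothetical and exist only to show the idempotency effect):

```python
import hashlib
from typing import Dict


def make_call_id(key: str, sequence_number: int, action_name: str, call_index: int) -> str:
    """Derive a deterministic ID; the same call always maps to the same ID."""
    # The unit-separator character keeps ("a", 12) and ("a1", 2) from colliding.
    raw = "\x1f".join([key, str(sequence_number), action_name, str(call_index)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class FakePaymentApi:
    """Hypothetical external system that deduplicates on the idempotency key."""

    def __init__(self) -> None:
        self._charges: Dict[str, dict] = {}

    def charge(self, idempotency_key: str, amount: int) -> dict:
        # A repeated key returns the original charge instead of charging again.
        return self._charges.setdefault(
            idempotency_key, {"charge_no": len(self._charges) + 1, "amount": amount}
        )
```

Because the ID depends only on values that are identical before and after a failover, a code block that is re-executed after recovery presents the same key to the external system, which can then recognize the duplicate.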
Future Work
Framework-Level Retry Support
The current design doesn't provide code block level automatic retry; users need to implement retry logic in the function body themselves. Future considerations include supporting user-specified retry policies at the framework level:
# Possible API form
result = await ctx.execute_async(
    func,
    arg,
    retry=RetryPolicy(max_retries=3, backoff="exponential"),
)
Implementation needs to consider the following issues:
Diversity of retry policies: Different scenarios require different retry policies (retry count, backoff strategy, retry conditions, etc.), requiring a flexible RetryPolicy interface design.
Compatibility with existing semantics: After a successful retry, only the final successful result is stored; after all retries fail, the last exception is stored. During recovery, the final result is returned directly or the exception is thrown, maintaining consistency with current behavior.
Exactly-once Semantics Support
The current design is based on at-least-once semantics, relying on users to implement idempotency to avoid side effects from duplicate execution. Future work may consider extending the capabilities of execute_async to support true exactly-once semantics.
The core idea is: execute_async would no longer be limited to receiving a simple Callable, but could accept objects implementing specific protocols, choosing the appropriate commit strategy based on the external system's transactional capabilities:
# Possible API form
class TwoPhaseCommitCallable(Protocol):
    def prepare(self) -> PrepareResult:
        """Prepare phase: execute the operation but don't commit yet"""
        ...

    def commit(self, prepare_result: PrepareResult) -> None:
        """Commit phase: confirm the prepared operation"""
        ...

    def abort(self, prepare_result: PrepareResult) -> None:
        """Abort phase: cancel the prepared operation"""
        ...


# Usage example
result = await ctx.execute_async(MyTwoPhaseCommitOperation(...))