Skip to content

[FLINK-31942][Connectors/DynamoDB] Added support for conditional writ…#240

Open
losmotylos wants to merge 2 commits intoapache:mainfrom
losmotylos:FLINK-31942
Open

[FLINK-31942][Connectors/DynamoDB] Added support for conditional writ…#240
losmotylos wants to merge 2 commits intoapache:mainfrom
losmotylos:FLINK-31942

Conversation

@losmotylos
Copy link
Copy Markdown

Purpose of the change

Implements support for conditional writes in the DynamoDB connector: FLINK-31942

The DynamoDB sink currently only uses the BatchWriteItem API, which does not support conditional writes or update expressions. This change adds support for individual PutItem, UpdateItem, and DeleteItem API calls alongside the existing batch path, enabling:

  • Conditional writes via conditionExpression on PUT, DELETE, and UPDATE requests
  • Update expressions via updateExpression for partial item updates
  • Concurrent execution of batch and single-item paths for mixed workloads

Requests without conditions (PUT/DELETE) continue to use BatchWriteItem for efficiency. Requests with conditions and all UPDATE requests are sent as individual API calls. Both paths execute concurrently with results collected into shared thread-safe state and ResultHandler called exactly once after both paths complete.

Verifying this change

This change added tests and can be verified as follows:

Unit tests (DynamoDbSinkWriterTest):

  • Batch-only, single-only, and mixed batch+single request paths
  • Retryable and non-retryable error scenarios for both paths
  • ConditionalCheckFailedException on the single-request path verified as non-retryable
  • Plain PUT and DELETE without conditions verified to route through BatchWriteItem
  • Mixed batch+single with concurrent fatal/retryable errors

Unit tests (DynamoDbBeanElementConverterTest):

  • Conditional PUT with condition expression and expression attribute names
  • DELETE extracts only primary key attributes from POJO
  • UPDATE extracts primary key and sets update expression
  • Builder validation: updateExpression required for UPDATE, rejected for PUT/DELETE

Unit tests (DynamoDbWriteRequestTest):

  • Builder validation: UPDATE requires updateExpression
  • Builder constructs requests with all expression fields
  • PUT with conditionExpression and null updateExpression

Serialization tests (DynamoDbSerializationUtilTest, DynamoDbWriterStateSerializerTest):

  • Roundtrip serialization of UPDATE requests with all fields populated
  • Roundtrip serialization of conditional PUT requests
  • All-fields-populated roundtrip (updateExpression + conditionExpression + expressionAttributeNames + expressionAttributeValues)
  • Backward compatibility: version 1 checkpoint bytes deserialized correctly by version 2 serializer

Integration tests (DynamoDbSinkConditionalWriteITCase):

  • Batch PUT via DynamoDbBeanElementConverter against DynamoDB Local
  • UPDATE via builder with DynamoDbWriteRequestType.UPDATE
  • Conditional PUT preventing overwrite (attribute_not_exists) — verifies ConditionalCheckFailedException fails the job and original data is unchanged
  • Conditional DELETE only deleting items matching condition

Significant changes

Public API changes

  • DynamoDbWriteRequestType: added UPDATE enum value ((byte) 2)
  • DynamoDbWriteRequest: added optional fields updateExpression, conditionExpression, expressionAttributeNames, expressionAttributeValues with corresponding builder methods. Existing constructors and builder usage unchanged.
  • DynamoDbBeanElementConverter: added builder(Class) static method and ElementConverterBuilder for configuring type, update/condition expressions. Existing public constructors (Class) and (Class, boolean) unchanged. Multi-arg constructors made private — new features available only via builder.
  • DynamoDbSink Javadoc updated to document the three write request types and batch vs single execution model.

Serializer changes

  • DynamoDbWriterStateSerializer version bumped from 1 to 2
  • DynamoDbSerializationUtil serializes/deserializes the four new expression fields using boolean presence flags for nullable values
  • Version 1 checkpoints (without expression fields) deserialize correctly with the version 2 serializer — full backward compatibility verified by test
  • Version is passed from DynamoDbWriterStateSerializer.deserialize(int version, byte[]) to DynamoDbSerializationUtil.deserializeWriteRequest(in, version) via a ThreadLocal field, since the parent class AsyncSinkWriterStateSerializer does not forward the version to deserializeRequestFromStream(). This is a workaround — see follow-up discussion about adding version support to the base class.

Checklist

  • Dependencies have been added or upgraded
  • Public API has been changed (any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • Documented via Javadoc: DynamoDbSink class-level Javadoc describes the three write request types (PUT/DELETE/UPDATE) and batch vs single execution model. DynamoDbBeanElementConverter Javadoc documents the supported types and builder usage. DynamoDbWriteRequest Javadoc explains the semantics of item field per request type.

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Mar 28, 2026

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants