[FLINK-31942][Connectors/DynamoDB] Added support for conditional writ…#240
Open
losmotylos wants to merge 2 commits intoapache:mainfrom
Open
[FLINK-31942][Connectors/DynamoDB] Added support for conditional writ…#240losmotylos wants to merge 2 commits intoapache:mainfrom
losmotylos wants to merge 2 commits intoapache:mainfrom
Conversation
…es in DynamoDB connector
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
…es in DynamoDB connector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose of the change
Implements support for conditional writes in the DynamoDB connector: FLINK-31942
The DynamoDB sink currently only uses the BatchWriteItem API, which does not support conditional writes or update expressions. This change adds support for individual PutItem, UpdateItem, and DeleteItem API calls alongside the existing batch path, enabling:
conditionExpressionon PUT, DELETE, and UPDATE requestsupdateExpressionfor partial item updatesRequests without conditions (PUT/DELETE) continue to use
BatchWriteItemfor efficiency. Requests with conditions and all UPDATE requests are sent as individual API calls. Both paths execute concurrently with results collected into shared thread-safe state andResultHandlercalled exactly once after both paths complete.Verifying this change
This change added tests and can be verified as follows:
Unit tests (
DynamoDbSinkWriterTest):ConditionalCheckFailedExceptionon the single-request path verified as non-retryableBatchWriteItemUnit tests (
DynamoDbBeanElementConverterTest):updateExpressionrequired for UPDATE, rejected for PUT/DELETEUnit tests (
DynamoDbWriteRequestTest):updateExpressionconditionExpressionand nullupdateExpressionSerialization tests (
DynamoDbSerializationUtilTest,DynamoDbWriterStateSerializerTest):updateExpression+conditionExpression+expressionAttributeNames+expressionAttributeValues)Integration tests (
DynamoDbSinkConditionalWriteITCase):DynamoDbBeanElementConverteragainst DynamoDB LocalDynamoDbWriteRequestType.UPDATEattribute_not_exists) — verifiesConditionalCheckFailedExceptionfails the job and original data is unchangedSignificant changes
Public API changes
DynamoDbWriteRequestType: addedUPDATEenum value ((byte) 2)DynamoDbWriteRequest: added optional fieldsupdateExpression,conditionExpression,expressionAttributeNames,expressionAttributeValueswith corresponding builder methods. Existing constructors and builder usage unchanged.DynamoDbBeanElementConverter: addedbuilder(Class)static method andElementConverterBuilderfor configuring type, update/condition expressions. Existing public constructors(Class)and(Class, boolean)unchanged. Multi-arg constructors made private — new features available only via builder.DynamoDbSinkJavadoc updated to document the three write request types and batch vs single execution model.Serializer changes
DynamoDbWriterStateSerializerversion bumped from 1 to 2DynamoDbSerializationUtilserializes/deserializes the four new expression fields using boolean presence flags for nullable valuesDynamoDbWriterStateSerializer.deserialize(int version, byte[])toDynamoDbSerializationUtil.deserializeWriteRequest(in, version)via aThreadLocalfield, since the parent classAsyncSinkWriterStateSerializerdoes not forward the version todeserializeRequestFromStream(). This is a workaround — see follow-up discussion about adding version support to the base class.Checklist
@Public(Evolving))DynamoDbSinkclass-level Javadoc describes the three write request types (PUT/DELETE/UPDATE) and batch vs single execution model.DynamoDbBeanElementConverterJavadoc documents the supported types and builder usage.DynamoDbWriteRequestJavadoc explains the semantics ofitemfield per request type.