diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java index bbe63644..4f5d59c1 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java @@ -25,11 +25,27 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.BeanTableSchema; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.util.Collection; +import java.util.Map; /** * A generic {@link ElementConverter} that uses the dynamodb-enhanced client to build a {@link * DynamoDbWriteRequest} from a POJO annotated with {@link DynamoDbBean}. * + *

Supports all three write request types: + * + *

+ * + *

Condition expressions can be set for any type to enable conditional writes via individual API + * calls instead of {@code BatchWriteItem}. + * * @param The type of the {@link DynamoDbBean} to convert into {@link DynamoDbWriteRequest} */ @PublicEvolving @@ -40,15 +56,43 @@ public class DynamoDbBeanElementConverter private final Class recordType; private final boolean ignoreNulls; + private final DynamoDbWriteRequestType type; + private final String updateExpression; + private final String conditionExpression; + private final Map expressionAttributeNames; + private final Map expressionAttributeValues; private transient BeanTableSchema tableSchema; + private transient Collection keyAttributeNames; public DynamoDbBeanElementConverter(final Class recordType) { this(recordType, false); } public DynamoDbBeanElementConverter(final Class recordType, final boolean ignoreNulls) { + this(recordType, ignoreNulls, DynamoDbWriteRequestType.PUT, null, null, null, null); + } + + private DynamoDbBeanElementConverter( + final Class recordType, + final boolean ignoreNulls, + final DynamoDbWriteRequestType type, + final String updateExpression, + final String conditionExpression, + final Map expressionAttributeNames, + final Map expressionAttributeValues) { this.recordType = recordType; this.ignoreNulls = ignoreNulls; + this.type = Preconditions.checkNotNull(type, "Type must not be null"); + this.updateExpression = updateExpression; + this.conditionExpression = conditionExpression; + this.expressionAttributeNames = expressionAttributeNames; + this.expressionAttributeValues = expressionAttributeValues; + + if (type == DynamoDbWriteRequestType.UPDATE) { + Preconditions.checkNotNull( + updateExpression, + "Update expression must not be null for UPDATE requests."); + } // Attempt to create a table schema now to bubble up errors before starting job createTableSchema(recordType); @@ -57,18 +101,106 @@ public DynamoDbBeanElementConverter(final Class recordType, final boolea @Override public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) { Preconditions.checkNotNull(tableSchema, "Table schema has not been initialized"); - return new DynamoDbWriteRequest.Builder() - .setType(DynamoDbWriteRequestType.PUT) - .setItem(tableSchema.itemToMap(element, ignoreNulls)) - .build(); + Map item = + type == DynamoDbWriteRequestType.PUT + ? tableSchema.itemToMap(element, ignoreNulls) + : tableSchema.itemToMap(element, keyAttributeNames); + DynamoDbWriteRequest.Builder builder = + new DynamoDbWriteRequest.Builder().setType(type).setItem(item); + if (updateExpression != null) { + builder.setUpdateExpression(updateExpression); + } + if (conditionExpression != null) { + builder.setConditionExpression(conditionExpression); + } + if (expressionAttributeNames != null) { + builder.setExpressionAttributeNames(expressionAttributeNames); + } + if (expressionAttributeValues != null) { + builder.setExpressionAttributeValues(expressionAttributeValues); + } + return builder.build(); } @Override public void open(WriterInitContext context) { tableSchema = createTableSchema(recordType); + // DELETE and UPDATE only need the primary key attributes from the POJO, not the full item. + // PUT uses the complete item map. + if (type == DynamoDbWriteRequestType.DELETE || type == DynamoDbWriteRequestType.UPDATE) { + keyAttributeNames = tableSchema.tableMetadata().primaryKeys(); + } } private BeanTableSchema createTableSchema(final Class recordType) { return BeanTableSchema.create(recordType); } + + /** Builder for {@link DynamoDbBeanElementConverter}. */ + public static class ElementConverterBuilder { + private final Class recordType; + private boolean ignoreNulls = false; + private DynamoDbWriteRequestType type = DynamoDbWriteRequestType.PUT; + private String updateExpression; + private String conditionExpression; + private Map expressionAttributeNames; + private Map expressionAttributeValues; + + private ElementConverterBuilder(Class recordType) { + this.recordType = recordType; + } + + public ElementConverterBuilder setIgnoreNulls(boolean ignoreNulls) { + this.ignoreNulls = ignoreNulls; + return this; + } + + public ElementConverterBuilder setType(DynamoDbWriteRequestType type) { + this.type = type; + return this; + } + + public ElementConverterBuilder setUpdateExpression(String updateExpression) { + this.updateExpression = updateExpression; + return this; + } + + public ElementConverterBuilder setConditionExpression(String conditionExpression) { + this.conditionExpression = conditionExpression; + return this; + } + + public ElementConverterBuilder setExpressionAttributeNames( + Map expressionAttributeNames) { + this.expressionAttributeNames = expressionAttributeNames; + return this; + } + + public ElementConverterBuilder setExpressionAttributeValues( + Map expressionAttributeValues) { + this.expressionAttributeValues = expressionAttributeValues; + return this; + } + + public DynamoDbBeanElementConverter build() { + Preconditions.checkArgument( + type != DynamoDbWriteRequestType.UPDATE || updateExpression != null, + "updateExpression is required for UPDATE type."); + Preconditions.checkArgument( + type == DynamoDbWriteRequestType.UPDATE || updateExpression == null, + "updateExpression is only allowed for UPDATE type."); + return new DynamoDbBeanElementConverter<>( + recordType, + ignoreNulls, + type, + updateExpression, + conditionExpression, + expressionAttributeNames, + expressionAttributeValues); + } + } + + public static ElementConverterBuilder builder(Class recordType) { + return new ElementConverterBuilder<>(recordType); + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java index 2d3e6f03..9044c9a6 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java @@ -75,6 +75,22 @@ * single batch takes precedence. * * + *

The sink supports three write request types via {@link DynamoDbWriteRequestType}: + * + *

+ * + *

Requests without condition expressions (PUT/DELETE) are sent via {@code BatchWriteItem} for + * efficiency. Requests with condition expressions and all UPDATE requests are sent as individual + * {@code PutItem}, {@code DeleteItem}, or {@code UpdateItem} calls. Both paths execute + * concurrently. + * *

Please see the writer implementation in {@link DynamoDbSinkWriter} * * @param Type of the elements handled by this sink diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java index 66e269e0..97008bda 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java @@ -32,6 +32,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +40,13 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest; import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import java.util.ArrayList; @@ -51,6 +55,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; @@ -164,6 +170,59 @@ protected void submitRequestEntries( List requestEntries, ResultHandler resultHandler) { + List batchRequests = new ArrayList<>(); + List singleRequests = new ArrayList<>(); + + for (DynamoDbWriteRequest request : requestEntries) { + if (isSingleRequest(request)) { + singleRequests.add(request); + } else { + batchRequests.add(request); + } + } + + // Shared state to collect results from concurrent batch and single paths. + // ResultHandler is called exactly once in whenComplete after both paths finish. + ConcurrentLinkedQueue retryableFailures = + new ConcurrentLinkedQueue<>(); + AtomicReference fatalException = new AtomicReference<>(); + + CompletableFuture batchFuture = + submitBatchRequests( + batchRequests, retryableFailures, fatalException); + + CompletableFuture singleFuture = + submitSingleRequests( + singleRequests, retryableFailures, fatalException); + + CompletableFuture.allOf(batchFuture, singleFuture) + .whenComplete( + (ignored, err) -> { + if (fatalException.get() != null) { + resultHandler.completeExceptionally(fatalException.get()); + } else if (!retryableFailures.isEmpty()) { + resultHandler.retryForEntries( + new ArrayList<>(retryableFailures)); + } else { + resultHandler.complete(); + } + }); + } + + private boolean isSingleRequest(DynamoDbWriteRequest request) { + return request.getType() == DynamoDbWriteRequestType.UPDATE + || request.getConditionExpression() != null; + } + + private CompletableFuture submitBatchRequests( + List requestEntries, + ConcurrentLinkedQueue retryableFailures, + AtomicReference fatalException) { + + if (requestEntries.isEmpty()) { + return FutureUtils.completedVoidFuture(); + } + List items = new ArrayList<>(); if (CollectionUtil.isNullOrEmpty(overwriteByPartitionKeys)) { @@ -181,28 +240,32 @@ protected void submitRequestEntries( items.addAll(container.values()); } - CompletableFuture future = - clientProvider - .getClient() - .batchWriteItem( - BatchWriteItemRequest.builder() - .requestItems(singletonMap(tableName, items)) - .build()); - - future.whenComplete( - (response, err) -> { - if (err != null) { - handleFullyFailedRequest(err, requestEntries, resultHandler); - } else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) { - handlePartiallyUnprocessedRequest(response, resultHandler); - } else { - resultHandler.complete(); - } - }); + return clientProvider + .getClient() + .batchWriteItem( + BatchWriteItemRequest.builder() + .requestItems(singletonMap(tableName, items)) + .build()) + .handle( + (response, err) -> { + if (err != null) { + handleFullyFailedRequest( + err, + requestEntries, + retryableFailures, + fatalException); + } else if (!CollectionUtil.isNullOrEmpty( + response.unprocessedItems())) { + handlePartiallyUnprocessedRequest( + response, retryableFailures); + } + return null; + }); } private void handlePartiallyUnprocessedRequest( - BatchWriteItemResponse response, ResultHandler resultHandler) { + BatchWriteItemResponse response, + ConcurrentLinkedQueue retryableFailures) { List unprocessed = new ArrayList<>(); for (WriteRequest writeRequest : response.unprocessedItems().get(tableName)) { @@ -213,39 +276,97 @@ private void handlePartiallyUnprocessedRequest( numRecordsSendErrorsCounter.inc(unprocessed.size()); numRecordsSendPartialFailure.inc(unprocessed.size()); - resultHandler.retryForEntries(unprocessed); + retryableFailures.addAll(unprocessed); } private void handleFullyFailedRequest( Throwable err, List requestEntries, - ResultHandler resultHandler) { + ConcurrentLinkedQueue retryableFailures, + AtomicReference fatalException) { LOG.warn( "DynamoDB Sink failed to persist and will retry {} entries.", requestEntries.size(), err); numRecordsSendErrorsCounter.inc(requestEntries.size()); - if (isRetryable(err.getCause(), resultHandler)) { - resultHandler.retryForEntries(requestEntries); + if (isRetryable(err.getCause(), fatalException)) { + retryableFailures.addAll(requestEntries); } } - private boolean isRetryable(Throwable err, ResultHandler resultHandler) { + private boolean isRetryable(Throwable err, AtomicReference fatalException) { // isFatal() is really isNotFatal() if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal( - err, resultHandler::completeExceptionally)) { + err, ex -> fatalException.compareAndSet(null, ex))) { return false; } if (failOnError) { - resultHandler.completeExceptionally( - new DynamoDbSinkException.DynamoDbSinkFailFastException(err)); + fatalException.compareAndSet( + null, new DynamoDbSinkException.DynamoDbSinkFailFastException(err)); return false; } return true; } + private CompletableFuture submitSingleRequests( + List singleRequests, + ConcurrentLinkedQueue retryableFailures, + AtomicReference fatalException) { + + if (singleRequests.isEmpty()) { + return FutureUtils.completedVoidFuture(); + } + + CompletableFuture[] futures = + singleRequests.stream() + .map( + request -> + submitSingleRequest(request) + .handle( + (response, err) -> { + if (err != null) { + handleSingleRequestError( + err, + request, + retryableFailures, + fatalException); + } + return null; + })) + .toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(futures); + } + + private CompletableFuture submitSingleRequest(DynamoDbWriteRequest request) { + switch (request.getType()) { + case PUT: + return clientProvider.getClient().putItem(convertToPutItemRequest(request)); + case DELETE: + return clientProvider.getClient().deleteItem(convertToDeleteItemRequest(request)); + case UPDATE: + return clientProvider.getClient().updateItem(convertToUpdateItemRequest(request)); + default: + throw new IllegalArgumentException( + "Unsupported DynamoDb Write Request Type: " + request.getType()); + } + } + + private void handleSingleRequestError( + Throwable err, + DynamoDbWriteRequest request, + ConcurrentLinkedQueue retryableFailures, + AtomicReference fatalException) { + LOG.warn("DynamoDB Sink single write failed for {} request.", request.getType(), err); + numRecordsSendErrorsCounter.inc(); + + if (isRetryable(err.getCause(), fatalException)) { + retryableFailures.add(request); + } + } + @Override protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) { // dynamodb calculates item size as a sum of all attributes and all values, to calculate it @@ -291,4 +412,52 @@ private DynamoDbWriteRequest convertToDynamoDbWriteRequest(WriteRequest writeReq "Unsupported Write Request, consider updating the convertToDynamoDbWriteRequest method"); } } + + private PutItemRequest convertToPutItemRequest(DynamoDbWriteRequest request) { + PutItemRequest.Builder builder = + PutItemRequest.builder().tableName(tableName).item(request.getItem()); + if (request.getConditionExpression() != null) { + builder.conditionExpression(request.getConditionExpression()); + } + if (request.getExpressionAttributeNames() != null) { + builder.expressionAttributeNames(request.getExpressionAttributeNames()); + } + if (request.getExpressionAttributeValues() != null) { + builder.expressionAttributeValues(request.getExpressionAttributeValues()); + } + return builder.build(); + } + + private DeleteItemRequest convertToDeleteItemRequest(DynamoDbWriteRequest request) { + DeleteItemRequest.Builder builder = + DeleteItemRequest.builder().tableName(tableName).key(request.getItem()); + if (request.getConditionExpression() != null) { + builder.conditionExpression(request.getConditionExpression()); + } + if (request.getExpressionAttributeNames() != null) { + builder.expressionAttributeNames(request.getExpressionAttributeNames()); + } + if (request.getExpressionAttributeValues() != null) { + builder.expressionAttributeValues(request.getExpressionAttributeValues()); + } + return builder.build(); + } + + private UpdateItemRequest convertToUpdateItemRequest(DynamoDbWriteRequest request) { + UpdateItemRequest.Builder builder = + UpdateItemRequest.builder() + .tableName(tableName) + .key(request.getItem()) + .updateExpression(request.getUpdateExpression()); + if (request.getConditionExpression() != null) { + builder.conditionExpression(request.getConditionExpression()); + } + if (request.getExpressionAttributeNames() != null) { + builder.expressionAttributeNames(request.getExpressionAttributeNames()); + } + if (request.getExpressionAttributeValues() != null) { + builder.expressionAttributeValues(request.getExpressionAttributeValues()); + } + return builder.build(); + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java index c8e3c886..aa181f60 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java @@ -28,7 +28,11 @@ /** * Represents a single Write Request to DynamoDb. Contains the item to be written as well as the - * type of the Write Request (PUT/DELETE) + * type of the Write Request (PUT/DELETE/UPDATE). + * + *

For PUT requests, {@code item} contains the full item attributes. For DELETE requests, {@code + * item} contains the primary key attributes. For UPDATE requests, {@code item} contains the primary + * key attributes and the update expression fields must be set. */ @PublicEvolving public class DynamoDbWriteRequest implements Serializable { @@ -37,10 +41,24 @@ public class DynamoDbWriteRequest implements Serializable { private final Map item; private final DynamoDbWriteRequestType type; - - private DynamoDbWriteRequest(Map item, DynamoDbWriteRequestType type) { + private final String updateExpression; + private final Map expressionAttributeNames; + private final Map expressionAttributeValues; + private final String conditionExpression; + + private DynamoDbWriteRequest( + Map item, + DynamoDbWriteRequestType type, + String updateExpression, + Map expressionAttributeNames, + Map expressionAttributeValues, + String conditionExpression) { this.item = item; this.type = type; + this.updateExpression = updateExpression; + this.expressionAttributeNames = expressionAttributeNames; + this.expressionAttributeValues = expressionAttributeValues; + this.conditionExpression = conditionExpression; } public Map getItem() { @@ -51,19 +69,54 @@ public DynamoDbWriteRequestType getType() { return type; } + public String getUpdateExpression() { + return updateExpression; + } + + public Map getExpressionAttributeNames() { + return expressionAttributeNames; + } + + public Map getExpressionAttributeValues() { + return expressionAttributeValues; + } + + public String getConditionExpression() { + return conditionExpression; + } + public static Builder builder() { return new Builder(); } @Override public String toString() { - return "DynamoDbWriteRequest{" + "item=" + item + ", type=" + type + '}'; + return "DynamoDbWriteRequest{" + + "item=" + + item + + ", type=" + + type + + ", updateExpression='" + + updateExpression + + '\'' + + ", expressionAttributeNames=" + + expressionAttributeNames + + ", expressionAttributeValues=" + + expressionAttributeValues + + ", conditionExpression='" + + conditionExpression + + '\'' + + '}'; } /** Builder for DynamoDbWriteRequest. */ public static class Builder { private Map item; private DynamoDbWriteRequestType type; + private String updateExpression; + private Map expressionAttributeNames; + private Map expressionAttributeValues; + private String conditionExpression; public Builder setItem(Map item) { this.item = item; @@ -75,12 +128,45 @@ public Builder setType(DynamoDbWriteRequestType type) { return this; } + public Builder setUpdateExpression(String updateExpression) { + this.updateExpression = updateExpression; + return this; + } + + public Builder setExpressionAttributeNames( + Map expressionAttributeNames) { + this.expressionAttributeNames = expressionAttributeNames; + return this; + } + + public Builder setExpressionAttributeValues( + Map expressionAttributeValues) { + this.expressionAttributeValues = expressionAttributeValues; + return this; + } + + public Builder setConditionExpression(String conditionExpression) { + this.conditionExpression = conditionExpression; + return this; + } + public DynamoDbWriteRequest build() { Preconditions.checkNotNull( - item, "No Item was supplied to the " + "DynamoDbWriteRequest builder."); + item, "No Item was supplied to the DynamoDbWriteRequest builder."); Preconditions.checkNotNull( - type, "No type was supplied to the " + "DynamoDbWriteRequest builder."); - return new DynamoDbWriteRequest(item, type); + type, "No type was supplied to the DynamoDbWriteRequest builder."); + if (type == DynamoDbWriteRequestType.UPDATE) { + Preconditions.checkNotNull( + updateExpression, + "No updateExpression was supplied for UPDATE DynamoDbWriteRequest."); + } + return new DynamoDbWriteRequest( + item, + type, + updateExpression, + expressionAttributeNames, + expressionAttributeValues, + conditionExpression); } } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java index 7a69e436..f0b783e2 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java @@ -26,6 +26,7 @@ *

*/ @PublicEvolving @@ -34,7 +35,8 @@ public enum DynamoDbWriteRequestType { // Note: Enums have no stable hash code across different JVMs, use toByteValue() for // this purpose. PUT((byte) 0), - DELETE((byte) 1); + DELETE((byte) 1), + UPDATE((byte) 2); private final byte value; DynamoDbWriteRequestType(byte value) { @@ -48,6 +50,7 @@ public enum DynamoDbWriteRequestType { * */ public byte toByteValue() { @@ -66,6 +69,8 @@ public static DynamoDbWriteRequestType fromByteValue(byte value) { return PUT; case 1: return DELETE; + case 2: + return UPDATE; default: throw new UnsupportedOperationException( "Unsupported byte value '" + value + "' for DynamoDb request type."); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java index d06155cc..ba56673e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.dynamodb.util.DynamoDbSerializationUtil; import java.io.DataInputStream; @@ -31,6 +32,12 @@ public class DynamoDbWriterStateSerializer extends AsyncSinkWriterStateSerializer { + // The parent class does not pass the version to deserializeRequestFromStream(), + // so we stash it here before calling super.deserialize() and read it back in + // deserializeRequestFromStream(). + private final ThreadLocal deserializingVersion = + ThreadLocal.withInitial(this::getVersion); + /** * Serializes {@link DynamoDbWriteRequest} in form of * [TABLE_NAME,WRITE_REQUEST_TYPE(PUT/DELETE),WRITE_REQUEST]. @@ -44,11 +51,22 @@ protected void serializeRequestToStream(DynamoDbWriteRequest request, DataOutput @Override protected DynamoDbWriteRequest deserializeRequestFromStream( long requestSize, DataInputStream in) throws IOException { - return DynamoDbSerializationUtil.deserializeWriteRequest(in); + return DynamoDbSerializationUtil.deserializeWriteRequest(in, deserializingVersion.get()); + } + + @Override + public BufferedRequestState deserialize(int version, byte[] serialized) + throws IOException { + deserializingVersion.set(version); + try { + return super.deserialize(version, serialized); + } finally { + deserializingVersion.set(getVersion()); + } } @Override public int getVersion() { - return 1; + return 2; } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java index 0e4a8eb9..0b6dd018 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java @@ -43,20 +43,112 @@ public class DynamoDbSerializationUtil { public static void serializeWriteRequest( DynamoDbWriteRequest dynamoDbWriteRequest, DataOutputStream out) throws IOException { out.writeByte(dynamoDbWriteRequest.getType().toByteValue()); - Map item = dynamoDbWriteRequest.getItem(); - serializeItem(item, out); + serializeItem(dynamoDbWriteRequest.getItem(), out); + serializeNullableString(dynamoDbWriteRequest.getUpdateExpression(), out); + serializeNullableString(dynamoDbWriteRequest.getConditionExpression(), out); + serializeNullableStringMap(dynamoDbWriteRequest.getExpressionAttributeNames(), out); + serializeNullableAttributeValueMap( + dynamoDbWriteRequest.getExpressionAttributeValues(), out); } public static DynamoDbWriteRequest deserializeWriteRequest(DataInputStream in) throws IOException { + return deserializeWriteRequest(in, 2); + } + + public static DynamoDbWriteRequest deserializeWriteRequest(DataInputStream in, int version) + throws IOException { int writeRequestType = in.read(); DynamoDbWriteRequestType dynamoDbWriteRequestType = DynamoDbWriteRequestType.fromByteValue((byte) writeRequestType); Map item = deserializeItem(in); - return DynamoDbWriteRequest.builder() - .setType(dynamoDbWriteRequestType) - .setItem(item) - .build(); + + DynamoDbWriteRequest.Builder builder = + DynamoDbWriteRequest.builder() + .setType(dynamoDbWriteRequestType) + .setItem(item); + + // Version 2 added expression fields for conditional and update writes + if (version >= 2) { + String updateExpression = deserializeNullableString(in); + String conditionExpression = deserializeNullableString(in); + Map expressionAttributeNames = deserializeNullableStringMap(in); + Map expressionAttributeValues = + deserializeNullableAttributeValueMap(in); + if (updateExpression != null) { + builder.setUpdateExpression(updateExpression); + } + if (conditionExpression != null) { + builder.setConditionExpression(conditionExpression); + } + if (expressionAttributeNames != null) { + builder.setExpressionAttributeNames(expressionAttributeNames); + } + if (expressionAttributeValues != null) { + builder.setExpressionAttributeValues(expressionAttributeValues); + } + } + + return builder.build(); + } + + private static void serializeNullableString(String value, DataOutputStream out) + throws IOException { + if (value == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(value); + } + } + + private static String deserializeNullableString(DataInputStream in) throws IOException { + boolean present = in.readBoolean(); + return present ? in.readUTF() : null; + } + + private static void serializeNullableStringMap(Map map, DataOutputStream out) + throws IOException { + if (map == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(map.size()); + for (Map.Entry entry : map.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + } + + private static Map deserializeNullableStringMap(DataInputStream in) + throws IOException { + boolean present = in.readBoolean(); + if (!present) { + return null; + } + int size = in.readInt(); + Map map = new HashMap<>(size); + for (int i = 0; i < size; i++) { + map.put(in.readUTF(), in.readUTF()); + } + return map; + } + + private static void serializeNullableAttributeValueMap( + Map map, DataOutputStream out) throws IOException { + if (map == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + serializeItem(map, out); + } + } + + private static Map deserializeNullableAttributeValueMap( + DataInputStream in) throws IOException { + boolean present = in.readBoolean(); + return present ? deserializeItem(in) : null; } private static void serializeItem(Map item, DataOutputStream out) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java index 13a733f7..afa201e3 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java @@ -19,11 +19,17 @@ package org.apache.flink.connector.dynamodb.sink; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.testutils.TestItem; import org.apache.flink.connector.dynamodb.util.Order; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import java.util.Collections; + +import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.DELETE; import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT; +import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.UPDATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -88,4 +94,87 @@ void testConvertWithClosedConvertedThrowsException() { .isThrownBy(() -> elementConverter.apply(order, null)) .withMessageContaining("Table schema has not been initialized"); } + + @Test + void testConditionalPutSetsConditionExpression() { + ElementConverter elementConverter = + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(PUT) + .setConditionExpression("attribute_not_exists(pk)") + .setExpressionAttributeNames(Collections.singletonMap("#pk", "pk")) + .build(); + elementConverter.open(null); + TestItem item = new TestItem("1", "a", "data", "0"); + + DynamoDbWriteRequest actual = elementConverter.apply(item, null); + + assertThat(actual.getType()).isEqualTo(PUT); + assertThat(actual.getConditionExpression()).isEqualTo("attribute_not_exists(pk)"); + assertThat(actual.getExpressionAttributeNames()).containsEntry("#pk", "pk"); + assertThat(actual.getItem()).containsKeys("pk", "sk", "payload", "counter"); + } + + @Test + void testDeleteExtractsOnlyKeyAttributes() { + ElementConverter elementConverter = + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(DELETE) + .build(); + elementConverter.open(null); + TestItem item = new TestItem("1", "a", "data", "0"); + + DynamoDbWriteRequest actual = elementConverter.apply(item, null); + + assertThat(actual.getType()).isEqualTo(DELETE); + assertThat(actual.getItem()).containsOnlyKeys("pk", "sk"); + assertThat(actual.getItem().get("pk").s()).isEqualTo("1"); + assertThat(actual.getItem().get("sk").s()).isEqualTo("a"); + } + + @Test + void testUpdateExtractsKeyAndSetsUpdateExpression() { + ElementConverter elementConverter = + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(UPDATE) + .setUpdateExpression("SET #c = :val") + .setExpressionAttributeNames(Collections.singletonMap("#c", "counter")) + .setExpressionAttributeValues( + Collections.singletonMap( + ":val", AttributeValue.builder().s("42").build())) + .build(); + elementConverter.open(null); + TestItem item = new TestItem("1", "a", "data", "0"); + + DynamoDbWriteRequest actual = elementConverter.apply(item, null); + + assertThat(actual.getType()).isEqualTo(UPDATE); + assertThat(actual.getUpdateExpression()).isEqualTo("SET #c = :val"); + assertThat(actual.getItem()).containsOnlyKeys("pk", "sk"); + assertThat(actual.getExpressionAttributeNames()).containsEntry("#c", "counter"); + assertThat(actual.getExpressionAttributeValues()) + .containsEntry(":val", AttributeValue.builder().s("42").build()); + } + + @Test + void testUpdateWithoutUpdateExpressionThrows() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy( + () -> + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(UPDATE) + .build()) + .withMessageContaining("updateExpression is required for UPDATE type"); + } + + @Test + void testPutWithUpdateExpressionThrows() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy( + () -> + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(PUT) + .setUpdateExpression("SET #a = :val") + .build()) + .withMessageContaining("updateExpression is only allowed for UPDATE type"); + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java new file mode 100644 index 00000000..c241d5a6 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers; +import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer; +import org.apache.flink.connector.dynamodb.testutils.TestItem; +import org.apache.flink.connector.dynamodb.util.DockerImageVersions; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES; + +/** + * Integration test for batch and single writes in {@link DynamoDbSink} using {@link DynamoDbBeanElementConverter}. + */ +@Testcontainers +@ExtendWith(MiniClusterExtension.class) +public class DynamoDbSinkConditionalWriteITCase { + + private static final String PARTITION_KEY = "pk"; + private static final String SORT_KEY = "sk"; + private static DynamoDBHelpers dynamoDBHelpers; + private static String testTableName; + + @Container + public static final DynamoDbContainer LOCALSTACK = + new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB)) + .withNetwork(Network.newNetwork()) + .withNetworkAliases("dynamodb"); + + @BeforeEach + public void setup() throws URISyntaxException { + testTableName = "test_" + UUID.randomUUID().toString().replace("-", ""); + dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient()); + } + + @Test + public void testBatchPutWithBeanConverter() throws Exception { + dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY); + + List items = + Arrays.asList( + new TestItem("1", "a", "data1", "0"), + new TestItem("2", "b", "data2", "0")); + + StreamExecutionEnvironment env = createEnv(); + DataStream stream = env.fromCollection(items); + stream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class))); + env.execute("Batch PUT via BeanConverter"); + + Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2); + } + + @Test + public void testUpdateWithBeanUpdateConverter() throws Exception { + dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY); + + List items = + Arrays.asList( + new TestItem("1", "a", "data1", "0"), + new TestItem("2", "b", "data2", "0")); + + StreamExecutionEnvironment env = createEnv(); + DataStream putStream = env.fromCollection(items); + putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class))); + env.execute("Initial PUT"); + + Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2); + + Map expressionNames = Collections.singletonMap("#c", "counter"); + Map expressionValues = + Collections.singletonMap(":val", AttributeValue.builder().s("42").build()); + + env = createEnv(); + DataStream updateStream = + env.fromCollection( + Collections.singletonList(new TestItem("1", "a", null, null))); + updateStream.sinkTo( + buildSink( + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(DynamoDbWriteRequestType.UPDATE) + .setUpdateExpression("SET #c = :val") + .setExpressionAttributeNames(expressionNames) + .setExpressionAttributeValues(expressionValues) + .build())); + env.execute("UPDATE via BeanConverter"); + + Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2); + Assertions.assertThat( + dynamoDBHelpers.containsAttributeValue(testTableName, "counter", "42")) + .isTrue(); + } + + @Test + public void testConditionalPutWithBeanConverterPreventsOverwrite() throws Exception { + dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY); + + StreamExecutionEnvironment env = createEnv(); + DataStream putStream = + env.fromCollection( + Collections.singletonList(new TestItem("1", "a", "original", "0"))); + putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class))); + env.execute("Initial PUT"); + + Map exprNames = new HashMap<>(); + exprNames.put("#pk", PARTITION_KEY); + exprNames.put("#sk", SORT_KEY); + + StreamExecutionEnvironment conditionalEnv = createEnv(); + DataStream conditionalStream = + conditionalEnv.fromCollection( + Collections.singletonList(new TestItem("1", "a", "overwritten", "0"))); + conditionalStream.sinkTo( + buildSink( + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(DynamoDbWriteRequestType.PUT) + .setConditionExpression( + "attribute_not_exists(#pk) AND attribute_not_exists(#sk)") + .setExpressionAttributeNames(exprNames) + .build())); + + Assertions.assertThatExceptionOfType(JobExecutionException.class) + .isThrownBy(() -> conditionalEnv.execute("Conditional PUT should fail")) + .havingCause() + .havingCause() + .withMessageContaining("conditional check"); + + Assertions.assertThat( + dynamoDBHelpers.containsAttributeValue(testTableName, "payload", "original")) + .isTrue(); + } + + @Test + public void testConditionalDeleteWithBeanConverter() throws Exception { + dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY); + + List items = + Arrays.asList( + new TestItem("1", "a", "keep", "0"), + new TestItem("2", "b", "remove", "0")); + + StreamExecutionEnvironment env = createEnv(); + DataStream putStream = env.fromCollection(items); + putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class))); + env.execute("Insert items"); + + Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2); + + env = createEnv(); + DataStream deleteStream = + env.fromCollection( + Collections.singletonList(new TestItem("2", "b", null, null))); + deleteStream.sinkTo( + buildSink( + DynamoDbBeanElementConverter.builder(TestItem.class) + .setType(DynamoDbWriteRequestType.DELETE) + .setConditionExpression("#p = :val") + .setExpressionAttributeNames( + Collections.singletonMap("#p", "payload")) + .setExpressionAttributeValues( + Collections.singletonMap( + ":val", + AttributeValue.builder() + .s("remove") + .build())) + .build())); + env.execute("Conditional DELETE"); + + Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(1); + Assertions.assertThat( + dynamoDBHelpers.containsAttributeValue(testTableName, "payload", "keep")) + .isTrue(); + } + + private DynamoDbSink buildSink( + ElementConverter converter) { + return DynamoDbSink.builder() + .setElementConverter(converter) + .setMaxTimeInBufferMS(1000) + .setMaxInFlightRequests(1) + .setMaxBatchSize(25) + .setFailOnError(true) + .setMaxBufferedRequests(1000) + .setTableName(testTableName) + .setOverwriteByPartitionKeys(Collections.emptyList()) + .setDynamoDbProperties(getProperties()) + .build(); + } + + private StreamExecutionEnvironment createEnv() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration config = new Configuration(); + config.set( + RestartStrategyOptions.RESTART_STRATEGY, + RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue()); + env.configure(config); + env.setParallelism(1); + return env; + } + + private Properties getProperties() { + Properties properties = new Properties(); + properties.setProperty(AWS_ENDPOINT, LOCALSTACK.getHostEndpointUrl()); + properties.setProperty(AWS_ACCESS_KEY_ID, LOCALSTACK.getAccessKey()); + properties.setProperty(AWS_SECRET_ACCESS_KEY, LOCALSTACK.getSecretKey()); + properties.setProperty(AWS_REGION, LOCALSTACK.getRegion().toString()); + properties.setProperty(TRUST_ALL_CERTIFICATES, "true"); + properties.setProperty(HTTP_PROTOCOL_VERSION, "HTTP1_1"); + return properties; + } +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java index 44adf5cf..63551f2b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java @@ -31,11 +31,17 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest; import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import software.amazon.awssdk.services.sts.model.StsException; @@ -321,6 +327,235 @@ public void testClientClosesWhenWriterIsClosed() throws IOException { assertThat(testAsyncDynamoDbClientProvider.getCloseCount()).isEqualTo(1); } + @Test + public void testUpdateRequestUsesUpdateItemApi() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + Map key = singletonMap( + PARTITION_KEY, AttributeValue.builder().s("pk1").build()); + DynamoDbWriteRequest updateRequest = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem(key) + .setUpdateExpression("SET #a = :val") + .setExpressionAttributeNames(singletonMap("#a", "counter")) + .setExpressionAttributeValues( + singletonMap(":val", AttributeValue.builder().n("1").build())) + .build(); + + dynamoDbSinkWriter.submitRequestEntries(singletonList(updateRequest), resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getUpdateItemRequests()).hasSize(1); + assertThat(trackingClient.getUpdateItemRequests().get(0).updateExpression()) + .isEqualTo("SET #a = :val"); + assertThat(trackingClient.getRequestHistory()).isEmpty(); + } + + @Test + public void testConditionalPutUsesIndividualPutItemApi() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + DynamoDbWriteRequest conditionalPut = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(item("pk", "1")) + .setConditionExpression("attribute_not_exists(pk)") + .build(); + + dynamoDbSinkWriter.submitRequestEntries(singletonList(conditionalPut), resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getPutItemRequests()).hasSize(1); + assertThat(trackingClient.getPutItemRequests().get(0).conditionExpression()) + .isEqualTo("attribute_not_exists(pk)"); + assertThat(trackingClient.getRequestHistory()).isEmpty(); + } + + @Test + public void testConditionalDeleteUsesIndividualDeleteItemApi() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + DynamoDbWriteRequest conditionalDelete = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.DELETE) + .setItem(item("pk", "1")) + .setConditionExpression("attribute_exists(pk)") + .build(); + + dynamoDbSinkWriter.submitRequestEntries(singletonList(conditionalDelete), resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getDeleteItemRequests()).hasSize(1); + assertThat(trackingClient.getDeleteItemRequests().get(0).conditionExpression()) + .isEqualTo("attribute_exists(pk)"); + assertThat(trackingClient.getRequestHistory()).isEmpty(); + } + + @Test + public void testMixedBatchAndSingleRequests() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + Map key = singletonMap( + PARTITION_KEY, AttributeValue.builder().s("pk1").build()); + List inputRequests = + Arrays.asList( + sinkPutRequest(item("pk", "1")), + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem(key) + .setUpdateExpression("SET #a = :val") + .setExpressionAttributeNames(singletonMap("#a", "counter")) + .setExpressionAttributeValues( + singletonMap( + ":val", + AttributeValue.builder().n("1").build())) + .build()); + + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getRequestHistory()).hasSize(1); + assertThat(trackingClient.getUpdateItemRequests()).hasSize(1); + } + + @Test + public void testPutWithoutConditionUsesBatchPath() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + dynamoDbSinkWriter.submitRequestEntries( + singletonList(sinkPutRequest(item("pk", "1"))), resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getRequestHistory()).hasSize(1); + assertThat(trackingClient.getPutItemRequests()).isEmpty(); + } + + @Test + public void testDeleteWithoutConditionUsesBatchPath() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient(); + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + true, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + dynamoDbSinkWriter.submitRequestEntries( + singletonList(sinkDeleteRequest(item("pk", "1"))), resultHandler); + + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(trackingClient.getRequestHistory()).hasSize(1); + assertThat(trackingClient.getDeleteItemRequests()).isEmpty(); + } + + @Test + public void testConditionalCheckFailedOnSingleRequestIsNonRetryable() throws Exception { + TrackingDynamoDbAsyncClient trackingClient = + new TrackingDynamoDbAsyncClient() { + @Override + public CompletableFuture putItem( + PutItemRequest putItemRequest) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + DynamoDbException.builder() + .cause( + ConditionalCheckFailedException.builder() + .build()) + .build()); + return future; + } + }; + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + false, Collections.emptyList(), () -> trackingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + DynamoDbWriteRequest conditionalPut = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(item("pk", "1")) + .setConditionExpression("attribute_not_exists(pk)") + .build(); + + dynamoDbSinkWriter.submitRequestEntries(singletonList(conditionalPut), resultHandler); + + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getRetryEntries()).isEmpty(); + assertThat(resultHandler.getException()).isNotNull(); + } + + @Test + public void testMixedBatchAndSingleRetryableExceptionRetriesBoth() throws Exception { + TrackingDynamoDbAsyncClient failingClient = + new TrackingDynamoDbAsyncClient() { + @Override + public CompletableFuture batchWriteItem( + BatchWriteItemRequest batchWriteItemRequest) { + CompletableFuture future = + new CompletableFuture<>(); + future.completeExceptionally( + DynamoDbException.builder() + .cause(getGenericRetryableException().get()) + .build()); + return future; + } + + @Override + public CompletableFuture updateItem( + UpdateItemRequest updateItemRequest) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + DynamoDbException.builder() + .cause(getGenericRetryableException().get()) + .build()); + return future; + } + }; + DynamoDbSinkWriter> dynamoDbSinkWriter = + getDefaultSinkWriter( + false, Collections.emptyList(), () -> failingClient); + TestingResultHandler resultHandler = new TestingResultHandler(); + + Map key = + singletonMap(PARTITION_KEY, AttributeValue.builder().s("pk1").build()); + DynamoDbWriteRequest batchPut = sinkPutRequest(item("pk", "1")); + DynamoDbWriteRequest updateRequest = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem(key) + .setUpdateExpression("SET #a = :val") + .setExpressionAttributeNames(singletonMap("#a", "counter")) + .setExpressionAttributeValues( + singletonMap(":val", AttributeValue.builder().n("1").build())) + .build(); + + dynamoDbSinkWriter.submitRequestEntries( + Arrays.asList(batchPut, updateRequest), resultHandler); + + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getRetryEntries()) + .containsExactlyInAnyOrder(batchPut, updateRequest); + } + private void assertThatRequestsAreNotRetried( boolean failOnError, Optional exceptionToThrow) throws IOException { ThrowingDynamoDbAsyncClient throwingDynamoDbAsyncClient = @@ -507,6 +742,9 @@ public int getCloseCount() { private static class TrackingDynamoDbAsyncClient implements DynamoDbAsyncClient { private List> requestHistory = new ArrayList<>(); + private List putItemRequests = new ArrayList<>(); + private List deleteItemRequests = new ArrayList<>(); + private List updateItemRequests = new ArrayList<>(); @Override public String serviceName() { @@ -523,6 +761,26 @@ public CompletableFuture batchWriteItem( return CompletableFuture.completedFuture(BatchWriteItemResponse.builder().build()); } + @Override + public CompletableFuture putItem(PutItemRequest putItemRequest) { + putItemRequests.add(putItemRequest); + return CompletableFuture.completedFuture(PutItemResponse.builder().build()); + } + + @Override + public CompletableFuture deleteItem( + DeleteItemRequest deleteItemRequest) { + deleteItemRequests.add(deleteItemRequest); + return CompletableFuture.completedFuture(DeleteItemResponse.builder().build()); + } + + @Override + public CompletableFuture updateItem( + UpdateItemRequest updateItemRequest) { + updateItemRequests.add(updateItemRequest); + return CompletableFuture.completedFuture(UpdateItemResponse.builder().build()); + } + @Override public DynamoDbServiceClientConfiguration serviceClientConfiguration() { return DynamoDbServiceClientConfiguration.builder().build(); @@ -531,6 +789,18 @@ public DynamoDbServiceClientConfiguration serviceClientConfiguration() { public List> getRequestHistory() { return requestHistory; } + + public List getPutItemRequests() { + return putItemRequests; + } + + public List getDeleteItemRequests() { + return deleteItemRequests; + } + + public List getUpdateItemRequests() { + return updateItemRequests; + } } private static class ThrowingDynamoDbAsyncClient diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java index 1c43c3c9..18c377a0 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java @@ -29,6 +29,7 @@ import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; class DynamoDbWriteRequestTest { @@ -46,6 +47,62 @@ public void testAttributeValueExpectedFields() { .hasSize(11); } + @Test + public void testUpdateRequiresUpdateExpression() { + assertThatExceptionOfType(NullPointerException.class) + .isThrownBy( + () -> + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem( + singletonMap( + "pk", + AttributeValue.builder() + .s("1") + .build())) + .build()) + .withMessageContaining("updateExpression"); + } + + @Test + public void testUpdateBuildsWithAllFields() { + DynamoDbWriteRequest request = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem( + singletonMap( + "pk", AttributeValue.builder().s("1").build())) + .setUpdateExpression("SET #c = :val") + .setConditionExpression("attribute_exists(pk)") + .setExpressionAttributeNames(singletonMap("#c", "counter")) + .setExpressionAttributeValues( + singletonMap( + ":val", AttributeValue.builder().n("1").build())) + .build(); + + assertThat(request.getType()).isEqualTo(DynamoDbWriteRequestType.UPDATE); + assertThat(request.getUpdateExpression()).isEqualTo("SET #c = :val"); + assertThat(request.getConditionExpression()).isEqualTo("attribute_exists(pk)"); + assertThat(request.getExpressionAttributeNames()).containsEntry("#c", "counter"); + assertThat(request.getExpressionAttributeValues()).containsKey(":val"); + } + + @Test + public void testPutBuildsWithConditionExpression() { + DynamoDbWriteRequest request = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem( + singletonMap( + "pk", AttributeValue.builder().s("1").build())) + .setConditionExpression("attribute_not_exists(pk)") + .build(); + + assertThat(request.getType()).isEqualTo(DynamoDbWriteRequestType.PUT); + assertThat(request.getConditionExpression()).isEqualTo("attribute_not_exists(pk)"); + assertThat(request.getUpdateExpression()).isNull(); + } + @Test public void testToString() { DynamoDbWriteRequest dynamoDbWriteRequest = diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java index 958efee5..e73c7537 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java @@ -18,14 +18,19 @@ package org.apache.flink.connector.dynamodb.sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.dynamodb.util.DynamoDbSerializationUtil; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; import static java.util.Collections.singletonMap; import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState; @@ -53,18 +58,117 @@ public void testSerializeAndDeserialize() throws IOException { DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer(); BufferedRequestState actualState = - serializer.deserialize(1, serializer.serialize(expectedState)); + serializer.deserialize( + serializer.getVersion(), serializer.serialize(expectedState)); assertThat(actualState).usingRecursiveComparison().isEqualTo(expectedState); } + @Test + public void testSerializeAndDeserializeUpdateRequest() throws IOException { + Map key = + singletonMap("pk", AttributeValue.builder().s("key1").build()); + DynamoDbWriteRequest updateRequest = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.UPDATE) + .setItem(key) + .setUpdateExpression("SET #c = :val") + .setExpressionAttributeNames(singletonMap("#c", "counter")) + .setExpressionAttributeValues( + singletonMap(":val", AttributeValue.builder().n("1").build())) + .build(); + + BufferedRequestState expectedState = + new BufferedRequestState<>( + Collections.singletonList(new RequestEntryWrapper<>(updateRequest, 100))); + + DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer(); + BufferedRequestState actualState = + serializer.deserialize( + serializer.getVersion(), serializer.serialize(expectedState)); + + assertThat(actualState).usingRecursiveComparison().isEqualTo(expectedState); + } + + @Test + public void testSerializeAndDeserializeConditionalPut() throws IOException { + DynamoDbWriteRequest conditionalPut = + DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(singletonMap("pk", AttributeValue.builder().s("key1").build())) + .setConditionExpression("attribute_not_exists(pk)") + .setExpressionAttributeNames(singletonMap("#pk", "pk")) + .build(); + + BufferedRequestState expectedState = + new BufferedRequestState<>( + Collections.singletonList(new RequestEntryWrapper<>(conditionalPut, 100))); + + DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer(); + BufferedRequestState actualState = + serializer.deserialize( + serializer.getVersion(), serializer.serialize(expectedState)); + + assertThat(actualState).usingRecursiveComparison().isEqualTo(expectedState); + } + + @Test + public void testDeserializeVersion1State() throws IOException { + BufferedRequestState v1State = + getTestState(ELEMENT_CONVERTER, this::getRequestSize); + + DynamoDbWriterStateSerializerV1 v1Serializer = new DynamoDbWriterStateSerializerV1(); + byte[] v1Bytes = v1Serializer.serialize(v1State); + + DynamoDbWriterStateSerializer v2Serializer = new DynamoDbWriterStateSerializer(); + BufferedRequestState actualState = + v2Serializer.deserialize(1, v1Bytes); + + assertThat(actualState).usingRecursiveComparison().isEqualTo(v1State); + } + @Test public void testVersion() { DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer(); - assertThat(serializer.getVersion()).isEqualTo(1); + assertThat(serializer.getVersion()).isEqualTo(2); } private int getRequestSize(DynamoDbWriteRequest requestEntry) { return requestEntry.getItem().toString().getBytes(StandardCharsets.UTF_8).length; } + + /** + * Simulates version 1 serializer that only writes type + item (no expression fields). Used to + * test backward compatibility of the version 2 deserializer. + */ + private static class DynamoDbWriterStateSerializerV1 + extends AsyncSinkWriterStateSerializer { + + @Override + protected void serializeRequestToStream( + DynamoDbWriteRequest request, java.io.DataOutputStream out) + throws IOException { + // V1 format: only type byte + item map (no expression fields) + out.writeByte(request.getType().toByteValue()); + // Write item map: size + entries (key + attribute value) + Map item = request.getItem(); + out.writeInt(item.size()); + for (Map.Entry entry : item.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeByte(0); + out.writeUTF(entry.getValue().s()); + } + } + + @Override + protected DynamoDbWriteRequest deserializeRequestFromStream( + long requestSize, java.io.DataInputStream in) throws IOException { + return DynamoDbSerializationUtil.deserializeWriteRequest(in, 1); + } + + @Override + public int getVersion() { + return 1; + } + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/TestItem.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/TestItem.java new file mode 100644 index 00000000..a16990b2 --- /dev/null +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/TestItem.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.testutils; + +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; +import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; + +/** A test DynamoDbBean with partition key, sort key and payload attributes. */ +@DynamoDbBean +public class TestItem { + + private String pk; + private String sk; + private String payload; + private String counter; + + public TestItem() {} + + public TestItem(String pk, String sk, String payload, String counter) { + this.pk = pk; + this.sk = sk; + this.payload = payload; + this.counter = counter; + } + + @DynamoDbPartitionKey + public String getPk() { + return pk; + } + + public void setPk(String pk) { + this.pk = pk; + } + + @DynamoDbSortKey + public String getSk() { + return sk; + } + + public void setSk(String sk) { + this.sk = sk; + } + + public String getPayload() { + return payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public String getCounter() { + return counter; + } + + public void setCounter(String counter) { + this.counter = counter; + } +} diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java index 1c128fb3..e073d4ed 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java @@ -127,6 +127,51 @@ public void testDeleteItemSerializeDeserialize() throws IOException { } } + @Test + public void testUpdateWithAllFieldsSerializeDeserialize() throws IOException { + final Map key = new HashMap<>(); + key.put("pk", AttributeValue.builder().s("key1").build()); + key.put("sk", AttributeValue.builder().s("sort1").build()); + + DynamoDbWriteRequest dynamoDbWriteRequest = + DynamoDbWriteRequest.builder() + .setItem(key) + .setType(DynamoDbWriteRequestType.UPDATE) + .setUpdateExpression("SET #c = #c + :inc, #u = :ts") + .setConditionExpression("attribute_exists(pk)") + .setExpressionAttributeNames( + new HashMap() { + { + put("#c", "counter"); + put("#u", "updatedAt"); + } + }) + .setExpressionAttributeValues( + new HashMap() { + { + put(":inc", AttributeValue.builder().n("1").build()); + put(":ts", AttributeValue.builder().s("2026-03-28").build()); + } + }) + .build(); + + byte[] serialized; + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(outputStream)) { + DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out); + serialized = outputStream.toByteArray(); + } + + try (InputStream inputStream = new ByteArrayInputStream(serialized); + DataInputStream dataInputStream = new DataInputStream(inputStream)) { + DynamoDbWriteRequest deserializedWriteRequest = + DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream); + assertThat(deserializedWriteRequest) + .usingRecursiveComparison() + .isEqualTo(dynamoDbWriteRequest); + } + } + @Test public void testSerializeEmptyAttributeValueThrowsException() { final Map item = new HashMap<>();