diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
index bbe63644..4f5d59c1 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
@@ -25,11 +25,27 @@
import software.amazon.awssdk.enhanced.dynamodb.mapper.BeanTableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.Collection;
+import java.util.Map;
/**
* A generic {@link ElementConverter} that uses the dynamodb-enhanced client to build a {@link
* DynamoDbWriteRequest} from a POJO annotated with {@link DynamoDbBean}.
*
+ * <p>Supports all three write request types:
+ *
+ * <ul>
+ *   <li>{@link DynamoDbWriteRequestType#PUT} - maps the full POJO to an item
+ *   <li>{@link DynamoDbWriteRequestType#DELETE} - extracts only the primary key from the POJO
+ *   <li>{@link DynamoDbWriteRequestType#UPDATE} - extracts the primary key from the POJO and uses
+ *       the provided update expression
+ * </ul>
+ *
+ * <p>Condition expressions can be set for any type to enable conditional writes via individual API
+ * calls instead of {@code BatchWriteItem}.
+ *
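+ * <p>A minimal usage sketch for an UPDATE converter (the {@code Customer} bean and the
+ * expression names are illustrative, not part of the connector):
+ *
+ * <pre>{@code
+ * DynamoDbBeanElementConverter<Customer> converter =
+ *         DynamoDbBeanElementConverter.builder(Customer.class)
+ *                 .setType(DynamoDbWriteRequestType.UPDATE)
+ *                 .setUpdateExpression("SET #n = :name")
+ *                 .setExpressionAttributeNames(Collections.singletonMap("#n", "name"))
+ *                 .setExpressionAttributeValues(
+ *                         Collections.singletonMap(
+ *                                 ":name", AttributeValue.builder().s("updated").build()))
+ *                 .build();
+ * }</pre>
+ *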
* @param <InputT> The type of the {@link DynamoDbBean} to convert into {@link DynamoDbWriteRequest}
*/
@PublicEvolving
@@ -40,15 +56,43 @@ public class DynamoDbBeanElementConverter<InputT>
private final Class<InputT> recordType;
private final boolean ignoreNulls;
+ private final DynamoDbWriteRequestType type;
+ private final String updateExpression;
+ private final String conditionExpression;
+ private final Map<String, String> expressionAttributeNames;
+ private final Map<String, AttributeValue> expressionAttributeValues;
private transient BeanTableSchema<InputT> tableSchema;
+ private transient Collection<String> keyAttributeNames;
public DynamoDbBeanElementConverter(final Class<InputT> recordType) {
this(recordType, false);
}
public DynamoDbBeanElementConverter(final Class<InputT> recordType, final boolean ignoreNulls) {
+ this(recordType, ignoreNulls, DynamoDbWriteRequestType.PUT, null, null, null, null);
+ }
+
+ private DynamoDbBeanElementConverter(
+ final Class<InputT> recordType,
+ final boolean ignoreNulls,
+ final DynamoDbWriteRequestType type,
+ final String updateExpression,
+ final String conditionExpression,
+ final Map<String, String> expressionAttributeNames,
+ final Map<String, AttributeValue> expressionAttributeValues) {
this.recordType = recordType;
this.ignoreNulls = ignoreNulls;
+ this.type = Preconditions.checkNotNull(type, "Type must not be null");
+ this.updateExpression = updateExpression;
+ this.conditionExpression = conditionExpression;
+ this.expressionAttributeNames = expressionAttributeNames;
+ this.expressionAttributeValues = expressionAttributeValues;
+
+ if (type == DynamoDbWriteRequestType.UPDATE) {
+ Preconditions.checkNotNull(
+ updateExpression,
+ "Update expression must not be null for UPDATE requests.");
+ }
// Attempt to create a table schema now to bubble up errors before starting job
createTableSchema(recordType);
@@ -57,18 +101,106 @@ public DynamoDbBeanElementConverter(final Class<InputT> recordType, final boolean
@Override
public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) {
Preconditions.checkNotNull(tableSchema, "Table schema has not been initialized");
- return new DynamoDbWriteRequest.Builder()
- .setType(DynamoDbWriteRequestType.PUT)
- .setItem(tableSchema.itemToMap(element, ignoreNulls))
- .build();
+ Map<String, AttributeValue> item =
+ type == DynamoDbWriteRequestType.PUT
+ ? tableSchema.itemToMap(element, ignoreNulls)
+ : tableSchema.itemToMap(element, keyAttributeNames);
+ DynamoDbWriteRequest.Builder builder =
+ new DynamoDbWriteRequest.Builder().setType(type).setItem(item);
+ if (updateExpression != null) {
+ builder.setUpdateExpression(updateExpression);
+ }
+ if (conditionExpression != null) {
+ builder.setConditionExpression(conditionExpression);
+ }
+ if (expressionAttributeNames != null) {
+ builder.setExpressionAttributeNames(expressionAttributeNames);
+ }
+ if (expressionAttributeValues != null) {
+ builder.setExpressionAttributeValues(expressionAttributeValues);
+ }
+ return builder.build();
}
@Override
public void open(WriterInitContext context) {
tableSchema = createTableSchema(recordType);
+ // DELETE and UPDATE only need the primary key attributes from the POJO, not the full item.
+ // PUT uses the complete item map.
+ if (type == DynamoDbWriteRequestType.DELETE || type == DynamoDbWriteRequestType.UPDATE) {
+ keyAttributeNames = tableSchema.tableMetadata().primaryKeys();
+ }
}
private BeanTableSchema<InputT> createTableSchema(final Class<InputT> recordType) {
return BeanTableSchema.create(recordType);
}
+
+ /** Builder for {@link DynamoDbBeanElementConverter}. */
+ public static class ElementConverterBuilder<InputT> {
+ private final Class<InputT> recordType;
+ private boolean ignoreNulls = false;
+ private DynamoDbWriteRequestType type = DynamoDbWriteRequestType.PUT;
+ private String updateExpression;
+ private String conditionExpression;
+ private Map<String, String> expressionAttributeNames;
+ private Map<String, AttributeValue> expressionAttributeValues;
+
+ private ElementConverterBuilder(Class<InputT> recordType) {
+ this.recordType = recordType;
+ }
+
+ public ElementConverterBuilder<InputT> setIgnoreNulls(boolean ignoreNulls) {
+ this.ignoreNulls = ignoreNulls;
+ return this;
+ }
+
+ public ElementConverterBuilder<InputT> setType(DynamoDbWriteRequestType type) {
+ this.type = type;
+ return this;
+ }
+
+ public ElementConverterBuilder<InputT> setUpdateExpression(String updateExpression) {
+ this.updateExpression = updateExpression;
+ return this;
+ }
+
+ public ElementConverterBuilder<InputT> setConditionExpression(String conditionExpression) {
+ this.conditionExpression = conditionExpression;
+ return this;
+ }
+
+ public ElementConverterBuilder<InputT> setExpressionAttributeNames(
+ Map<String, String> expressionAttributeNames) {
+ this.expressionAttributeNames = expressionAttributeNames;
+ return this;
+ }
+
+ public ElementConverterBuilder<InputT> setExpressionAttributeValues(
+ Map<String, AttributeValue> expressionAttributeValues) {
+ this.expressionAttributeValues = expressionAttributeValues;
+ return this;
+ }
+
+ public DynamoDbBeanElementConverter<InputT> build() {
+ Preconditions.checkArgument(
+ type != DynamoDbWriteRequestType.UPDATE || updateExpression != null,
+ "updateExpression is required for UPDATE type.");
+ Preconditions.checkArgument(
+ type == DynamoDbWriteRequestType.UPDATE || updateExpression == null,
+ "updateExpression is only allowed for UPDATE type.");
+ return new DynamoDbBeanElementConverter<>(
+ recordType,
+ ignoreNulls,
+ type,
+ updateExpression,
+ conditionExpression,
+ expressionAttributeNames,
+ expressionAttributeValues);
+ }
+ }
+
+ public static <InputT> ElementConverterBuilder<InputT> builder(Class<InputT> recordType) {
+ return new ElementConverterBuilder<>(recordType);
+ }
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
index 2d3e6f03..9044c9a6 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
@@ -75,6 +75,22 @@
* single batch takes precedence.
*
*
+ * <p>The sink supports three write request types via {@link DynamoDbWriteRequestType}:
+ *
+ * <ul>
+ *   <li>{@code PUT} - writes a full item using {@code BatchWriteItem} or {@code PutItem} (when a
+ *       condition expression is set)
+ *   <li>{@code DELETE} - deletes an item by key using {@code BatchWriteItem} or {@code DeleteItem}
+ *       (when a condition expression is set)
+ *   <li>{@code UPDATE} - updates specific attributes of an item using {@code UpdateItem}, always
+ *       processed as individual requests since {@code BatchWriteItem} does not support updates
+ * </ul>
+ *
+ * <p>Requests without condition expressions (PUT/DELETE) are sent via {@code BatchWriteItem} for
+ * efficiency. Requests with condition expressions and all UPDATE requests are sent as individual
+ * {@code PutItem}, {@code DeleteItem}, or {@code UpdateItem} calls. Both paths execute
+ * concurrently.
+ *
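+ * <p>A minimal construction sketch (the {@code Customer} bean, table name, and condition
+ * expression are illustrative, not part of the connector):
+ *
+ * <pre>{@code
+ * DynamoDbSink<Customer> sink =
+ *         DynamoDbSink.<Customer>builder()
+ *                 .setTableName("customers")
+ *                 .setElementConverter(
+ *                         DynamoDbBeanElementConverter.builder(Customer.class)
+ *                                 .setConditionExpression("attribute_not_exists(pk)")
+ *                                 .build())
+ *                 .build();
+ * }</pre>
+ *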
* <p>Please see the writer implementation in {@link DynamoDbSinkWriter}
*
* @param <InputT> Type of the elements handled by this sink
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index 66e269e0..97008bda 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -32,6 +32,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +40,13 @@
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.util.ArrayList;
@@ -51,6 +55,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonMap;
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
@@ -164,6 +170,59 @@ protected void submitRequestEntries(
List<DynamoDbWriteRequest> requestEntries,
ResultHandler<DynamoDbWriteRequest> resultHandler) {
+ List<DynamoDbWriteRequest> batchRequests = new ArrayList<>();
+ List<DynamoDbWriteRequest> singleRequests = new ArrayList<>();
+
+ for (DynamoDbWriteRequest request : requestEntries) {
+ if (isSingleRequest(request)) {
+ singleRequests.add(request);
+ } else {
+ batchRequests.add(request);
+ }
+ }
+
+ // Shared state to collect results from concurrent batch and single paths.
+ // ResultHandler is called exactly once in whenComplete after both paths finish.
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures =
+ new ConcurrentLinkedQueue<>();
+ AtomicReference<Exception> fatalException = new AtomicReference<>();
+
+ CompletableFuture<Void> batchFuture =
+ submitBatchRequests(
+ batchRequests, retryableFailures, fatalException);
+
+ CompletableFuture<Void> singleFuture =
+ submitSingleRequests(
+ singleRequests, retryableFailures, fatalException);
+
+ CompletableFuture.allOf(batchFuture, singleFuture)
+ .whenComplete(
+ (ignored, err) -> {
+ if (fatalException.get() != null) {
+ resultHandler.completeExceptionally(fatalException.get());
+ } else if (!retryableFailures.isEmpty()) {
+ resultHandler.retryForEntries(
+ new ArrayList<>(retryableFailures));
+ } else {
+ resultHandler.complete();
+ }
+ });
+ }
+
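+ // UPDATE requests, and any request carrying a condition expression, cannot be sent via
+ // BatchWriteItem and must go through an individual API call.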
+ private boolean isSingleRequest(DynamoDbWriteRequest request) {
+ return request.getType() == DynamoDbWriteRequestType.UPDATE
+ || request.getConditionExpression() != null;
+ }
+
+ private CompletableFuture<Void> submitBatchRequests(
+ List<DynamoDbWriteRequest> requestEntries,
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures,
+ AtomicReference<Exception> fatalException) {
+
+ if (requestEntries.isEmpty()) {
+ return FutureUtils.completedVoidFuture();
+ }
+
List<WriteRequest> items = new ArrayList<>();
if (CollectionUtil.isNullOrEmpty(overwriteByPartitionKeys)) {
@@ -181,28 +240,32 @@ protected void submitRequestEntries(
items.addAll(container.values());
}
- CompletableFuture<BatchWriteItemResponse> future =
- clientProvider
- .getClient()
- .batchWriteItem(
- BatchWriteItemRequest.builder()
- .requestItems(singletonMap(tableName, items))
- .build());
-
- future.whenComplete(
- (response, err) -> {
- if (err != null) {
- handleFullyFailedRequest(err, requestEntries, resultHandler);
- } else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
- handlePartiallyUnprocessedRequest(response, resultHandler);
- } else {
- resultHandler.complete();
- }
- });
+ return clientProvider
+ .getClient()
+ .batchWriteItem(
+ BatchWriteItemRequest.builder()
+ .requestItems(singletonMap(tableName, items))
+ .build())
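+ // handle() records failures in the shared state and always completes normally,
+ // so the allOf() in submitRequestEntries() never completes exceptionally.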
+ .handle(
+ (response, err) -> {
+ if (err != null) {
+ handleFullyFailedRequest(
+ err,
+ requestEntries,
+ retryableFailures,
+ fatalException);
+ } else if (!CollectionUtil.isNullOrEmpty(
+ response.unprocessedItems())) {
+ handlePartiallyUnprocessedRequest(
+ response, retryableFailures);
+ }
+ return null;
+ });
}
private void handlePartiallyUnprocessedRequest(
- BatchWriteItemResponse response, ResultHandler<DynamoDbWriteRequest> resultHandler) {
+ BatchWriteItemResponse response,
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures) {
List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
for (WriteRequest writeRequest : response.unprocessedItems().get(tableName)) {
@@ -213,39 +276,97 @@ private void handlePartiallyUnprocessedRequest(
numRecordsSendErrorsCounter.inc(unprocessed.size());
numRecordsSendPartialFailure.inc(unprocessed.size());
- resultHandler.retryForEntries(unprocessed);
+ retryableFailures.addAll(unprocessed);
}
private void handleFullyFailedRequest(
Throwable err,
List<DynamoDbWriteRequest> requestEntries,
- ResultHandler<DynamoDbWriteRequest> resultHandler) {
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures,
+ AtomicReference<Exception> fatalException) {
LOG.warn(
"DynamoDB Sink failed to persist and will retry {} entries.",
requestEntries.size(),
err);
numRecordsSendErrorsCounter.inc(requestEntries.size());
- if (isRetryable(err.getCause(), resultHandler)) {
- resultHandler.retryForEntries(requestEntries);
+ if (isRetryable(err.getCause(), fatalException)) {
+ retryableFailures.addAll(requestEntries);
}
}
- private boolean isRetryable(Throwable err, ResultHandler<DynamoDbWriteRequest> resultHandler) {
+ private boolean isRetryable(Throwable err, AtomicReference<Exception> fatalException) {
// isFatal() is really isNotFatal()
if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(
- err, resultHandler::completeExceptionally)) {
+ err, ex -> fatalException.compareAndSet(null, ex))) {
return false;
}
if (failOnError) {
- resultHandler.completeExceptionally(
- new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
+ fatalException.compareAndSet(
+ null, new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
return false;
}
return true;
}
+ private CompletableFuture<Void> submitSingleRequests(
+ List<DynamoDbWriteRequest> singleRequests,
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures,
+ AtomicReference<Exception> fatalException) {
+
+ if (singleRequests.isEmpty()) {
+ return FutureUtils.completedVoidFuture();
+ }
+
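+ // Fire all single-item requests concurrently; each failure is recorded in the shared
+ // queue/reference rather than failing the returned future.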
+ CompletableFuture<?>[] futures =
+ singleRequests.stream()
+ .map(
+ request ->
+ submitSingleRequest(request)
+ .handle(
+ (response, err) -> {
+ if (err != null) {
+ handleSingleRequestError(
+ err,
+ request,
+ retryableFailures,
+ fatalException);
+ }
+ return null;
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ return CompletableFuture.allOf(futures);
+ }
+
+ private CompletableFuture<?> submitSingleRequest(DynamoDbWriteRequest request) {
+ switch (request.getType()) {
+ case PUT:
+ return clientProvider.getClient().putItem(convertToPutItemRequest(request));
+ case DELETE:
+ return clientProvider.getClient().deleteItem(convertToDeleteItemRequest(request));
+ case UPDATE:
+ return clientProvider.getClient().updateItem(convertToUpdateItemRequest(request));
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported DynamoDb Write Request Type: " + request.getType());
+ }
+ }
+
+ private void handleSingleRequestError(
+ Throwable err,
+ DynamoDbWriteRequest request,
+ ConcurrentLinkedQueue<DynamoDbWriteRequest> retryableFailures,
+ AtomicReference<Exception> fatalException) {
+ LOG.warn("DynamoDB Sink single write failed for {} request.", request.getType(), err);
+ numRecordsSendErrorsCounter.inc();
+
+ if (isRetryable(err.getCause(), fatalException)) {
+ retryableFailures.add(request);
+ }
+ }
+
@Override
protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) {
// dynamodb calculates item size as a sum of all attributes and all values, to calculate it
@@ -291,4 +412,52 @@ private DynamoDbWriteRequest convertToDynamoDbWriteRequest(WriteRequest writeRequest) {
"Unsupported Write Request, consider updating the convertToDynamoDbWriteRequest method");
}
}
+
+ private PutItemRequest convertToPutItemRequest(DynamoDbWriteRequest request) {
+ PutItemRequest.Builder builder =
+ PutItemRequest.builder().tableName(tableName).item(request.getItem());
+ if (request.getConditionExpression() != null) {
+ builder.conditionExpression(request.getConditionExpression());
+ }
+ if (request.getExpressionAttributeNames() != null) {
+ builder.expressionAttributeNames(request.getExpressionAttributeNames());
+ }
+ if (request.getExpressionAttributeValues() != null) {
+ builder.expressionAttributeValues(request.getExpressionAttributeValues());
+ }
+ return builder.build();
+ }
+
+ private DeleteItemRequest convertToDeleteItemRequest(DynamoDbWriteRequest request) {
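+ // For DELETE requests the item map carries only the primary key attributes.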
+ DeleteItemRequest.Builder builder =
+ DeleteItemRequest.builder().tableName(tableName).key(request.getItem());
+ if (request.getConditionExpression() != null) {
+ builder.conditionExpression(request.getConditionExpression());
+ }
+ if (request.getExpressionAttributeNames() != null) {
+ builder.expressionAttributeNames(request.getExpressionAttributeNames());
+ }
+ if (request.getExpressionAttributeValues() != null) {
+ builder.expressionAttributeValues(request.getExpressionAttributeValues());
+ }
+ return builder.build();
+ }
+
+ private UpdateItemRequest convertToUpdateItemRequest(DynamoDbWriteRequest request) {
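+ // For UPDATE requests the item map carries the key; the update expression drives the mutation.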
+ UpdateItemRequest.Builder builder =
+ UpdateItemRequest.builder()
+ .tableName(tableName)
+ .key(request.getItem())
+ .updateExpression(request.getUpdateExpression());
+ if (request.getConditionExpression() != null) {
+ builder.conditionExpression(request.getConditionExpression());
+ }
+ if (request.getExpressionAttributeNames() != null) {
+ builder.expressionAttributeNames(request.getExpressionAttributeNames());
+ }
+ if (request.getExpressionAttributeValues() != null) {
+ builder.expressionAttributeValues(request.getExpressionAttributeValues());
+ }
+ return builder.build();
+ }
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
index c8e3c886..aa181f60 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
@@ -28,7 +28,11 @@
/**
* Represents a single Write Request to DynamoDb. Contains the item to be written as well as the
- * type of the Write Request (PUT/DELETE)
+ * type of the Write Request (PUT/DELETE/UPDATE).
+ *
+ * <p>For PUT requests, {@code item} contains the full item attributes. For DELETE requests, {@code
+ * item} contains the primary key attributes. For UPDATE requests, {@code item} contains the primary
+ * key attributes and the update expression fields must be set.
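+ *
+ * <p>A minimal sketch of an UPDATE request (the key, expression, and values are illustrative):
+ *
+ * <pre>{@code
+ * DynamoDbWriteRequest request =
+ *         DynamoDbWriteRequest.builder()
+ *                 .setType(DynamoDbWriteRequestType.UPDATE)
+ *                 .setItem(Collections.singletonMap("pk", AttributeValue.builder().s("1").build()))
+ *                 .setUpdateExpression("SET #c = :inc")
+ *                 .setExpressionAttributeNames(Collections.singletonMap("#c", "counter"))
+ *                 .setExpressionAttributeValues(
+ *                         Collections.singletonMap(":inc", AttributeValue.builder().n("1").build()))
+ *                 .build();
+ * }</pre>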
*/
@PublicEvolving
public class DynamoDbWriteRequest implements Serializable {
@@ -37,10 +41,24 @@ public class DynamoDbWriteRequest implements Serializable {
private final Map<String, AttributeValue> item;
private final DynamoDbWriteRequestType type;
-
- private DynamoDbWriteRequest(Map item, DynamoDbWriteRequestType type) {
+ private final String updateExpression;
+ private final Map<String, String> expressionAttributeNames;
+ private final Map<String, AttributeValue> expressionAttributeValues;
+ private final String conditionExpression;
+
+ private DynamoDbWriteRequest(
+ Map<String, AttributeValue> item,
+ DynamoDbWriteRequestType type,
+ String updateExpression,
+ Map<String, String> expressionAttributeNames,
+ Map<String, AttributeValue> expressionAttributeValues,
+ String conditionExpression) {
this.item = item;
this.type = type;
+ this.updateExpression = updateExpression;
+ this.expressionAttributeNames = expressionAttributeNames;
+ this.expressionAttributeValues = expressionAttributeValues;
+ this.conditionExpression = conditionExpression;
}
public Map<String, AttributeValue> getItem() {
@@ -51,19 +69,54 @@ public DynamoDbWriteRequestType getType() {
return type;
}
+ public String getUpdateExpression() {
+ return updateExpression;
+ }
+
+ public Map<String, String> getExpressionAttributeNames() {
+ return expressionAttributeNames;
+ }
+
+ public Map<String, AttributeValue> getExpressionAttributeValues() {
+ return expressionAttributeValues;
+ }
+
+ public String getConditionExpression() {
+ return conditionExpression;
+ }
+
public static Builder builder() {
return new Builder();
}
@Override
public String toString() {
- return "DynamoDbWriteRequest{" + "item=" + item + ", type=" + type + '}';
+ return "DynamoDbWriteRequest{"
+ + "item="
+ + item
+ + ", type="
+ + type
+ + ", updateExpression='"
+ + updateExpression
+ + '\''
+ + ", expressionAttributeNames="
+ + expressionAttributeNames
+ + ", expressionAttributeValues="
+ + expressionAttributeValues
+ + ", conditionExpression='"
+ + conditionExpression
+ + '\''
+ + '}';
}
/** Builder for DynamoDbWriteRequest. */
public static class Builder {
private Map<String, AttributeValue> item;
private DynamoDbWriteRequestType type;
+ private String updateExpression;
+ private Map<String, String> expressionAttributeNames;
+ private Map<String, AttributeValue> expressionAttributeValues;
+ private String conditionExpression;
public Builder setItem(Map<String, AttributeValue> item) {
this.item = item;
@@ -75,12 +128,45 @@ public Builder setType(DynamoDbWriteRequestType type) {
return this;
}
+ public Builder setUpdateExpression(String updateExpression) {
+ this.updateExpression = updateExpression;
+ return this;
+ }
+
+ public Builder setExpressionAttributeNames(
+ Map<String, String> expressionAttributeNames) {
+ this.expressionAttributeNames = expressionAttributeNames;
+ return this;
+ }
+
+ public Builder setExpressionAttributeValues(
+ Map<String, AttributeValue> expressionAttributeValues) {
+ this.expressionAttributeValues = expressionAttributeValues;
+ return this;
+ }
+
+ public Builder setConditionExpression(String conditionExpression) {
+ this.conditionExpression = conditionExpression;
+ return this;
+ }
+
public DynamoDbWriteRequest build() {
Preconditions.checkNotNull(
- item, "No Item was supplied to the " + "DynamoDbWriteRequest builder.");
+ item, "No Item was supplied to the DynamoDbWriteRequest builder.");
Preconditions.checkNotNull(
- type, "No type was supplied to the " + "DynamoDbWriteRequest builder.");
- return new DynamoDbWriteRequest(item, type);
+ type, "No type was supplied to the DynamoDbWriteRequest builder.");
+ if (type == DynamoDbWriteRequestType.UPDATE) {
+ Preconditions.checkNotNull(
+ updateExpression,
+ "No updateExpression was supplied for UPDATE DynamoDbWriteRequest.");
+ }
+ return new DynamoDbWriteRequest(
+ item,
+ type,
+ updateExpression,
+ expressionAttributeNames,
+ expressionAttributeValues,
+ conditionExpression);
}
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java
index 7a69e436..f0b783e2 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestType.java
@@ -26,6 +26,7 @@
*
* <ul>
*   <li>PUT - Put Request
*   <li>DELETE - Delete Request
+ *   <li>UPDATE - Update Request
* </ul>
*/
@PublicEvolving
@@ -34,7 +35,8 @@ public enum DynamoDbWriteRequestType {
// Note: Enums have no stable hash code across different JVMs, use toByteValue() for
// this purpose.
PUT((byte) 0),
- DELETE((byte) 1);
+ DELETE((byte) 1),
+ UPDATE((byte) 2);
private final byte value;
DynamoDbWriteRequestType(byte value) {
@@ -48,6 +50,7 @@ public enum DynamoDbWriteRequestType {
*
* <ul>
*   <li>"0" represents {@link #PUT}.
*   <li>"1" represents {@link #DELETE}.
+ *   <li>"2" represents {@link #UPDATE}.
* </ul>
*/
public byte toByteValue() {
@@ -66,6 +69,8 @@ public static DynamoDbWriteRequestType fromByteValue(byte value) {
return PUT;
case 1:
return DELETE;
+ case 2:
+ return UPDATE;
default:
throw new UnsupportedOperationException(
"Unsupported byte value '" + value + "' for DynamoDb request type.");
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java
index d06155cc..ba56673e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializer.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.dynamodb.util.DynamoDbSerializationUtil;
import java.io.DataInputStream;
@@ -31,6 +32,12 @@
public class DynamoDbWriterStateSerializer
extends AsyncSinkWriterStateSerializer<DynamoDbWriteRequest> {
+ // The parent class does not pass the version to deserializeRequestFromStream(),
+ // so we stash it here before calling super.deserialize() and read it back in
+ // deserializeRequestFromStream().
+ private final ThreadLocal<Integer> deserializingVersion =
+ ThreadLocal.withInitial(this::getVersion);
+
/**
* Serializes {@link DynamoDbWriteRequest} in form of
* [TABLE_NAME,WRITE_REQUEST_TYPE(PUT/DELETE),WRITE_REQUEST].
@@ -44,11 +51,22 @@ protected void serializeRequestToStream(DynamoDbWriteRequest request, DataOutputStream out)
@Override
protected DynamoDbWriteRequest deserializeRequestFromStream(
long requestSize, DataInputStream in) throws IOException {
- return DynamoDbSerializationUtil.deserializeWriteRequest(in);
+ return DynamoDbSerializationUtil.deserializeWriteRequest(in, deserializingVersion.get());
+ }
+
+ @Override
+ public BufferedRequestState<DynamoDbWriteRequest> deserialize(int version, byte[] serialized)
+ throws IOException {
+ deserializingVersion.set(version);
+ try {
+ return super.deserialize(version, serialized);
+ } finally {
+ deserializingVersion.set(getVersion());
+ }
}
@Override
public int getVersion() {
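+ // Version 2 adds the update/condition expression fields to the serialized request format.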
- return 1;
+ return 2;
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java
index 0e4a8eb9..0b6dd018 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java
@@ -43,20 +43,112 @@ public class DynamoDbSerializationUtil {
public static void serializeWriteRequest(
DynamoDbWriteRequest dynamoDbWriteRequest, DataOutputStream out) throws IOException {
out.writeByte(dynamoDbWriteRequest.getType().toByteValue());
- Map<String, AttributeValue> item = dynamoDbWriteRequest.getItem();
- serializeItem(item, out);
+ serializeItem(dynamoDbWriteRequest.getItem(), out);
+ serializeNullableString(dynamoDbWriteRequest.getUpdateExpression(), out);
+ serializeNullableString(dynamoDbWriteRequest.getConditionExpression(), out);
+ serializeNullableStringMap(dynamoDbWriteRequest.getExpressionAttributeNames(), out);
+ serializeNullableAttributeValueMap(
+ dynamoDbWriteRequest.getExpressionAttributeValues(), out);
}
public static DynamoDbWriteRequest deserializeWriteRequest(DataInputStream in)
throws IOException {
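+ // No explicit version means the current (latest) serialization format.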
+ return deserializeWriteRequest(in, 2);
+ }
+
+ public static DynamoDbWriteRequest deserializeWriteRequest(DataInputStream in, int version)
+ throws IOException {
int writeRequestType = in.read();
DynamoDbWriteRequestType dynamoDbWriteRequestType =
DynamoDbWriteRequestType.fromByteValue((byte) writeRequestType);
Map<String, AttributeValue> item = deserializeItem(in);
- return DynamoDbWriteRequest.builder()
- .setType(dynamoDbWriteRequestType)
- .setItem(item)
- .build();
+
+ DynamoDbWriteRequest.Builder builder =
+ DynamoDbWriteRequest.builder()
+ .setType(dynamoDbWriteRequestType)
+ .setItem(item);
+
+ // Version 2 added expression fields for conditional and update writes
+ if (version >= 2) {
+ String updateExpression = deserializeNullableString(in);
+ String conditionExpression = deserializeNullableString(in);
+ Map<String, String> expressionAttributeNames = deserializeNullableStringMap(in);
+ Map<String, AttributeValue> expressionAttributeValues =
+ deserializeNullableAttributeValueMap(in);
+ if (updateExpression != null) {
+ builder.setUpdateExpression(updateExpression);
+ }
+ if (conditionExpression != null) {
+ builder.setConditionExpression(conditionExpression);
+ }
+ if (expressionAttributeNames != null) {
+ builder.setExpressionAttributeNames(expressionAttributeNames);
+ }
+ if (expressionAttributeValues != null) {
+ builder.setExpressionAttributeValues(expressionAttributeValues);
+ }
+ }
+
+ return builder.build();
+ }
+
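+ // Nullable fields are written as a presence flag followed by the value, so absent
+ // expression fields cost a single byte in serialized writer state.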
+ private static void serializeNullableString(String value, DataOutputStream out)
+ throws IOException {
+ if (value == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeUTF(value);
+ }
+ }
+
+ private static String deserializeNullableString(DataInputStream in) throws IOException {
+ boolean present = in.readBoolean();
+ return present ? in.readUTF() : null;
+ }
+
+ private static void serializeNullableStringMap(Map<String, String> map, DataOutputStream out)
+ throws IOException {
+ if (map == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(map.size());
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue());
+ }
+ }
+ }
+
+ private static Map<String, String> deserializeNullableStringMap(DataInputStream in)
+ throws IOException {
+ boolean present = in.readBoolean();
+ if (!present) {
+ return null;
+ }
+ int size = in.readInt();
+ Map<String, String> map = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ map.put(in.readUTF(), in.readUTF());
+ }
+ return map;
+ }
+
+ private static void serializeNullableAttributeValueMap(
+ Map<String, AttributeValue> map, DataOutputStream out) throws IOException {
+ if (map == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ serializeItem(map, out);
+ }
+ }
+
+ private static Map<String, AttributeValue> deserializeNullableAttributeValueMap(
+ DataInputStream in) throws IOException {
+ boolean present = in.readBoolean();
+ return present ? deserializeItem(in) : null;
}
private static void serializeItem(Map<String, AttributeValue> item, DataOutputStream out)
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
index 13a733f7..afa201e3 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
@@ -19,11 +19,17 @@
package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.testutils.TestItem;
import org.apache.flink.connector.dynamodb.util.Order;
import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import java.util.Collections;
+
+import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.DELETE;
import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT;
+import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.UPDATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -88,4 +94,87 @@ void testConvertWithClosedConvertedThrowsException() {
.isThrownBy(() -> elementConverter.apply(order, null))
.withMessageContaining("Table schema has not been initialized");
}
+
+ @Test
+ void testConditionalPutSetsConditionExpression() {
+ ElementConverter<TestItem, DynamoDbWriteRequest> elementConverter =
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(PUT)
+ .setConditionExpression("attribute_not_exists(pk)")
+ .setExpressionAttributeNames(Collections.singletonMap("#pk", "pk"))
+ .build();
+ elementConverter.open(null);
+ TestItem item = new TestItem("1", "a", "data", "0");
+
+ DynamoDbWriteRequest actual = elementConverter.apply(item, null);
+
+ assertThat(actual.getType()).isEqualTo(PUT);
+ assertThat(actual.getConditionExpression()).isEqualTo("attribute_not_exists(pk)");
+ assertThat(actual.getExpressionAttributeNames()).containsEntry("#pk", "pk");
+ assertThat(actual.getItem()).containsKeys("pk", "sk", "payload", "counter");
+ }
+
+ @Test
+ void testDeleteExtractsOnlyKeyAttributes() {
+ ElementConverter<TestItem, DynamoDbWriteRequest> elementConverter =
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(DELETE)
+ .build();
+ elementConverter.open(null);
+ TestItem item = new TestItem("1", "a", "data", "0");
+
+ DynamoDbWriteRequest actual = elementConverter.apply(item, null);
+
+ assertThat(actual.getType()).isEqualTo(DELETE);
+ assertThat(actual.getItem()).containsOnlyKeys("pk", "sk");
+ assertThat(actual.getItem().get("pk").s()).isEqualTo("1");
+ assertThat(actual.getItem().get("sk").s()).isEqualTo("a");
+ }
+
+ @Test
+ void testUpdateExtractsKeyAndSetsUpdateExpression() {
+ ElementConverter<TestItem, DynamoDbWriteRequest> elementConverter =
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(UPDATE)
+ .setUpdateExpression("SET #c = :val")
+ .setExpressionAttributeNames(Collections.singletonMap("#c", "counter"))
+ .setExpressionAttributeValues(
+ Collections.singletonMap(
+ ":val", AttributeValue.builder().s("42").build()))
+ .build();
+ elementConverter.open(null);
+ TestItem item = new TestItem("1", "a", "data", "0");
+
+ DynamoDbWriteRequest actual = elementConverter.apply(item, null);
+
+ assertThat(actual.getType()).isEqualTo(UPDATE);
+ assertThat(actual.getUpdateExpression()).isEqualTo("SET #c = :val");
+ assertThat(actual.getItem()).containsOnlyKeys("pk", "sk");
+ assertThat(actual.getExpressionAttributeNames()).containsEntry("#c", "counter");
+ assertThat(actual.getExpressionAttributeValues())
+ .containsEntry(":val", AttributeValue.builder().s("42").build());
+ }
+
+ @Test
+ void testUpdateWithoutUpdateExpressionThrows() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(UPDATE)
+ .build())
+ .withMessageContaining("updateExpression is required for UPDATE type");
+ }
+
+ @Test
+ void testPutWithUpdateExpressionThrows() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(PUT)
+ .setUpdateExpression("SET #a = :val")
+ .build())
+ .withMessageContaining("updateExpression is only allowed for UPDATE type");
+ }
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java
new file mode 100644
index 00000000..c241d5a6
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkConditionalWriteITCase.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
+import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
+import org.apache.flink.connector.dynamodb.testutils.TestItem;
+import org.apache.flink.connector.dynamodb.util.DockerImageVersions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * Integration test for batch and single writes in {@link DynamoDbSink} using {@link
+ * DynamoDbBeanElementConverter}.
+ */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class DynamoDbSinkConditionalWriteITCase {
+
+ private static final String PARTITION_KEY = "pk";
+ private static final String SORT_KEY = "sk";
+ private static DynamoDBHelpers dynamoDBHelpers;
+ private static String testTableName;
+
+ @Container
+ public static final DynamoDbContainer LOCALSTACK =
+ new DynamoDbContainer(DockerImageName.parse(DockerImageVersions.DYNAMODB))
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases("dynamodb");
+
+ @BeforeEach
+ public void setup() throws URISyntaxException {
+ testTableName = "test_" + UUID.randomUUID().toString().replace("-", "");
+ dynamoDBHelpers = new DynamoDBHelpers(LOCALSTACK.getHostClient());
+ }
+
+ @Test
+ public void testBatchPutWithBeanConverter() throws Exception {
+ dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+
+ List<TestItem> items =
+ Arrays.asList(
+ new TestItem("1", "a", "data1", "0"),
+ new TestItem("2", "b", "data2", "0"));
+
+ StreamExecutionEnvironment env = createEnv();
+ DataStream<TestItem> stream = env.fromCollection(items);
+ stream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class)));
+ env.execute("Batch PUT via BeanConverter");
+
+ Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2);
+ }
+
+ @Test
+ public void testUpdateWithBeanUpdateConverter() throws Exception {
+ dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+
+ List<TestItem> items =
+ Arrays.asList(
+ new TestItem("1", "a", "data1", "0"),
+ new TestItem("2", "b", "data2", "0"));
+
+ StreamExecutionEnvironment env = createEnv();
+ DataStream<TestItem> putStream = env.fromCollection(items);
+ putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class)));
+ env.execute("Initial PUT");
+
+ Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2);
+
+ Map expressionNames = Collections.singletonMap("#c", "counter");
+ Map expressionValues =
+ Collections.singletonMap(":val", AttributeValue.builder().s("42").build());
+
+ env = createEnv();
+ DataStream<TestItem> updateStream =
+ env.fromCollection(
+ Collections.singletonList(new TestItem("1", "a", null, null)));
+ updateStream.sinkTo(
+ buildSink(
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(DynamoDbWriteRequestType.UPDATE)
+ .setUpdateExpression("SET #c = :val")
+ .setExpressionAttributeNames(expressionNames)
+ .setExpressionAttributeValues(expressionValues)
+ .build()));
+ env.execute("UPDATE via BeanConverter");
+
+ Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2);
+ Assertions.assertThat(
+ dynamoDBHelpers.containsAttributeValue(testTableName, "counter", "42"))
+ .isTrue();
+ }
+
+ @Test
+ public void testConditionalPutWithBeanConverterPreventsOverwrite() throws Exception {
+ dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+
+ StreamExecutionEnvironment env = createEnv();
+ DataStream<TestItem> putStream =
+ env.fromCollection(
+ Collections.singletonList(new TestItem("1", "a", "original", "0")));
+ putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class)));
+ env.execute("Initial PUT");
+
+ Map<String, String> exprNames = new HashMap<>();
+ exprNames.put("#pk", PARTITION_KEY);
+ exprNames.put("#sk", SORT_KEY);
+
+ StreamExecutionEnvironment conditionalEnv = createEnv();
+ DataStream<TestItem> conditionalStream =
+ conditionalEnv.fromCollection(
+ Collections.singletonList(new TestItem("1", "a", "overwritten", "0")));
+ conditionalStream.sinkTo(
+ buildSink(
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(DynamoDbWriteRequestType.PUT)
+ .setConditionExpression(
+ "attribute_not_exists(#pk) AND attribute_not_exists(#sk)")
+ .setExpressionAttributeNames(exprNames)
+ .build()));
+
+ Assertions.assertThatExceptionOfType(JobExecutionException.class)
+ .isThrownBy(() -> conditionalEnv.execute("Conditional PUT should fail"))
+ .havingCause()
+ .havingCause()
+ .withMessageContaining("conditional check");
+
+ Assertions.assertThat(
+ dynamoDBHelpers.containsAttributeValue(testTableName, "payload", "original"))
+ .isTrue();
+ }
+
+ @Test
+ public void testConditionalDeleteWithBeanConverter() throws Exception {
+ dynamoDBHelpers.createTable(testTableName, PARTITION_KEY, SORT_KEY);
+
+ List<TestItem> items =
+ Arrays.asList(
+ new TestItem("1", "a", "keep", "0"),
+ new TestItem("2", "b", "remove", "0"));
+
+ StreamExecutionEnvironment env = createEnv();
+ DataStream<TestItem> putStream = env.fromCollection(items);
+ putStream.sinkTo(buildSink(new DynamoDbBeanElementConverter<>(TestItem.class)));
+ env.execute("Insert items");
+
+ Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(2);
+
+ env = createEnv();
+ DataStream<TestItem> deleteStream =
+ env.fromCollection(
+ Collections.singletonList(new TestItem("2", "b", null, null)));
+ deleteStream.sinkTo(
+ buildSink(
+ DynamoDbBeanElementConverter.builder(TestItem.class)
+ .setType(DynamoDbWriteRequestType.DELETE)
+ .setConditionExpression("#p = :val")
+ .setExpressionAttributeNames(
+ Collections.singletonMap("#p", "payload"))
+ .setExpressionAttributeValues(
+ Collections.singletonMap(
+ ":val",
+ AttributeValue.builder()
+ .s("remove")
+ .build()))
+ .build()));
+ env.execute("Conditional DELETE");
+
+ Assertions.assertThat(dynamoDBHelpers.getItemsCount(testTableName)).isEqualTo(1);
+ Assertions.assertThat(
+ dynamoDBHelpers.containsAttributeValue(testTableName, "payload", "keep"))
+ .isTrue();
+ }
+
+ private DynamoDbSink<TestItem> buildSink(
+ ElementConverter<TestItem, DynamoDbWriteRequest> converter) {
+ return DynamoDbSink.<TestItem>builder()
+ .setElementConverter(converter)
+ .setMaxTimeInBufferMS(1000)
+ .setMaxInFlightRequests(1)
+ .setMaxBatchSize(25)
+ .setFailOnError(true)
+ .setMaxBufferedRequests(1000)
+ .setTableName(testTableName)
+ .setOverwriteByPartitionKeys(Collections.emptyList())
+ .setDynamoDbProperties(getProperties())
+ .build();
+ }
+
+ private StreamExecutionEnvironment createEnv() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration config = new Configuration();
+ config.set(
+ RestartStrategyOptions.RESTART_STRATEGY,
+ RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
+ env.configure(config);
+ env.setParallelism(1);
+ return env;
+ }
+
+ private Properties getProperties() {
+ Properties properties = new Properties();
+ properties.setProperty(AWS_ENDPOINT, LOCALSTACK.getHostEndpointUrl());
+ properties.setProperty(AWS_ACCESS_KEY_ID, LOCALSTACK.getAccessKey());
+ properties.setProperty(AWS_SECRET_ACCESS_KEY, LOCALSTACK.getSecretKey());
+ properties.setProperty(AWS_REGION, LOCALSTACK.getRegion().toString());
+ properties.setProperty(TRUST_ALL_CERTIFICATES, "true");
+ properties.setProperty(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ return properties;
+ }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index 44adf5cf..63551f2b 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -31,11 +31,17 @@
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.sts.model.StsException;
@@ -321,6 +327,235 @@ public void testClientClosesWhenWriterIsClosed() throws IOException {
assertThat(testAsyncDynamoDbClientProvider.getCloseCount()).isEqualTo(1);
}
+ @Test
+ public void testUpdateRequestUsesUpdateItemApi() throws Exception {
+ TrackingDynamoDbAsyncClient trackingClient = new TrackingDynamoDbAsyncClient();
+ DynamoDbSinkWriter