Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,27 @@

import software.amazon.awssdk.enhanced.dynamodb.mapper.BeanTableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.Collection;
import java.util.Map;

/**
* A generic {@link ElementConverter} that uses the dynamodb-enhanced client to build a {@link
* DynamoDbWriteRequest} from a POJO annotated with {@link DynamoDbBean}.
*
* <p>Supports all three write request types:
*
* <ul>
* <li>{@link DynamoDbWriteRequestType#PUT} - maps the full POJO to an item
* <li>{@link DynamoDbWriteRequestType#DELETE} - extracts only the primary key from the POJO
* <li>{@link DynamoDbWriteRequestType#UPDATE} - extracts the primary key from the POJO and uses
* the provided update expression
* </ul>
*
* <p>Condition expressions can be set for any type to enable conditional writes via individual API
* calls instead of {@code BatchWriteItem}.
*
* @param <InputT> The type of the {@link DynamoDbBean} to convert into {@link DynamoDbWriteRequest}
*/
@PublicEvolving
Expand All @@ -40,15 +56,43 @@ public class DynamoDbBeanElementConverter<InputT>

private final Class<InputT> recordType;
private final boolean ignoreNulls;
private final DynamoDbWriteRequestType type;
private final String updateExpression;
private final String conditionExpression;
private final Map<String, String> expressionAttributeNames;
private final Map<String, AttributeValue> expressionAttributeValues;
private transient BeanTableSchema<InputT> tableSchema;
private transient Collection<String> keyAttributeNames;

public DynamoDbBeanElementConverter(final Class<InputT> recordType) {
this(recordType, false);
}

public DynamoDbBeanElementConverter(final Class<InputT> recordType, final boolean ignoreNulls) {
this(recordType, ignoreNulls, DynamoDbWriteRequestType.PUT, null, null, null, null);
}

private DynamoDbBeanElementConverter(
final Class<InputT> recordType,
final boolean ignoreNulls,
final DynamoDbWriteRequestType type,
final String updateExpression,
final String conditionExpression,
final Map<String, String> expressionAttributeNames,
final Map<String, AttributeValue> expressionAttributeValues) {
this.recordType = recordType;
this.ignoreNulls = ignoreNulls;
this.type = Preconditions.checkNotNull(type, "Type must not be null");
this.updateExpression = updateExpression;
this.conditionExpression = conditionExpression;
this.expressionAttributeNames = expressionAttributeNames;
this.expressionAttributeValues = expressionAttributeValues;

if (type == DynamoDbWriteRequestType.UPDATE) {
Preconditions.checkNotNull(
updateExpression,
"Update expression must not be null for UPDATE requests.");
}

// Attempt to create a table schema now to bubble up errors before starting job
createTableSchema(recordType);
Expand All @@ -57,18 +101,106 @@ public DynamoDbBeanElementConverter(final Class<InputT> recordType, final boolea
@Override
public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) {
Preconditions.checkNotNull(tableSchema, "Table schema has not been initialized");
return new DynamoDbWriteRequest.Builder()
.setType(DynamoDbWriteRequestType.PUT)
.setItem(tableSchema.itemToMap(element, ignoreNulls))
.build();
Map<String, AttributeValue> item =
type == DynamoDbWriteRequestType.PUT
? tableSchema.itemToMap(element, ignoreNulls)
: tableSchema.itemToMap(element, keyAttributeNames);
DynamoDbWriteRequest.Builder builder =
new DynamoDbWriteRequest.Builder().setType(type).setItem(item);
if (updateExpression != null) {
builder.setUpdateExpression(updateExpression);
}
if (conditionExpression != null) {
builder.setConditionExpression(conditionExpression);
}
if (expressionAttributeNames != null) {
builder.setExpressionAttributeNames(expressionAttributeNames);
}
if (expressionAttributeValues != null) {
builder.setExpressionAttributeValues(expressionAttributeValues);
}
return builder.build();
}

@Override
public void open(WriterInitContext context) {
tableSchema = createTableSchema(recordType);
// DELETE and UPDATE only need the primary key attributes from the POJO, not the full item.
// PUT uses the complete item map.
if (type == DynamoDbWriteRequestType.DELETE || type == DynamoDbWriteRequestType.UPDATE) {
keyAttributeNames = tableSchema.tableMetadata().primaryKeys();
}
}

private BeanTableSchema<InputT> createTableSchema(final Class<InputT> recordType) {
return BeanTableSchema.create(recordType);
}

/** Builder for {@link DynamoDbBeanElementConverter}. */
public static class ElementConverterBuilder<InputT> {
private final Class<InputT> recordType;
private boolean ignoreNulls = false;
private DynamoDbWriteRequestType type = DynamoDbWriteRequestType.PUT;
private String updateExpression;
private String conditionExpression;
private Map<String, String> expressionAttributeNames;
private Map<String, AttributeValue> expressionAttributeValues;

private ElementConverterBuilder(Class<InputT> recordType) {
this.recordType = recordType;
}

public ElementConverterBuilder<InputT> setIgnoreNulls(boolean ignoreNulls) {
this.ignoreNulls = ignoreNulls;
return this;
}

public ElementConverterBuilder<InputT> setType(DynamoDbWriteRequestType type) {
this.type = type;
return this;
}

public ElementConverterBuilder<InputT> setUpdateExpression(String updateExpression) {
this.updateExpression = updateExpression;
return this;
}

public ElementConverterBuilder<InputT> setConditionExpression(String conditionExpression) {
this.conditionExpression = conditionExpression;
return this;
}

public ElementConverterBuilder<InputT> setExpressionAttributeNames(
Map<String, String> expressionAttributeNames) {
this.expressionAttributeNames = expressionAttributeNames;
return this;
}

public ElementConverterBuilder<InputT> setExpressionAttributeValues(
Map<String, AttributeValue> expressionAttributeValues) {
this.expressionAttributeValues = expressionAttributeValues;
return this;
}

public DynamoDbBeanElementConverter<InputT> build() {
Preconditions.checkArgument(
type != DynamoDbWriteRequestType.UPDATE || updateExpression != null,
"updateExpression is required for UPDATE type.");
Preconditions.checkArgument(
type == DynamoDbWriteRequestType.UPDATE || updateExpression == null,
"updateExpression is only allowed for UPDATE type.");
return new DynamoDbBeanElementConverter<>(
recordType,
ignoreNulls,
type,
updateExpression,
conditionExpression,
expressionAttributeNames,
expressionAttributeValues);
}
}

public static <InputT> ElementConverterBuilder<InputT> builder(Class<InputT> recordType) {
return new ElementConverterBuilder<>(recordType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@
* single batch takes precedence.
* </ul>
*
* <p>The sink supports three write request types via {@link DynamoDbWriteRequestType}:
*
* <ul>
* <li>{@code PUT} - writes a full item using {@code BatchWriteItem} or {@code PutItem} (when a
* condition expression is set)
* <li>{@code DELETE} - deletes an item by key using {@code BatchWriteItem} or {@code DeleteItem}
* (when a condition expression is set)
* <li>{@code UPDATE} - updates specific attributes of an item using {@code UpdateItem}, always
* processed as individual requests since {@code BatchWriteItem} does not support updates
* </ul>
*
* <p>Requests without condition expressions (PUT/DELETE) are sent via {@code BatchWriteItem} for
* efficiency. Requests with condition expressions and all UPDATE requests are sent as individual
* {@code PutItem}, {@code DeleteItem}, or {@code UpdateItem} calls. Both paths execute
* concurrently.
*
* <p>Please see the writer implementation in {@link DynamoDbSinkWriter}
*
* @param <InputT> Type of the elements handled by this sink
Expand Down
Loading