NIFI-12212 Upgraded DynamoDB Processors to AWS SDK 2

This closes #7911

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Joe Gresock 2023-10-19 16:24:01 -04:00 committed by exceptionfactory
parent 582c3ac7cc
commit ec884ac091
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
18 changed files with 1214 additions and 879 deletions

View File

@ -51,6 +51,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>lambda</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
@ -100,10 +104,6 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -16,17 +16,7 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -35,9 +25,13 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@ -51,7 +45,7 @@ import java.util.Set;
/**
* Base class for NiFi DynamoDB-related processors
*/
public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonDynamoDBClient> {
public abstract class AbstractDynamoDBProcessor extends AbstractAwsSyncProcessor<DynamoDbClient, DynamoDbClientBuilder> {
public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed")
.description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process "
@ -68,7 +62,6 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message";
public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message";
public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service";
public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable";
public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id";
@ -142,7 +135,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The Json document to be retrieved from the dynamodb item")
.description("The Json document to be retrieved from the dynamodb item ('s' type in the schema)")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
@ -163,8 +156,6 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
.defaultValue(Charset.defaultCharset().name())
.build();
protected volatile DynamoDB dynamoDB;
public static final Set<Relationship> dynamoDBrelationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED)));
@ -173,61 +164,24 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
return dynamoDBrelationships;
}
/**
* Create client using credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
getLogger().debug("Creating client with credentials provider");
return (AmazonDynamoDBClient) AmazonDynamoDBClient.builder()
.withClientConfiguration(config)
.withCredentials(credentialsProvider)
.withEndpointConfiguration(endpointConfiguration)
.withRegion(region.getName())
.build();
protected DynamoDbClientBuilder createClientBuilder(final ProcessContext context) {
return DynamoDbClient.builder();
}
protected AttributeValue getAttributeValue(final ProcessContext context, final PropertyDescriptor type, final PropertyDescriptor value, final Map<String, String> attributes) {
final AttributeValue.Builder builder = AttributeValue.builder();
final String propertyValue = context.getProperty(value).evaluateAttributeExpressions(attributes).getValue();
if (propertyValue == null) {
return null;
}
protected Object getValue(final ProcessContext context, final PropertyDescriptor type, final PropertyDescriptor value, final Map<String, String> attributes) {
if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
return context.getProperty(value).evaluateAttributeExpressions(attributes).getValue();
if (context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
builder.s(propertyValue);
} else {
return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(attributes).getValue());
}
}
protected Object getAttributeValue(final ProcessContext context, final PropertyDescriptor propertyType, final AttributeValue value) {
if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
if ( value == null ) return null;
else return value.getS();
} else {
if ( value == null ) return null;
else return new BigDecimal(value.getN());
}
}
protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
return new DynamoDB(client);
}
protected synchronized DynamoDB getDynamoDB(ProcessContext context) {
if (dynamoDB == null) {
dynamoDB = getDynamoDB(getClient(context));
}
return dynamoDB;
}
protected Object getValue(Map<String, AttributeValue> item, String keyName, String valueType) {
if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
AttributeValue val = item.get(keyName);
if ( val == null ) return val;
else return val.getS();
} else {
AttributeValue val = item.get(keyName);
if ( val == null ) return val;
else return val.getN();
builder.n(propertyValue);
}
return builder.build();
}
protected List<FlowFile> processException(final ProcessSession session, List<FlowFile> flowFiles, Exception exception) {
@ -239,32 +193,31 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
return failedFlowFiles;
}
protected List<FlowFile> processClientException(final ProcessSession session, List<FlowFile> flowFiles,
AmazonClientException exception) {
List<FlowFile> failedFlowFiles = new ArrayList<>();
protected List<FlowFile> processSdkException(final ProcessSession session, final List<FlowFile> flowFiles,
final SdkException exception) {
final List<FlowFile> failedFlowFiles = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
Map<String,String> attributes = new HashMap<>();
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage());
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.retryable()));
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
}
return failedFlowFiles;
}
protected List<FlowFile> processServiceException(final ProcessSession session, List<FlowFile> flowFiles,
AmazonServiceException exception) {
List<FlowFile> failedFlowFiles = new ArrayList<>();
protected List<FlowFile> processServiceException(final ProcessSession session, final List<FlowFile> flowFiles,
final AwsServiceException exception) {
final List<FlowFile> failedFlowFiles = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
Map<String,String> attributes = new HashMap<>();
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() );
attributes.put(DYNAMODB_ERROR_TYPE, exception.getErrorType().name() );
attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
attributes.put(DYNAMODB_ERROR_CODE, exception.awsErrorDetails().errorCode() );
attributes.put(DYNAMODB_ERROR_MESSAGE, exception.awsErrorDetails().errorMessage() );
attributes.put(DYNAMODB_ERROR_SERVICE, exception.awsErrorDetails().serviceName() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.retryable()));
attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.requestId() );
attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.statusCode()) );
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
}
@ -278,19 +231,24 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
* @param hashKeyValue the items hash key value
* @param rangeKeyValue the items hash key value
*/
protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object hashKeyValue, Object rangeKeyValue) {
ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, final Map<ItemKeys, FlowFile> keysToFlowFileMap,
final AttributeValue hashKeyValue, final AttributeValue rangeKeyValue) {
final ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
if (flowFile == null) {
return;
}
flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
session.transfer(flowFile,REL_UNPROCESSED);
session.transfer(flowFile, REL_UNPROCESSED);
getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
keysToFlowFileMap.remove(itemKeys);
}
protected boolean isRangeKeyValueConsistent(final String rangeKeyName, final Object rangeKeyValue, final ProcessSession session, FlowFile flowFile) {
protected boolean isRangeKeyValueConsistent(final String rangeKeyName, final AttributeValue rangeKeyValue, final ProcessSession session, FlowFile flowFile) {
try {
validateRangeKeyValue(rangeKeyName, rangeKeyValue);
} catch (final IllegalArgumentException e) {
@ -304,14 +262,13 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
return true;
}
protected void validateRangeKeyValue(final String rangeKeyName, final Object rangeKeyValue) {
protected void validateRangeKeyValue(final String rangeKeyName, final AttributeValue rangeKeyValue) {
boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
boolean isRangeValueNull = rangeKeyValue == null;
boolean isConsistent = true;
if (!isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) {
if (!isRangeNameBlank && isBlank(rangeKeyValue)) {
isConsistent = false;
}
if (isRangeNameBlank && (!isRangeValueNull && !StringUtils.isBlank(rangeKeyValue.toString()))) {
if (isRangeNameBlank && !isBlank(rangeKeyValue)) {
isConsistent = false;
}
if (!isConsistent) {
@ -319,8 +276,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
}
}
protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session,
FlowFile flowFile) {
protected boolean isHashKeyValueConsistent(final String hashKeyName, final AttributeValue hashKeyValue, final ProcessSession session, FlowFile flowFile) {
boolean isConsistent = true;
@ -337,15 +293,17 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
}
protected void validateHashKeyValue(final Object hashKeyValue) {
if (hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) {
protected void validateHashKeyValue(final AttributeValue hashKeyValue) {
if (isBlank(hashKeyValue)) {
throw new IllegalArgumentException(String.format("Hash key value is required. Provided value was '%s'", hashKeyValue));
}
}
@OnStopped
public void onStopped() {
this.dynamoDB = null;
/**
* @param attributeValue An attribute value
* @return True if the AttributeValue is null or both 's' and 'n' are null or blank
*/
protected static boolean isBlank(final AttributeValue attributeValue) {
return attributeValue == null || (StringUtils.isBlank(attributeValue.s()) && StringUtils.isBlank(attributeValue.n()));
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.Objects;
/**
@ -23,10 +25,10 @@ import java.util.Objects;
*/
class ItemKeys {
protected Object hashKey = "";
protected Object rangeKey = "";
protected AttributeValue hashKey = AttributeValue.fromS("");
protected AttributeValue rangeKey = AttributeValue.fromS("");
public ItemKeys(final Object hashKey, final Object rangeKey) {
public ItemKeys(final AttributeValue hashKey, final AttributeValue rangeKey) {
if (hashKey != null) {
this.hashKey = hashKey;
}

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -39,7 +30,18 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -57,7 +59,6 @@ import java.util.Map;
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
@ -81,6 +82,7 @@ public class DeleteDynamoDB extends AbstractDynamoDBProcessor {
RANGE_KEY_VALUE_TYPE,
BATCH_SIZE,
TIMEOUT,
ENDPOINT_OVERRIDE,
SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE);
@ -91,79 +93,84 @@ public class DeleteDynamoDB extends AbstractDynamoDBProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
final Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
TableWriteItems tableWriteItems = new TableWriteItems(table);
final Map<String, Collection<WriteRequest>> tableNameRequestItemsMap = new HashMap<>();
final Collection<WriteRequest> requestItems = new ArrayList<>();
tableNameRequestItemsMap.put(table, requestItems);
for (FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
for (final FlowFile flowFile : flowFiles) {
final AttributeValue hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final AttributeValue rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
}
if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
continue;
}
if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
tableWriteItems.addHashOnlyPrimaryKeysToDelete(hashKeyName, hashKeyValue);
} else {
tableWriteItems.addHashAndRangePrimaryKeyToDelete(hashKeyName,
hashKeyValue, rangeKeyName, rangeKeyValue);
final Map<String, AttributeValue> keyMap = new HashMap<>();
keyMap.put(hashKeyName, hashKeyValue);
if (!isBlank(rangeKeyValue)) {
keyMap.put(rangeKeyName, rangeKeyValue);
}
final DeleteRequest deleteRequest = DeleteRequest.builder()
.key(keyMap)
.build();
final WriteRequest writeRequest = WriteRequest.builder().deleteRequest(deleteRequest).build();
requestItems.add(writeRequest);
keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
}
if ( keysToFlowFileMap.isEmpty() ) {
if (keysToFlowFileMap.isEmpty()) {
return;
}
final DynamoDB dynamoDB = getDynamoDB(context);
final DynamoDbClient client = getClient(context);
try {
final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
final BatchWriteItemResult result = outcome.getBatchWriteItemResult();
final BatchWriteItemRequest batchWriteItemRequest = BatchWriteItemRequest.builder().requestItems(tableNameRequestItemsMap).build();
final BatchWriteItemResponse response = client.batchWriteItem(batchWriteItemRequest);
// Handle unprocessed items
final List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if (unprocessedItems != null && unprocessedItems.size() > 0) {
for (final WriteRequest request : unprocessedItems) {
final Map<String, AttributeValue> item = request.getDeleteRequest().getKey();
final Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
final Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
if (CollectionUtils.isNotEmpty(response.unprocessedItems())) {
// Handle unprocessed items
final List<WriteRequest> unprocessedItems = response.unprocessedItems().get(table);
for (final WriteRequest writeRequest : unprocessedItems) {
final Map<String, AttributeValue> item = writeRequest.deleteRequest().key();
final AttributeValue hashKeyValue = item.get(hashKeyName);
final AttributeValue rangeKeyValue = item.get(rangeKeyName);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
// All non unprocessed items are successful
for (FlowFile flowFile : keysToFlowFileMap.values()) {
for (final FlowFile flowFile : keysToFlowFileMap.values()) {
getLogger().debug("Successfully deleted item from dynamodb : " + table);
session.transfer(flowFile,REL_SUCCESS);
session.transfer(flowFile, REL_SUCCESS);
}
} catch(AmazonServiceException exception) {
} catch (final AwsServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
} catch (final SdkException exception) {
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(Exception exception) {
} catch (final Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -43,10 +34,20 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import software.amazon.awssdk.utils.CollectionUtils;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -69,7 +70,6 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
@ -80,6 +80,10 @@ import java.util.stream.Collectors;
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value" ),
})
public class GetDynamoDB extends AbstractDynamoDBProcessor {
private static final PropertyDescriptor DOCUMENT_CHARSET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractDynamoDBProcessor.DOCUMENT_CHARSET)
.required(false)
.build();
public static final List<PropertyDescriptor> properties = List.of(
TABLE,
@ -94,6 +98,7 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
RANGE_KEY_VALUE_TYPE,
BATCH_SIZE,
TIMEOUT,
ENDPOINT_OVERRIDE,
SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE);
@ -120,10 +125,10 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
TableKeysAndAttributes tableKeysAndAttributes;
BatchGetItemRequest batchGetItemRequest;
try {
tableKeysAndAttributes = getTableKeysAndAttributes(context, attributes);
batchGetItemRequest = getBatchGetItemRequest(context, attributes);
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Configure DynamoDB BatchGetItems Request")
@ -139,7 +144,7 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
return results;
}
if (tableKeysAndAttributes.getPrimaryKeys() == null || tableKeysAndAttributes.getPrimaryKeys().isEmpty()) {
if (!batchGetItemRequest.hasRequestItems()) {
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SKIPPED)
.verificationStepName("Get DynamoDB Items")
@ -147,27 +152,36 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
.build());
} else {
try {
final DynamoDB dynamoDB = getDynamoDB(getClient(context));
final DynamoDbClient client = getClient(context);
int totalCount = 0;
int jsonDocumentCount = 0;
BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);
final BatchGetItemResponse response = client.batchGetItem(batchGetItemRequest);
// Handle processed items and get the json document
final List<Item> items = result.getTableItems().get(table);
for (final Item item : items) {
totalCount++;
if (item.get(jsonDocument) != null) {
jsonDocumentCount++;
if (!response.hasResponses()) {
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Get DynamoDB Items")
.explanation(String.format("Successfully issued request, although no items were returned from DynamoDB"))
.build());
} else {
// Handle processed items and get the json document
final List<Map<String, AttributeValue>> items = response.responses().get(table);
if (items != null) {
for (final Map<String, AttributeValue> item : items) {
totalCount++;
if (item.get(jsonDocument) != null && item.get(jsonDocument).s() != null) {
jsonDocumentCount++;
}
}
}
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Get DynamoDB Items")
.explanation(String.format("Successfully retrieved %s items, including %s JSON documents, from DynamoDB", totalCount, jsonDocumentCount))
.build());
}
results.add(new ConfigVerificationResult.Builder()
.outcome(Outcome.SUCCESSFUL)
.verificationStepName("Get DynamoDB Items")
.explanation(String.format("Successfully retrieved %s items, including %s JSON documents, from DynamoDB", totalCount, jsonDocumentCount))
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to retrieve items from DynamoDB", e);
@ -189,11 +203,11 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
return;
}
final Map<ItemKeys,FlowFile> keysToFlowFileMap = getKeysToFlowFileMap(context, session, flowFiles);
final Map<ItemKeys, FlowFile> keysToFlowFileMap = getKeysToFlowFileMap(context, session, flowFiles);
final TableKeysAndAttributes tableKeysAndAttributes;
final BatchGetItemRequest request;
try {
tableKeysAndAttributes = getTableKeysAndAttributes(context, flowFiles.stream()
request = getBatchGetItemRequest(context, flowFiles.stream()
.map(FlowFile::getAttributes).collect(Collectors.toList()).toArray(new Map[0]));
} catch (final IllegalArgumentException e) {
getLogger().error(e.getMessage(), e);
@ -209,35 +223,39 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
return;
}
final DynamoDB dynamoDB = getDynamoDB(context);
final DynamoDbClient client = getClient(context);
try {
BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);
final BatchGetItemResponse response = client.batchGetItem(request);
// Handle processed items and get the json document
final List<Item> items = result.getTableItems().get(table);
for (final Item item : items) {
final ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName));
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
if (CollectionUtils.isNotEmpty(response.responses())) {
// Handle processed items and get the json document
final List<Map<String, AttributeValue>> items = response.responses().get(table);
for (final Map<String, AttributeValue> item : items) {
final ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName));
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
if (item.get(jsonDocument) != null) {
ByteArrayInputStream bais = new ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
flowFile = session.importFrom(bais, flowFile);
if (item.get(jsonDocument) != null && item.get(jsonDocument).s() != null) {
final String charsetPropertyValue = context.getProperty(DOCUMENT_CHARSET).getValue();
final String charset = charsetPropertyValue == null ? Charset.defaultCharset().name() : charsetPropertyValue;
final ByteArrayInputStream bais = new ByteArrayInputStream(item.get(jsonDocument).s().getBytes(charset));
flowFile = session.importFrom(bais, flowFile);
}
session.transfer(flowFile, REL_SUCCESS);
keysToFlowFileMap.remove(itemKeys);
}
session.transfer(flowFile,REL_SUCCESS);
keysToFlowFileMap.remove(itemKeys);
}
// Handle unprocessed keys
final Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys();
if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
final Map<String, KeysAndAttributes> unprocessedKeys = response.unprocessedKeys();
if (CollectionUtils.isNotEmpty(unprocessedKeys)) {
final KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table);
final List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys();
final List<Map<String, AttributeValue>> keys = keysAndAttributes.keys();
for (final Map<String,AttributeValue> unprocessedKey : keys) {
final Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
final Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
for (final Map<String, AttributeValue> unprocessedKey : keys) {
final AttributeValue hashKeyValue = unprocessedKey.get(hashKeyName);
final AttributeValue rangeKeyValue = unprocessedKey.get(rangeKeyName);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
@ -246,17 +264,17 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
for (final ItemKeys key : keysToFlowFileMap.keySet()) {
FlowFile flowFile = keysToFlowFileMap.get(key);
flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + key.toString() );
session.transfer(flowFile,REL_NOT_FOUND);
session.transfer(flowFile, REL_NOT_FOUND);
keysToFlowFileMap.remove(key);
}
} catch(final AmazonServiceException exception) {
} catch (final AwsServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(final AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
} catch (final SdkException exception) {
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(final Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
@ -272,8 +290,8 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
for (final FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
final AttributeValue hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final AttributeValue rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
@ -288,27 +306,30 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
return keysToFlowFileMap;
}
private TableKeysAndAttributes getTableKeysAndAttributes(final ProcessContext context, final Map<String, String>... attributes) {
private BatchGetItemRequest getBatchGetItemRequest(final ProcessContext context, final Map<String, String>... attributes) {
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
final TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
final Collection<Map<String, AttributeValue>> keys = new HashSet<>();
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
for (final Map<String, String> attributeMap : attributes) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, attributeMap);
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, attributeMap);
final Map<String, AttributeValue> keyMap = new HashMap<>();
final AttributeValue hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, attributeMap);
final AttributeValue rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, attributeMap);
validateHashKeyValue(hashKeyValue);
validateRangeKeyValue(rangeKeyName, rangeKeyValue);
if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) {
tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue);
} else {
tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue);
keyMap.put(hashKeyName, hashKeyValue);
if (!isBlank(rangeKeyValue)) {
keyMap.put(rangeKeyName, rangeKeyValue);
}
keys.add(keyMap);
}
return tableKeysAndAttributes;
return BatchGetItemRequest.builder()
.requestItems(Map.of(table, KeysAndAttributes.builder().keys(keys).build()))
.build();
}
}

View File

@ -16,17 +16,7 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -43,8 +33,18 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -64,7 +64,6 @@ import java.util.Map;
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
@ -92,6 +91,7 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor {
DOCUMENT_CHARSET,
BATCH_SIZE,
TIMEOUT,
ENDPOINT_OVERRIDE,
SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE);
@ -114,17 +114,17 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor {
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(DOCUMENT_CHARSET).evaluateAttributeExpressions().getValue();
TableWriteItems tableWriteItems = new TableWriteItems(table);
final Map<String, Collection<WriteRequest>> tableNameRequestItemsMap = new HashMap<>();
final Collection<WriteRequest> requestItems = new ArrayList<>();
tableNameRequestItemsMap.put(table, requestItems);
for (FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
final AttributeValue hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes());
final AttributeValue rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
@ -140,17 +140,22 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor {
continue;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) {
tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
.withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
} else {
tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
.withKeyComponent(rangeKeyName, rangeKeyValue)
.withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
final Map<String, AttributeValue> item = new HashMap<>();
item.put(hashKeyName, hashKeyValue);
if (!isBlank(rangeKeyValue)) {
item.put(rangeKeyName, rangeKeyValue);
}
final String jsonText = IOUtils.toString(baos.toByteArray(), charset);
final AttributeValue jsonValue = AttributeValue.builder().s(jsonText).build();
item.put(jsonDocument, jsonValue);
final PutRequest putRequest = PutRequest.builder().item(item).build();
final WriteRequest writeRequest = WriteRequest.builder().putRequest(putRequest).build();
requestItems.add(writeRequest);
keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
}
@ -158,36 +163,38 @@ public class PutDynamoDB extends AbstractDynamoDBProcessor {
return;
}
final DynamoDB dynamoDB = getDynamoDB(context);
final DynamoDbClient client = getClient(context);
try {
final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
final BatchWriteItemResult result = outcome.getBatchWriteItemResult();
final BatchWriteItemRequest batchWriteItemRequest = BatchWriteItemRequest.builder().requestItems(tableNameRequestItemsMap).build();
final BatchWriteItemResponse response = client.batchWriteItem(batchWriteItemRequest);
// Handle unprocessed items
final List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if (unprocessedItems != null && unprocessedItems.size() > 0) {
for (final WriteRequest request : unprocessedItems) {
final Map<String, AttributeValue> item = request.getPutRequest().getItem();
final Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
final Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
if (response.unprocessedItems() != null) {
// Handle unprocessed items
final List<WriteRequest> unprocessedItems = response.unprocessedItems().get(table);
if (unprocessedItems != null) {
for (final WriteRequest request : unprocessedItems) {
final Map<String, AttributeValue> item = request.putRequest().item();
final AttributeValue hashKeyValue = item.get(hashKeyName);
final AttributeValue rangeKeyValue = item.get(rangeKeyName);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
// Handle any remaining flowfiles
for (FlowFile flowFile : keysToFlowFileMap.values()) {
for (final FlowFile flowFile : keysToFlowFileMap.values()) {
getLogger().debug("Successful posted items to dynamodb : " + table);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (AmazonServiceException exception) {
} catch (final AwsServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch (AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
} catch (final SdkException exception) {
getLogger().error("Could not process flowFiles due to SDK exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processSdkException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch (Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());

View File

@ -16,13 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -48,8 +41,20 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SplitRecordSetHandler;
import org.apache.nifi.serialization.SplitRecordSetHandlerException;
import org.apache.nifi.serialization.record.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -74,7 +79,6 @@ import java.util.UUID;
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
@ -179,6 +183,7 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
SORT_KEY_STRATEGY,
SORT_KEY_FIELD,
TIMEOUT,
ENDPOINT_OVERRIDE,
ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
SSL_CONTEXT_SERVICE
);
@ -197,7 +202,7 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.parseInt(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(context), context, flowFile.getAttributes(), getLogger());
final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getClient(context), context, flowFile.getAttributes(), getLogger());
final SplitRecordSetHandler.RecordHandlerResult result;
try (
@ -237,12 +242,12 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
// More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
context.yield();
session.transfer(outgoingFlowFile, REL_UNPROCESSED);
} else if (cause instanceof AmazonServiceException) {
} else if (cause instanceof AwsServiceException) {
getLogger().error("Could not process FlowFile due to server exception: " + message, error);
session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) cause), REL_FAILURE);
} else if (cause instanceof AmazonClientException) {
session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AwsServiceException) cause), REL_FAILURE);
} else if (cause instanceof SdkException) {
getLogger().error("Could not process FlowFile due to client exception: " + message, error);
session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) cause), REL_FAILURE);
session.transfer(processSdkException(session, Collections.singletonList(outgoingFlowFile), (SdkException) cause), REL_FAILURE);
} else {
getLogger().error("Could not process FlowFile: " + message, error);
session.transfer(outgoingFlowFile, REL_FAILURE);
@ -250,43 +255,45 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
}
private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
private final DynamoDB dynamoDB;
private final DynamoDbClient client;
private final String tableName;
private final ProcessContext context;
private final Map<String, String> flowFileAttributes;
private final ComponentLog logger;
private TableWriteItems accumulator;
private Collection<WriteRequest> accumulator;
private int itemCounter = 0;
private DynamoDbSplitRecordSetHandler(
final int maxChunkSize,
final DynamoDB dynamoDB,
final DynamoDbClient client,
final ProcessContext context,
final Map<String, String> flowFileAttributes,
final ComponentLog logger) {
super(maxChunkSize);
this.dynamoDB = dynamoDB;
this.client = client;
this.context = context;
this.flowFileAttributes = flowFileAttributes;
this.logger = logger;
this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
accumulator = new TableWriteItems(tableName);
accumulator = new ArrayList<>();
}
@Override
protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
try {
if (!wasBatchAlreadyProcessed) {
final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
final Map<String, Collection<WriteRequest>> requestItems = new HashMap<>();
requestItems.put(tableName, accumulator);
final BatchWriteItemResponse response = client.batchWriteItem(BatchWriteItemRequest.builder().requestItems(requestItems).build());
if (!outcome.getUnprocessedItems().isEmpty()) {
throw new SplitRecordSetHandlerException("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
if (CollectionUtils.isNotEmpty(response.unprocessedItems())) {
throw new SplitRecordSetHandlerException("Could not insert all items. The unprocessed items are: " + response.unprocessedItems());
}
} else {
logger.debug("Skipping chunk as was already processed");
}
accumulator = new TableWriteItems(tableName);
accumulator.clear();
} catch (final Exception e) {
throw new SplitRecordSetHandlerException(e);
}
@ -295,39 +302,41 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
@Override
protected void addToChunk(final Record record) {
itemCounter++;
accumulator.addItemToPut(convert(record));
accumulator.add(convert(record));
}
private Item convert(final Record record) {
private WriteRequest convert(final Record record) {
final String partitionKeyField = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
final String sortKeyStrategy = context.getProperty(SORT_KEY_STRATEGY).getValue();
final String sortKeyField = context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
final Item result = new Item();
final PutRequest.Builder putRequestBuilder = PutRequest.builder();
final Map<String, AttributeValue> item = new HashMap<>();
record.getSchema()
.getFields()
.stream()
.filter(field -> !field.getFieldName().equals(partitionKeyField))
.filter(field -> SORT_NONE.getValue().equals(sortKeyStrategy) || !field.getFieldName().equals(sortKeyField))
.forEach(field -> RecordToItemConverter.addField(record, result, field.getDataType().getFieldType(), field.getFieldName()));
.forEach(field -> RecordToItemConverter.addField(record, item, field.getDataType().getFieldType(), field.getFieldName()));
addPartitionKey(record, result);
addSortKey(record, result);
return result;
addPartitionKey(record, item);
addSortKey(record, item);
return WriteRequest.builder().putRequest(putRequestBuilder.item(item).build()).build();
}
private void addPartitionKey(final Record record, final Item result) {
private void addPartitionKey(final Record record, final Map<String, AttributeValue> item) {
final String partitionKeyStrategy = context.getProperty(PARTITION_KEY_STRATEGY).getValue();
final String partitionKeyField = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
final String partitionKeyAttribute = context.getProperty(PARTITION_KEY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
final Object partitionKeyValue;
if (PARTITION_BY_FIELD.getValue().equals(partitionKeyStrategy)) {
if (!record.getSchema().getFieldNames().contains(partitionKeyField)) {
throw new ProcessException("\"" + PARTITION_BY_FIELD.getDisplayName() + "\" strategy needs the \"" + PARTITION_KEY_FIELD.getDefaultValue() +"\" to present in the record");
}
result.withKeyComponent(partitionKeyField, record.getValue(partitionKeyField));
partitionKeyValue = record.getValue(partitionKeyField);
} else if (PARTITION_BY_ATTRIBUTE.getValue().equals(partitionKeyStrategy)) {
if (record.getSchema().getFieldNames().contains(partitionKeyField)) {
throw new ProcessException("Cannot reuse existing field with " + PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + PARTITION_BY_ATTRIBUTE.getDisplayName() + "\"");
@ -337,39 +346,47 @@ public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
throw new ProcessException("Missing attribute \"" + partitionKeyAttribute + "\"" );
}
result.withKeyComponent(partitionKeyField, flowFileAttributes.get(partitionKeyAttribute));
partitionKeyValue = flowFileAttributes.get(partitionKeyAttribute);
} else if (PARTITION_GENERATED.getValue().equals(partitionKeyStrategy)) {
if (record.getSchema().getFieldNames().contains(partitionKeyField)) {
throw new ProcessException("Cannot reuse existing field with " + PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + PARTITION_GENERATED.getDisplayName() + "\"");
}
result.withKeyComponent(partitionKeyField, UUID.randomUUID().toString());
partitionKeyValue = UUID.randomUUID().toString();
} else {
throw new ProcessException("Unknown " + PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + partitionKeyStrategy + "\"");
}
item.put(partitionKeyField, RecordToItemConverter.toAttributeValue(partitionKeyValue));
}
private void addSortKey(final Record record, final Item result) {
private void addSortKey(final Record record, final Map<String, AttributeValue> item) {
final String sortKeyStrategy = context.getProperty(SORT_KEY_STRATEGY).getValue();
final String sortKeyField = context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
final Object sortKeyValue;
if (SORT_BY_FIELD.getValue().equals(sortKeyStrategy)) {
if (!record.getSchema().getFieldNames().contains(sortKeyField)) {
throw new ProcessException(SORT_BY_FIELD.getDisplayName() + " strategy needs the \"" + SORT_KEY_FIELD.getDisplayName() + "\" to present in the record");
}
result.withKeyComponent(sortKeyField, record.getValue(sortKeyField));
sortKeyValue = record.getValue(sortKeyField);
} else if (SORT_BY_SEQUENCE.getValue().equals(sortKeyStrategy)) {
if (record.getSchema().getFieldNames().contains(sortKeyField)) {
throw new ProcessException("Cannot reuse existing field with " + SORT_KEY_STRATEGY.getDisplayName() + " \"" + SORT_BY_SEQUENCE.getDisplayName() +"\"");
}
result.withKeyComponent(sortKeyField, itemCounter);
sortKeyValue = itemCounter;
} else if (SORT_NONE.getValue().equals(sortKeyStrategy)) {
logger.debug("No " + SORT_KEY_STRATEGY.getDisplayName() + " was applied");
sortKeyValue = null;
} else {
throw new ProcessException("Unknown " + SORT_KEY_STRATEGY.getDisplayName() + " \"" + sortKeyStrategy + "\"");
}
if (sortKeyValue != null) {
item.put(sortKeyField, RecordToItemConverter.toAttributeValue(sortKeyValue));
}
}
}
}

View File

@ -16,17 +16,16 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.services.dynamodbv2.document.Item;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
final class RecordToItemConverter {
@ -38,82 +37,76 @@ final class RecordToItemConverter {
/*
* https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.ArbitraryDataMapping.html
*/
public static void addField(final Record record, final Item item, final RecordFieldType fieldType, final String fieldName) {
public static void addField(final Record record, final Map<String, AttributeValue> item, final RecordFieldType fieldType, final String fieldName) {
item.put(fieldName, toAttributeValue(record.getValue(fieldName), fieldType));
}
static AttributeValue toAttributeValue(final Object object) {
final DataType dataType = DataTypeUtils.inferDataType(object, RecordFieldType.STRING.getDataType());
return toAttributeValue(object, dataType.getFieldType());
}
private static AttributeValue toAttributeValue(final Object object, final RecordFieldType fieldType) {
if (object == null) {
return null;
}
final AttributeValue.Builder builder = AttributeValue.builder();
switch (fieldType) {
case BOOLEAN:
builder.bool(DataTypeUtils.toBoolean(object, null));
break;
case SHORT:
case INT:
case LONG:
case FLOAT:
case BYTE:
case DOUBLE:
case STRING:
item.with(fieldName, record.getValue(fieldName));
break;
case BIGINT:
item.withBigInteger(fieldName, new BigInteger(record.getAsString(fieldName)));
break;
case DECIMAL:
item.withNumber(fieldName, new BigDecimal(record.getAsString(fieldName)));
break;
case TIMESTAMP:
case DATE:
case TIME:
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.String
item.withString(fieldName, record.getAsString(fieldName));
case CHAR:
item.withString(fieldName, record.getAsString(fieldName));
break;
case ENUM:
item.withString(fieldName, record.getAsString(fieldName));
builder.n(DataTypeUtils.toString(object, (String) null));
break;
case ARRAY:
item.withList(fieldName, record.getAsArray(fieldName));
final List<AttributeValue> list = Arrays.stream(DataTypeUtils.toArray(object, null, null))
.map(RecordToItemConverter::toAttributeValue)
.toList();
builder.l(list);
break;
case RECORD:
// In case of the underlying field is really a record (and not a map for example), schema argument is not used
item.withMap(fieldName, getRecordFieldAsMap(record.getAsRecord(fieldName, null)));
builder.m(getRecordFieldAsMap(DataTypeUtils.toRecord(object, null)));
break;
case MAP:
item.withMap(fieldName, getMapFieldAsMap(record.getValue(fieldName)));
builder.m(getMapFieldAsMap(DataTypeUtils.toMap(object, null)));
break;
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.String
case TIMESTAMP:
case DATE:
case TIME:
case CHAR:
case ENUM:
case STRING:
case CHOICE: // No similar data type is supported by DynamoDB
default:
item.withString(fieldName, record.getAsString(fieldName));
builder.s(DataTypeUtils.toString(object, (String) null));
}
return builder.build();
}
private static Map<String, Object> getRecordFieldAsMap(final Record recordField) {
final Map<String, Object> result = new HashMap<>();
private static Map<String, AttributeValue> getRecordFieldAsMap(final Record recordField) {
final Map<String, AttributeValue> result = new HashMap<>();
for (final RecordField field : recordField.getSchema().getFields()) {
result.put(field.getFieldName(), convertToSupportedType(recordField.getValue(field)));
result.put(field.getFieldName(), toAttributeValue(recordField.getValue(field)));
}
return result;
}
private static Map<String, Object> getMapFieldAsMap(final Object recordField) {
if (!(recordField instanceof Map)) {
throw new IllegalArgumentException("Map type is expected");
}
private static Map<String, AttributeValue> getMapFieldAsMap(final Map<String, Object> mapField) {
final Map<String, AttributeValue> result = new HashMap<>();
final Map<String, Object> result = new HashMap<>();
((Map<String, Object>) recordField).forEach((name, value) -> result.put(name, convertToSupportedType(value)));
mapField.forEach((name, value) -> result.put(name, toAttributeValue(value)));
return result;
}
private static Object convertToSupportedType(Object value) {
if (value instanceof Record) {
return getRecordFieldAsMap((Record) value);
} else if (value instanceof Map) {
return getMapFieldAsMap(value);
} else if (value instanceof Character || value instanceof Timestamp || value instanceof Date || value instanceof Time) {
return value.toString();
} else if (value instanceof Enum) {
return ((Enum) value).name();
} else {
return value;
}
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * Base class for DynamoDB processor integration tests.
 * <p>
 * Starts a shared LocalStack DynamoDB container once for the test class and creates two tables:
 * one keyed by a string partition key only, and one keyed by the same partition key plus a
 * numeric sort key. Subclasses obtain a configured {@link DynamoDbClient} via {@link #getClient()}
 * and a pre-wired {@link TestRunner} via {@link #initRunner(Class)}.
 */
public abstract class AbstractDynamoDBIT {
    protected static final String PARTITION_KEY_ONLY_TABLE = "partitionKeyTable";
    protected static final String PARTITION_AND_SORT_KEY_TABLE = "partitionKeySortKeyTable";
    protected static final String PARTITION_KEY = "partitionKey";
    protected static final String SORT_KEY = "sortKey";
    protected static final String PARTITION_KEY_VALUE_PREFIX = "partition.value.";

    private static DynamoDbClient client;

    private static final DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:latest");

    private static final LocalStackContainer localstack = new LocalStackContainer(localstackImage)
            .withServices(LocalStackContainer.Service.DYNAMODB);

    /**
     * Starts LocalStack, builds the shared client against the container endpoint, and creates
     * the two test tables used by all subclasses.
     */
    @BeforeAll
    public static void oneTimeSetup() {
        // Force the lightweight UrlConnection HTTP client so no additional SDK HTTP implementation
        // needs to be on the test classpath
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
        localstack.start();

        client = DynamoDbClient.builder()
                .endpointOverride(localstack.getEndpoint())
                .region(Region.of(localstack.getRegion()))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())))
                .build();

        final AttributeDefinition partitionKeyDefinition = AttributeDefinition.builder().attributeName(PARTITION_KEY).attributeType(ScalarAttributeType.S).build();
        final KeySchemaElement partitionKeySchema = KeySchemaElement.builder().attributeName(PARTITION_KEY).keyType(KeyType.HASH).build();
        final AttributeDefinition sortKeyDefinition = AttributeDefinition.builder().attributeName(SORT_KEY).attributeType(ScalarAttributeType.N).build();
        final KeySchemaElement sortKeySchema = KeySchemaElement.builder().attributeName(SORT_KEY).keyType(KeyType.RANGE).build();
        final ProvisionedThroughput provisionedThroughput = ProvisionedThroughput.builder()
                .readCapacityUnits(1000L)
                .writeCapacityUnits(1000L).build();

        final CreateTableRequest request1 = CreateTableRequest.builder()
                .tableName(PARTITION_KEY_ONLY_TABLE)
                .attributeDefinitions(partitionKeyDefinition)
                .keySchema(partitionKeySchema)
                .provisionedThroughput(provisionedThroughput)
                .build();
        final CreateTableRequest request2 = CreateTableRequest.builder()
                .tableName(PARTITION_AND_SORT_KEY_TABLE)
                .attributeDefinitions(partitionKeyDefinition, sortKeyDefinition)
                .provisionedThroughput(provisionedThroughput)
                .keySchema(partitionKeySchema, sortKeySchema)
                .build();
        client.createTable(request1);
        client.createTable(request2);
    }

    /**
     * Closes the shared client and stops the LocalStack container.
     */
    @AfterAll
    public static void oneTimeTeardown() {
        client.close();
        localstack.stop();
    }

    /**
     * @return the shared DynamoDB client pointed at the LocalStack container
     */
    protected DynamoDbClient getClient() {
        return client;
    }

    /**
     * Creates a {@link TestRunner} for the given processor, pre-configured with the LocalStack
     * credentials, region, and DynamoDB endpoint override.
     *
     * @param processorClass the processor under test
     * @return a runner ready for additional per-test property configuration
     */
    protected TestRunner initRunner(final Class<? extends Processor> processorClass) {
        TestRunner runner = TestRunners.newTestRunner(processorClass);
        AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());

        runner.setProperty(AbstractDynamoDBProcessor.REGION, localstack.getRegion());
        runner.setProperty(AbstractDynamoDBProcessor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString());
        return runner;
    }

    /**
     * Performs a BatchGetItem for {@code count} keys against the given table, using the
     * conventional key values written by the tests ({@code PARTITION_KEY_VALUE_PREFIX + i},
     * and sort key {@code i} when {@code includeSortKey} is set).
     *
     * @param count number of keys to request
     * @param table table name to read from
     * @param includeSortKey whether each key also carries the numeric sort key
     * @return the raw BatchGetItem response for assertion by the caller
     */
    protected BatchGetItemResponse getBatchGetItems(int count, String table, boolean includeSortKey) {
        final Map<String, KeysAndAttributes> requestItems = new HashMap<>();
        final Collection<Map<String, AttributeValue>> keys = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            final Map<String, AttributeValue> partitionKey = new HashMap<>();
            partitionKey.put(PARTITION_KEY, AttributeValue.builder().s(PARTITION_KEY_VALUE_PREFIX + i).build());
            if (includeSortKey) {
                partitionKey.put(SORT_KEY, AttributeValue.builder().n(String.valueOf(i)).build());
            }
            keys.add(partitionKey);
        }
        final KeysAndAttributes keysAndAttributes = KeysAndAttributes.builder().keys(keys).build();
        requestItems.put(table, keysAndAttributes);

        final BatchGetItemRequest request = BatchGetItemRequest.builder().requestItems(requestItems).build();
        return getClient().batchGetItem(request);
    }
}

View File

@ -16,9 +16,11 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
import org.apache.nifi.util.MockFlowFile;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.List;
@ -37,7 +39,6 @@ public abstract class AbstractDynamoDBTest {
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE,
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID,
@ -46,13 +47,18 @@ public abstract class AbstractDynamoDBTest {
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE
);
protected AmazonServiceException getSampleAwsServiceException() {
final AmazonServiceException testServiceException = new AmazonServiceException("Test AWS Service Exception");
testServiceException.setErrorCode("8673509");
testServiceException.setErrorMessage("This request cannot be serviced right now.");
testServiceException.setErrorType(ErrorType.Service);
testServiceException.setServiceName("Dynamo DB");
testServiceException.setRequestId("TestRequestId-1234567890");
protected DynamoDbClient client;
/**
 * Builds a sample {@code AwsServiceException} with fixed error details (error code, error
 * message, service name) and a fixed request id, for use in failure-path assertions.
 *
 * @return the fully populated sample exception
 */
protected AwsServiceException getSampleAwsServiceException() {
    final AwsServiceException testServiceException = AwsServiceException.builder()
            .message("Test AWS Service Exception")
            .awsErrorDetails(AwsErrorDetails.builder()
                    .errorCode("8673509")
                    .errorMessage("This request cannot be serviced right now.")
                    .serviceName("Dynamo DB")
                    .build())
            .requestId("TestRequestId-1234567890")
            .build();
    return testServiceException;
}
@ -61,4 +67,7 @@ public abstract class AbstractDynamoDBTest {
errorAttributes.forEach(flowFile::assertAttributeExists);
}
/**
 * Convenience factory wrapping a plain string as a DynamoDB string ("S") attribute value.
 *
 * @param s the string payload
 * @return an {@code AttributeValue} whose S member is {@code s}
 */
protected static AttributeValue string(final String s) {
    final AttributeValue.Builder valueBuilder = AttributeValue.builder();
    valueBuilder.s(s);
    return valueBuilder.build();
}
}

View File

@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.reporting.InitializationException;
@ -34,8 +24,12 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.util.ArrayList;
import java.util.HashMap;
@ -44,31 +38,26 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
protected DeleteDynamoDB deleteDynamoDB;
protected BatchWriteItemResult result = new BatchWriteItemResult();
BatchWriteItemOutcome outcome;
@BeforeEach
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
client = mock(DynamoDbClient.class);
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
protected DynamoDbClient getClient(final ProcessContext context) {
return client;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenReturn(BatchWriteItemResponse.builder().build());
}
private TestRunner createRunner() throws InitializationException {
@ -89,17 +78,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeDeleteOnlyHashFailure() throws InitializationException {
// Inject a mock DynamoDB to create the exception condition
final DynamoDB mockDynamoDb = Mockito.mock(DynamoDB.class);
// When writing, mock thrown service exception from AWS
Mockito.when(mockDynamoDb.batchWriteItem(ArgumentMatchers.<TableWriteItems>any())).thenThrow(getSampleAwsServiceException());
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDb;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenThrow(getSampleAwsServiceException());
final TestRunner deleteRunner = createRunner();
@ -109,8 +89,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
validateServiceExceptionAttributes(flowFile);
}
@ -131,16 +111,18 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() throws InitializationException {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
DeleteRequest delete = new DeleteRequest();
delete.addKeyEntry("hashS", new AttributeValue("h1"));
delete.addKeyEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(delete);
List<WriteRequest> writes = new ArrayList<>();
final Map<String, List<WriteRequest>> unprocessed = new HashMap<>();
final DeleteRequest delete = DeleteRequest.builder().key(Map
.of(
"hashS", string("h1"),
"rangeS", string("r1")
)).build();
final WriteRequest write = WriteRequest.builder().deleteRequest(delete).build();
final List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final BatchWriteItemResponse response = BatchWriteItemResponse.builder().unprocessedItems(unprocessed).build();
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenReturn(response);
final TestRunner deleteRunner = createRunner();
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
@ -166,8 +148,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
}
@ -184,8 +166,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
@ -201,8 +183,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@ -235,20 +217,9 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeDeleteThrowsServiceException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonServiceException("serviceException");
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class)))
.thenThrow(getSampleAwsServiceException());
final TestRunner deleteRunner = createRunner();
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
@ -258,9 +229,9 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)",
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("Test AWS Service Exception (Service: Dynamo DB, Status Code: 0, Request ID: TestRequestId-1234567890)",
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
@ -268,19 +239,8 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeDeleteThrowsClientException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonClientException("clientException");
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class)))
.thenThrow(SdkException.builder().message("sdkException").build());
final TestRunner deleteRunner = createRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
@ -291,31 +251,19 @@ public class DeleteDynamoDBTest extends AbstractDynamoDBTest {
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
final List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("sdkException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeDeleteThrowsRuntimeException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new RuntimeException("runtimeException");
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class)))
.thenThrow(new RuntimeException("runtimeException"));
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
final TestRunner deleteRunner = createRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
final TestRunner deleteRunner = createRunner();
deleteRunner.enqueue(new byte[] {});

View File

@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult;
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
@ -35,11 +25,15 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -51,50 +45,70 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GetDynamoDBTest extends AbstractDynamoDBTest {
protected GetDynamoDB getDynamoDB;
protected BatchGetItemOutcome outcome;
protected BatchGetItemResult result = new BatchGetItemResult();
private HashMap unprocessed;
private static final String JSON_DOCUMENT_KEY = "j1";
private static final String HASH_KEY = "hashS";
private static final String RANGE_KEY = "rangeS";
private static final String HASH_KEY_VALUE = "h1";
private static final String RANGE_KEY_VALUE = "r1";
private GetDynamoDB getDynamoDB;
@BeforeEach
public void setUp() {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
unprocessed.put(stringHashStringRangeTableName, kaa);
final Map<String, Collection<Map<String, AttributeValue>>> responses = new HashMap<>();
responses.put(stringHashStringRangeTableName, Collections.emptyList());
final BatchGetItemResponse response = BatchGetItemResponse.builder()
.unprocessedKeys(getUnprocessedTableToKeysMap())
.responses(responses)
.build();
result.withUnprocessedKeys(unprocessed);
client = mock(DynamoDbClient.class);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenReturn(response);
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
getDynamoDB = new GetDynamoDB() {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
protected DynamoDbClient getClient(final ProcessContext context) {
return client;
}
};
}
getDynamoDB = mockDynamoDB(mockDynamoDB);
private static Map<String, KeysAndAttributes> getUnprocessedTableToKeysMap() {
return getUnprocessedTableToKeysMap(true);
}
private static Map<String, KeysAndAttributes> getUnprocessedTableToKeysMap(final boolean includeRangeKey) {
final Map<String, KeysAndAttributes> unprocessedTableToKeysMap = new HashMap<>();
final Map<String, AttributeValue> keyMap = new HashMap<>();
keyMap.put(HASH_KEY, string(HASH_KEY_VALUE));
if (includeRangeKey) {
keyMap.put(RANGE_KEY, string(RANGE_KEY_VALUE));
}
unprocessedTableToKeysMap.put(stringHashStringRangeTableName, KeysAndAttributes.builder().keys(keyMap).build());
return unprocessedTableToKeysMap;
}
private static Map<String, KeysAndAttributes> getEmptyUnprocessedTableToKeysMap() {
final Map<String, KeysAndAttributes> unprocessedTableToKeysMap = new HashMap<>();
final Map<String, AttributeValue> keyMap = new HashMap<>();
unprocessedTableToKeysMap.put(stringHashStringRangeTableName, KeysAndAttributes.builder().keys(keyMap).build());
return unprocessedTableToKeysMap;
}
@Test
public void testStringHashStringRangeGetUnprocessed() {
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[]{});
getRunner.enqueue(new byte[] {});
getRunner.run(1);
// No actual items returned
assertVerificationResults(getRunner, 0, 0);
assertVerificationResults(getRunner, 0, 0, true);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
@ -114,46 +128,25 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, HASH_KEY);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, RANGE_KEY);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, RANGE_KEY_VALUE);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, HASH_KEY_VALUE);
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, JSON_DOCUMENT_KEY);
return getRunner;
}
@Test
public void testStringHashStringRangeGetJsonObjectNull() {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
result.withUnprocessedKeys(unprocessed);
final Map<String, List<Map<String, AttributeValue>>> responses = generateResponses(null);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
item.put("j1",null);
item.put("hashS", new AttributeValue("h1"));
item.put("rangeS", new AttributeValue("r1"));
items.add(item);
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
final BatchGetItemResponse response = BatchGetItemResponse.builder()
.unprocessedKeys(getUnprocessedTableToKeysMap())
.responses(responses)
.build();
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
}
};
getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenReturn(response);
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -162,46 +155,26 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (final MockFlowFile flowFile : flowFiles) {
assertNull(flowFile.getContentClaim());
}
}
@Test
public void testStringHashStringRangeGetJsonObjectValid() throws IOException {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
result.withUnprocessedKeys(unprocessed);
public void testStringHashStringRangeGetJsonObjectValid() {
final String jsonDocument = "{\"name\": \"john\"}";
final Map<String, List<Map<String, AttributeValue>>> responses = generateResponses(jsonDocument);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
String jsonDocument = "{\"name\": \"john\"}";
item.put("j1",new AttributeValue(jsonDocument));
item.put("hashS", new AttributeValue("h1"));
item.put("rangeS", new AttributeValue("r1"));
items.add(item);
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
final BatchGetItemResponse response = BatchGetItemResponse.builder()
.unprocessedKeys(getEmptyUnprocessedTableToKeysMap())
.responses(responses)
.build();
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenReturn(response);
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
}
};
getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -211,27 +184,17 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeGetThrowsServiceException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new AmazonServiceException("serviceException");
}
};
final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenThrow(getSampleAwsServiceException());
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[] {});
getRunner.run(1);
assertVerificationFailure(getRunner);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)",
final List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("Test AWS Service Exception (Service: Dynamo DB, Status Code: 0, Request ID: TestRequestId-1234567890)",
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
@ -239,18 +202,9 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeGetThrowsRuntimeException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenThrow(new RuntimeException("runtimeException"));
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new RuntimeException("runtimeException");
}
};
final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -258,26 +212,17 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
assertVerificationFailure(getRunner);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeGetThrowsClientException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
public void testStringHashStringRangeGetThrowsSdkException() {
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenThrow(SdkException.builder().message("sdkException").build());
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new AmazonClientException("clientException");
}
};
final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
final TestRunner getRunner = createRunner();
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -285,33 +230,19 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
assertVerificationFailure(getRunner);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
final List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("sdkException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeGetNotFound() {
result.clearResponsesEntries();
result.clearUnprocessedKeysEntries();
final BatchGetItemResponse notFoundResponse = BatchGetItemResponse.builder().build();
final BatchGetItemOutcome notFoundOutcome = new BatchGetItemOutcome(result);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
responses.put(stringHashStringRangeTableName, items);
result.withResponses(responses);
final DynamoDB notFoundMockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return notFoundOutcome;
}
};
final GetDynamoDB getDynamoDB = mockDynamoDB(notFoundMockDynamoDB);
final TestRunner getRunner = createRunner(getDynamoDB);
final TestRunner getRunner = createRunner();
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenReturn(notFoundResponse);
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -329,14 +260,11 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangeGetOnlyHashFailure() {
// Inject a mock DynamoDB to create the exception condition
final DynamoDB mockDynamoDb = Mockito.mock(DynamoDB.class);
// When writing, mock thrown service exception from AWS
Mockito.when(mockDynamoDb.batchGetItem(ArgumentMatchers.<TableKeysAndAttributes>any())).thenThrow(getSampleAwsServiceException());
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenThrow(getSampleAwsServiceException());
final TestRunner getRunner = createRunner();
getRunner.removeProperty(GetDynamoDB.RANGE_KEY_NAME);
getRunner.removeProperty(GetDynamoDB.RANGE_KEY_VALUE);
getDynamoDB = mockDynamoDB(mockDynamoDb);
final TestRunner getRunner = createRunner(getDynamoDB);
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -345,8 +273,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
validateServiceExceptionAttributes(flowFile);
}
@ -373,7 +301,6 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
public void testStringHashStringRangeGetOnlyHashWithRangeValueNoRangeNameFailure() {
final TestRunner getRunner = createRunner();
getRunner.removeProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME);
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -390,7 +317,6 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
public void testStringHashStringRangeGetOnlyHashWithRangeNameNoRangeValueFailure() {
final TestRunner getRunner = createRunner();
getRunner.removeProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE);
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -405,17 +331,16 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringNoRangeGetUnprocessed() {
unprocessed.clear();
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
kaa.withKeys(map);
unprocessed.put(stringHashStringRangeTableName, kaa);
final BatchGetItemResponse response = BatchGetItemResponse.builder()
.unprocessedKeys(getUnprocessedTableToKeysMap(false))
.build();
when(client.batchGetItem(any(BatchGetItemRequest.class))).thenReturn(response);
final TestRunner getRunner = createRunner();
getRunner.removeProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME);
getRunner.removeProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE);
getRunner.enqueue(new byte[] {});
getRunner.run(1);
@ -428,24 +353,6 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
}
}
private GetDynamoDB mockDynamoDB(final DynamoDB mockDynamoDB) {
return new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
@Override
protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
return mockDynamoDB;
}
@Override
protected AmazonDynamoDBClient getClient(final ProcessContext context) {
return Mockito.mock(AmazonDynamoDBClient.class);
}
};
}
private void assertVerificationFailure(final TestRunner runner) {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
@ -456,11 +363,31 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
}
private void assertVerificationResults(final TestRunner runner, final int expectedTotalCount, final int expectedJsonDocumentCount) {
assertVerificationResults(runner, expectedTotalCount, expectedJsonDocumentCount, false);
}
private void assertVerificationResults(final TestRunner runner, final int expectedTotalCount, final int expectedJsonDocumentCount, final boolean hasUnprocessedItems) {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(3, results.size());
results.forEach(result -> assertEquals(SUCCESSFUL, result.getOutcome()));
assertTrue(results.get(2).getExplanation().contains("retrieved " + expectedTotalCount + " items"));
assertTrue(results.get(2).getExplanation().contains(expectedJsonDocumentCount + " JSON"));
if (expectedTotalCount == 0 && !hasUnprocessedItems) {
assertEquals("Successfully issued request, although no items were returned from DynamoDB", results.get(2).getExplanation());
} else {
assertTrue(results.get(2).getExplanation().contains("retrieved " + expectedTotalCount + " items"));
assertTrue(results.get(2).getExplanation().contains(expectedJsonDocumentCount + " JSON"));
}
}
private static Map<String, List<Map<String, AttributeValue>>> generateResponses(final String jsonValue) {
final Map<String, List<Map<String, AttributeValue>>> responses = new HashMap<>();
final List<Map<String, AttributeValue>> items = new ArrayList<>();
final Map<String, AttributeValue> item = new HashMap<>();
item.put(JSON_DOCUMENT_KEY, string(jsonValue));
item.put(HASH_KEY, string(HASH_KEY_VALUE));
item.put(RANGE_KEY, string(RANGE_KEY_VALUE));
items.add(item);
responses.put(stringHashStringRangeTableName, items);
return responses;
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITDynamoDB extends AbstractDynamoDBIT {
    /** Payload template for each flow file; the single %d placeholder carries the item index. */
    private static final String JSON_TEMPLATE = "{ \"key\": \"%d\", \"value\": \"val\" }";
    private static final String HASH_KEY_ATTRIBUTE = "dynamodb.item.hash.key.value";
    private static final String RANGE_KEY_ATTRIBUTE = "dynamodb.item.range.key.value";
    private static final String JSON_DOCUMENT_FIELD = "jsonDocument";

    @Test
    public void partitionKeyOnlySuccess() {
        runDynamoDBTest(10, PARTITION_KEY_ONLY_TABLE, false, runner -> {});
    }

    @Test
    public void partitionKeySortKeySuccess() {
        runDynamoDBTest(10, PARTITION_AND_SORT_KEY_TABLE, true, runner -> {
            runner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, SORT_KEY);
            runner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, "number");
        });
    }

    /**
     * Runs a Put -&gt; Get -&gt; Delete round trip of {@code itemCount} JSON documents against the
     * given table, verifying the table contents both after the put and after the delete.
     *
     * @param itemCount  number of documents to write, read back and remove
     * @param tableName  target DynamoDB table
     * @param useSortKey whether the table uses a numeric sort key in addition to the partition key
     * @param customizer extra configuration applied to each processor's TestRunner
     */
    private void runDynamoDBTest(final int itemCount, final String tableName, final boolean useSortKey, final Consumer<TestRunner> customizer) {
        // Phase 1: write the documents with PutDynamoDB
        final TestRunner putter = initRunner(PutDynamoDB.class);
        putter.setProperty(PutDynamoDB.BATCH_SIZE, String.valueOf(itemCount));
        putter.setProperty(PutDynamoDB.TABLE, tableName);
        putter.setProperty(PutDynamoDB.HASH_KEY_NAME, PARTITION_KEY);
        putter.setProperty(PutDynamoDB.JSON_DOCUMENT, JSON_DOCUMENT_FIELD);
        customizer.accept(putter);
        enqueue(putter, itemCount, useSortKey);
        putter.run(1);
        putter.assertAllFlowFilesTransferred(PutDynamoDB.REL_SUCCESS, itemCount);
        assertItemsExist(itemCount, tableName, useSortKey);

        // Phase 2: read the documents back with GetDynamoDB, feeding the successful put flow files through
        final TestRunner getter = initRunner(GetDynamoDB.class);
        getter.setProperty(GetDynamoDB.BATCH_SIZE, String.valueOf(itemCount));
        getter.setProperty(GetDynamoDB.TABLE, tableName);
        getter.setProperty(GetDynamoDB.HASH_KEY_NAME, PARTITION_KEY);
        getter.setProperty(GetDynamoDB.JSON_DOCUMENT, JSON_DOCUMENT_FIELD);
        customizer.accept(getter);
        putter.getFlowFilesForRelationship(PutDynamoDB.REL_SUCCESS).forEach(getter::enqueue);
        getter.run(1);
        getter.assertAllFlowFilesTransferred(GetDynamoDB.REL_SUCCESS, itemCount);
        getter.getFlowFilesForRelationship(GetDynamoDB.REL_SUCCESS).forEach(flowFile -> {
            final String hashKey = flowFile.getAttribute(HASH_KEY_ATTRIBUTE);
            assertNotNull(hashKey);
            // The retrieved JSON must contain the index encoded in the hash key attribute
            final String content = new String(flowFile.getData());
            assertTrue(content.contains(StringUtils.substringAfter(hashKey, PARTITION_KEY_VALUE_PREFIX)));
        });

        // Phase 3: remove the documents with DeleteDynamoDB and confirm the table is empty again
        final TestRunner deleter = initRunner(DeleteDynamoDB.class);
        deleter.setProperty(DeleteDynamoDB.BATCH_SIZE, String.valueOf(itemCount));
        deleter.setProperty(DeleteDynamoDB.TABLE, tableName);
        deleter.setProperty(DeleteDynamoDB.HASH_KEY_NAME, PARTITION_KEY);
        customizer.accept(deleter);
        getter.getFlowFilesForRelationship(GetDynamoDB.REL_SUCCESS).forEach(deleter::enqueue);
        deleter.run(1);
        deleter.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, itemCount);
        assertItemsDoNotExist(itemCount, tableName, useSortKey);
    }

    // Asserts that exactly itemCount items exist in the table and that each carries the expected attributes.
    private void assertItemsExist(final int itemCount, final String tableName, final boolean useSortKey) {
        final BatchGetItemResponse response = getBatchGetItems(itemCount, tableName, useSortKey);
        final List<Map<String, AttributeValue>> found = response.responses().get(tableName);
        assertNotNull(found);
        assertEquals(itemCount, found.size());
        for (final Map<String, AttributeValue> item : found) {
            assertNotNull(item.get(PARTITION_KEY));
            assertNotNull(item.get(PARTITION_KEY).s());
            assertNotNull(item.get(JSON_DOCUMENT_FIELD));
            assertNotNull(item.get(JSON_DOCUMENT_FIELD).s());
            if (useSortKey) {
                assertNotNull(item.get(SORT_KEY));
                assertNotNull(item.get(SORT_KEY).n());
            }
        }
    }

    // Asserts that a batch get for the same keys now returns an empty item list.
    private void assertItemsDoNotExist(final int itemCount, final String tableName, final boolean useSortKey) {
        final BatchGetItemResponse response = getBatchGetItems(itemCount, tableName, useSortKey);
        final List<Map<String, AttributeValue>> found = response.responses().get(tableName);
        assertNotNull(found);
        assertEquals(0, found.size());
    }

    // Queues count JSON flow files, each annotated with the hash key (and, optionally, range key) attributes.
    private static void enqueue(final TestRunner runner, final int count, final boolean includeSortKey) {
        for (int i = 0; i < count; i++) {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(HASH_KEY_ATTRIBUTE, PARTITION_KEY_VALUE_PREFIX + i);
            if (includeSortKey) {
                attributes.put(RANGE_KEY_ATTRIBUTE, String.valueOf(i));
            }
            runner.enqueue(String.format(JSON_TEMPLATE, i), attributes);
        }
    }
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class ITPutDynamoDBRecord extends AbstractDynamoDBIT {
    // Shared, thread-safe after configuration; used to serialize the test records to JSON input.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Test
    public void partitionKeyOnlySuccess() throws JsonProcessingException {
        runDynamoDBTest(10, PARTITION_KEY_ONLY_TABLE, false, runner -> {});
    }

    @Test
    public void partitionKeySortKeySuccess() throws JsonProcessingException {
        runDynamoDBTest(10, PARTITION_AND_SORT_KEY_TABLE, true, runner -> {
            runner.setProperty(PutDynamoDBRecord.SORT_KEY_STRATEGY, PutDynamoDBRecord.SORT_BY_FIELD);
            runner.setProperty(PutDynamoDBRecord.SORT_KEY_FIELD, "sortKey");
        });
    }

    /**
     * Writes {@code count} records to the given table via PutDynamoDBRecord and then verifies,
     * through a direct batch get, that every item landed with the expected attributes.
     *
     * @param count            number of records to generate and insert
     * @param table            target DynamoDB table
     * @param includeSortKey   whether each record also carries a numeric sort key
     * @param runnerConfigurer extra configuration applied to the processor's TestRunner
     * @throws JsonProcessingException if the generated records cannot be serialized to JSON
     */
    private void runDynamoDBTest(final int count, final String table, final boolean includeSortKey, final Consumer<TestRunner> runnerConfigurer) throws JsonProcessingException {
        final List<TestRecord> testRecords = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final Long sortKey = includeSortKey ? (long) i : null;
            testRecords.add(new TestRecord(PARTITION_KEY_VALUE_PREFIX + i, sortKey, UUID.randomUUID().toString(), i, Arrays.asList("1", "2", "3")));
        }

        // Put the documents in DynamoDB
        final TestRunner runner = initRunner(PutDynamoDBRecord.class);
        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_FIELD, PARTITION_KEY);
        runner.setProperty(PutDynamoDBRecord.TABLE, table);

        try {
            final JsonTreeReader jsonTreeReader = new JsonTreeReader();
            runner.addControllerService("jsonTreeReader", jsonTreeReader);
            runner.enableControllerService(jsonTreeReader);
            runner.setProperty(PutDynamoDBRecord.RECORD_READER, "jsonTreeReader");
        } catch (final InitializationException e) {
            // Preserve the cause so a controller-service registration failure is diagnosable from the report
            Assertions.fail("Could not set properties", e);
        }
        runnerConfigurer.accept(runner);

        final String data = OBJECT_MAPPER.writeValueAsString(testRecords);
        runner.enqueue(data);
        runner.run(1);

        // The processor under test is PutDynamoDBRecord, so assert against its own relationship constant
        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
        assertItemsExist(count, table, includeSortKey);
    }

    // Fetches the inserted keys directly and asserts every item's attribute values and types.
    private void assertItemsExist(final int count, final String table, final boolean includeSortKey) {
        final BatchGetItemResponse response = getBatchGetItems(count, table, includeSortKey);
        final List<Map<String, AttributeValue>> items = response.responses().get(table);
        assertNotNull(items);
        assertEquals(count, items.size());
        items.forEach(item -> {
            assertNotNull(item.get(PARTITION_KEY));
            assertNotNull(item.get(PARTITION_KEY).s());
            assertNotNull(item.get("testString"));
            assertNotNull(item.get("testString").s());
            assertNotNull(item.get("testNumber"));
            assertNotNull(item.get("testNumber").n());
            assertNotNull(item.get("testList"));
            assertNotNull(item.get("testList").l());
            assertEquals(3, item.get("testList").l().size());
            if (includeSortKey) {
                assertNotNull(item.get(SORT_KEY));
                assertNotNull(item.get(SORT_KEY).n());
            }
        });
    }

    /**
     * Mutable JSON-serializable bean; the no-arg constructor and setters are required by Jackson
     * and by the JsonTreeReader's record mapping, so this intentionally stays a plain POJO.
     */
    private static class TestRecord {
        private String partitionKey;
        private Long sortKey;
        private String testString;
        private Integer testNumber;
        private List<String> testList;

        public TestRecord() {
        }

        public TestRecord(final String partitionKey, final Long sortKey, final String testString, final Integer testNumber, final List<String> testList) {
            this.partitionKey = partitionKey;
            this.sortKey = sortKey;
            this.testString = testString;
            this.testNumber = testNumber;
            this.testList = testList;
        }

        public String getPartitionKey() {
            return partitionKey;
        }

        public void setPartitionKey(String partitionKey) {
            this.partitionKey = partitionKey;
        }

        public Long getSortKey() {
            return sortKey;
        }

        public void setSortKey(Long sortKey) {
            this.sortKey = sortKey;
        }

        public String getTestString() {
            return testString;
        }

        public void setTestString(String testString) {
            this.testString = testString;
        }

        public Integer getTestNumber() {
            return testNumber;
        }

        public void setTestNumber(Integer testNumber) {
            this.testNumber = testNumber;
        }

        public List<String> getTestList() {
            return testList;
        }

        public void setTestList(List<String> testList) {
            this.testList = testList;
        }
    }
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.aws.dynamodb;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -34,8 +35,8 @@ public class ItemKeysTest {
@Test
public void testHashNotNullRangeNullEquals() {
ItemKeys ik1 = new ItemKeys("abc", null);
ItemKeys ik2 = new ItemKeys("abc", null);
ItemKeys ik1 = new ItemKeys(string("abc"), null);
ItemKeys ik2 = new ItemKeys(string("abc"), null);
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
@ -43,8 +44,8 @@ public class ItemKeysTest {
@Test
public void testHashNullRangeNotNullEquals() {
ItemKeys ik1 = new ItemKeys(null, "ab");
ItemKeys ik2 = new ItemKeys(null, "ab");
ItemKeys ik1 = new ItemKeys(null, string("ab"));
ItemKeys ik2 = new ItemKeys(null, string("ab"));
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
@ -52,8 +53,8 @@ public class ItemKeysTest {
@Test
public void testHashNotNullRangeNotNullEquals() {
ItemKeys ik1 = new ItemKeys("abc", "pqr");
ItemKeys ik2 = new ItemKeys("abc", "pqr");
ItemKeys ik1 = new ItemKeys(string("abc"), string("pqr"));
ItemKeys ik2 = new ItemKeys(string("abc"), string("pqr"));
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
@ -61,8 +62,12 @@ public class ItemKeysTest {
@Test
public void testHashNotNullRangeNotNullForOtherNotEquals() {
ItemKeys ik1 = new ItemKeys(null, "ab");
ItemKeys ik2 = new ItemKeys("ab", null);
ItemKeys ik1 = new ItemKeys(null, string("ab"));
ItemKeys ik2 = new ItemKeys(string("ab"), null);
assertFalse(ik1.equals(ik2));
}
private static AttributeValue string(final String s) {
return AttributeValue.builder().s(s).build();
}
}

View File

@ -16,13 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
@ -36,15 +29,22 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.io.FileInputStream;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -54,6 +54,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class PutDynamoDBRecordTest {
@ -61,29 +70,29 @@ public class PutDynamoDBRecordTest {
private static final String TABLE_NAME = "table";
@Mock
private DynamoDB mockDynamoDB;
private DynamoDbClient client;
@Mock
private AWSCredentialsProviderService credentialsProviderService;
private ArgumentCaptor<TableWriteItems> captor;
private ArgumentCaptor<BatchWriteItemRequest> captor;
private PutDynamoDBRecord testSubject;
@BeforeEach
public void setUp() {
captor = ArgumentCaptor.forClass(TableWriteItems.class);
Mockito.when(credentialsProviderService.getIdentifier()).thenReturn("credentialProviderService");
captor = ArgumentCaptor.forClass(BatchWriteItemRequest.class);
when(credentialsProviderService.getIdentifier()).thenReturn("credentialProviderService");
final BatchWriteItemOutcome outcome = Mockito.mock(BatchWriteItemOutcome.class);
final BatchWriteItemResponse response = mock(BatchWriteItemResponse.class);
final Map<String, List<WriteRequest>> unprocessedItems = new HashMap<>();
Mockito.when(outcome.getUnprocessedItems()).thenReturn(unprocessedItems);
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenReturn(outcome);
when(response.unprocessedItems()).thenReturn(unprocessedItems);
when(client.batchWriteItem(captor.capture())).thenReturn(response);
testSubject = new PutDynamoDBRecord() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
protected DynamoDbClient getClient(final ProcessContext context) {
return client;
}
};
}
@ -94,7 +103,7 @@ public class PutDynamoDBRecordTest {
runner.run();
Assertions.assertTrue(captor.getAllValues().isEmpty());
assertTrue(captor.getAllValues().isEmpty());
}
@Test
@ -104,9 +113,10 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/singleInput.json"));
runner.run();
final TableWriteItems result = captor.getValue();
Assertions.assertEquals(TABLE_NAME, result.getTableName());
assertItemsConvertedProperly(result.getItemsToPut(), 1);
final BatchWriteItemRequest result = captor.getValue();
assertTrue(result.hasRequestItems());
assertNotNull(result.requestItems().get(TABLE_NAME));
assertItemsConvertedProperly(result.requestItems().get(TABLE_NAME), 1);
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
}
@ -117,9 +127,10 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/multipleInputs.json"));
runner.run();
final TableWriteItems result = captor.getValue();
Assertions.assertEquals(TABLE_NAME, result.getTableName());
assertItemsConvertedProperly(result.getItemsToPut(), 3);
final BatchWriteItemRequest result = captor.getValue();
assertTrue(result.hasRequestItems());
assertNotNull(result.requestItems().get(TABLE_NAME));
assertItemsConvertedProperly(result.requestItems().get(TABLE_NAME), 3);
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
}
@ -130,16 +141,18 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/multipleChunks.json"));
runner.run();
final List<TableWriteItems> results = captor.getAllValues();
final List<BatchWriteItemRequest> results = captor.getAllValues();
Assertions.assertEquals(2, results.size());
final TableWriteItems result1 = results.get(0);
Assertions.assertEquals(TABLE_NAME, result1.getTableName());
assertItemsConvertedProperly(result1.getItemsToPut(), 25);
final BatchWriteItemRequest result1 = results.get(0);
assertTrue(result1.hasRequestItems());
assertNotNull(result1.requestItems().get(TABLE_NAME));
assertItemsConvertedProperly(result1.requestItems().get(TABLE_NAME), 25);
final TableWriteItems result2 = results.get(1);
Assertions.assertEquals(TABLE_NAME, result2.getTableName());
assertItemsConvertedProperly(result2.getItemsToPut(), 4);
final BatchWriteItemRequest result2 = results.get(1);
assertTrue(result2.hasRequestItems());
assertNotNull(result2.requestItems().get(TABLE_NAME));
assertItemsConvertedProperly(result2.requestItems().get(TABLE_NAME), 4);
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
@ -167,7 +180,7 @@ public class PutDynamoDBRecordTest {
runner.run();
Assertions.assertEquals(1, captor.getAllValues().size());
Assertions.assertEquals(4, captor.getValue().getItemsToPut().size());
Assertions.assertEquals(4, captor.getValue().requestItems().get(TABLE_NAME).size());
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
@ -196,13 +209,13 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/singleInput.json"));
runner.run();
final TableWriteItems result = captor.getValue();
Assertions.assertEquals(1, result.getItemsToPut().size());
final BatchWriteItemRequest result = captor.getValue();
Assertions.assertEquals(1, result.requestItems().get(TABLE_NAME).size());
final Item item = result.getItemsToPut().iterator().next();
Assertions.assertEquals(4, item.asMap().size());
Assertions.assertEquals("P0", item.get("partition"));
Assertions.assertTrue(item.hasAttribute("generated"));
final Map<String, AttributeValue> item = result.requestItems().get(TABLE_NAME).iterator().next().putRequest().item();
Assertions.assertEquals(4, item.size());
Assertions.assertEquals(string("P0"), item.get("partition"));
assertTrue(item.containsKey("generated"));
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
}
@ -216,13 +229,17 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/multipleChunks.json"));
runner.run();
final List<Item> items = new ArrayList<>();
captor.getAllValues().forEach(capture -> items.addAll(capture.getItemsToPut()));
final List<Map<String, AttributeValue>> items = new ArrayList<>();
captor.getAllValues().forEach(capture -> capture.requestItems().get(TABLE_NAME).stream()
.map(WriteRequest::putRequest)
.map(PutRequest::item)
.forEach(items::add));
Assertions.assertEquals(29, items.size());
for (int sortKeyValue = 0; sortKeyValue < 29; sortKeyValue++) {
Assertions.assertEquals(new BigDecimal(sortKeyValue + 1), items.get(sortKeyValue).get("sort"));
final AttributeValue expectedValue = AttributeValue.builder().n(String.valueOf(sortKeyValue + 1)).build();
Assertions.assertEquals(expectedValue, items.get(sortKeyValue).get("sort"));
}
}
@ -234,7 +251,7 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/singleInput.json"));
runner.run();
Mockito.verify(mockDynamoDB, Mockito.never()).batchWriteItem(Mockito.any(TableWriteItems.class));
verify(client, never()).batchWriteItem(any(BatchWriteItemRequest.class));
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
}
@ -246,7 +263,7 @@ public class PutDynamoDBRecordTest {
runner.enqueue(new FileInputStream("src/test/resources/dynamodb/multipleInputs.json"));
runner.run();
Mockito.verify(mockDynamoDB, Mockito.times(1)).batchWriteItem(Mockito.any(TableWriteItems.class));
verify(client, times(1)).batchWriteItem(any(BatchWriteItemRequest.class));
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
}
@ -258,7 +275,7 @@ public class PutDynamoDBRecordTest {
runner.run();
Assertions.assertTrue(captor.getAllValues().isEmpty());
assertTrue(captor.getAllValues().isEmpty());
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
}
@ -280,47 +297,61 @@ public class PutDynamoDBRecordTest {
return runner;
}
private void assertItemsConvertedProperly(final Collection<Item> items, final int expectedNumberOfItems) {
Assertions.assertEquals(expectedNumberOfItems, items.size());
private void assertItemsConvertedProperly(final Collection<WriteRequest> writeRequests, final int expectedNumberOfItems) {
Assertions.assertEquals(expectedNumberOfItems, writeRequests.size());
int index = 0;
for (final Item item : items) {
Assertions.assertEquals(3, item.asMap().size());
Assertions.assertEquals("new", item.get("value"));
for (final WriteRequest writeRequest : writeRequests) {
final PutRequest putRequest = writeRequest.putRequest();
assertNotNull(putRequest);
final Map<String, AttributeValue> item = putRequest.item();
Assertions.assertEquals(3, item.size());
Assertions.assertEquals(string("new"), item.get("value"));
Assertions.assertEquals(new BigDecimal(index), item.get("size"));
Assertions.assertEquals("P" + index, item.get("partition"));
Assertions.assertEquals(number(index), item.get("size"));
Assertions.assertEquals(string("P" + index), item.get("partition"));
index++;
}
}
private void setInsertionError() {
final BatchWriteItemOutcome outcome = Mockito.mock(BatchWriteItemOutcome.class);
final BatchWriteItemResponse outcome = mock(BatchWriteItemResponse.class);
final Map<String, List<WriteRequest>> unprocessedItems = new HashMap<>();
final List<WriteRequest> writeResults = Arrays.asList(Mockito.mock(WriteRequest.class));
final List<WriteRequest> writeResults = Arrays.asList(mock(WriteRequest.class));
unprocessedItems.put("test", writeResults);
Mockito.when(outcome.getUnprocessedItems()).thenReturn(unprocessedItems);
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenReturn(outcome);
when(outcome.unprocessedItems()).thenReturn(unprocessedItems);
when(outcome.hasUnprocessedItems()).thenReturn(true);
when(client.batchWriteItem(captor.capture())).thenReturn(outcome);
}
private void setServerError() {
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenThrow(new AmazonServiceException("Error"));
when(client.batchWriteItem(captor.capture())).thenThrow(AwsServiceException.builder().message("Error")
.awsErrorDetails(AwsErrorDetails.builder().errorMessage("Error").errorCode("Code").build()).build());
}
private void setExceedThroughputAtGivenChunk(final int chunkToFail) {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).then(new Answer<Object>() {
when(client.batchWriteItem(captor.capture())).then(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
final int calls = numberOfCalls.incrementAndGet();
if (calls >= chunkToFail) {
throw new ProvisionedThroughputExceededException("Throughput exceeded");
throw ProvisionedThroughputExceededException.builder().message("Throughput exceeded")
.awsErrorDetails(AwsErrorDetails.builder().errorCode("error code").errorMessage("error message").build()).build();
} else {
return Mockito.mock(BatchWriteItemOutcome.class);
return mock(BatchWriteItemResponse.class);
}
}
});
}
protected static AttributeValue string(final String s) {
return AttributeValue.builder().s(s).build();
}
protected static AttributeValue number(final Number number) {
return AttributeValue.builder().n(String.valueOf(number)).build();
}
}

View File

@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.reporting.InitializationException;
@ -34,40 +24,46 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PutDynamoDBTest extends AbstractDynamoDBTest {
private static final byte[] HELLO_2_BYTES = "{\"hell\": 2}".getBytes(StandardCharsets.UTF_8);
protected PutDynamoDB putDynamoDB;
protected BatchWriteItemResult result = new BatchWriteItemResult();
BatchWriteItemOutcome outcome;
@BeforeEach
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
final Map<String, Collection<Map<String, AttributeValue>>> responses = new HashMap<>();
responses.put(stringHashStringRangeTableName, Collections.emptyList());
final BatchWriteItemResponse response = BatchWriteItemResponse.builder().build();
client = mock(DynamoDbClient.class);
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenReturn(response);
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
protected DynamoDbClient getClient(final ProcessContext context) {
return client;
}
};
}
@ -90,17 +86,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangePutOnlyHashFailure() throws InitializationException {
// Inject a mock DynamoDB to create the exception condition
final DynamoDB mockDynamoDb = Mockito.mock(DynamoDB.class);
// When writing, mock thrown service exception from AWS
Mockito.when(mockDynamoDb.batchWriteItem(ArgumentMatchers.<TableWriteItems>any())).thenThrow(getSampleAwsServiceException());
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDb;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenThrow(getSampleAwsServiceException());
final TestRunner putRunner = createRunner();
putRunner.enqueue(HELLO_2_BYTES);
@ -109,8 +95,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
validateServiceExceptionAttributes(flowFile);
}
@ -128,8 +114,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
}
}
@ -144,8 +130,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@ -163,8 +149,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@ -181,8 +167,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
@ -203,15 +189,15 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.run(2,true,true);
List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFilesFailed) {
final List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFilesFailed) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
}
List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesSuccessful) {
final List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (final MockFlowFile flowFile : flowFilesSuccessful) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
@ -233,15 +219,15 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.run(1);
List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFilesFailed) {
final List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFilesFailed) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
}
List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesSuccessful) {
final List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (final MockFlowFile flowFile : flowFilesSuccessful) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
@ -261,9 +247,9 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
assertEquals(1,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
@ -272,19 +258,7 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangePutThrowsServiceException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonServiceException("serviceException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenThrow(getSampleAwsServiceException());
final TestRunner putRunner = createRunner();
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
@ -295,30 +269,18 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)",
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("Test AWS Service Exception (Service: Dynamo DB, Status Code: 0, Request ID: TestRequestId-1234567890)",
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutThrowsClientException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonClientException("clientException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
public void testStringHashStringRangePutThrowsSdkException() throws InitializationException {
when(client.batchWriteItem(any(BatchWriteItemRequest.class)))
.thenThrow(SdkException.builder().message("sdkException").build());
final TestRunner putRunner = createRunner();
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
@ -328,28 +290,16 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("sdkException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutThrowsRuntimeException() throws InitializationException {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new RuntimeException("runtimeException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB(ProcessContext context) {
return mockDynamoDB;
}
};
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenThrow(new RuntimeException("runtimeException"));
final TestRunner putRunner = createRunner();
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
@ -360,8 +310,8 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
final List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (final MockFlowFile flowFile : flowFiles) {
assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
@ -369,15 +319,19 @@ public class PutDynamoDBTest extends AbstractDynamoDBTest {
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() throws InitializationException {
final Map<String, List<WriteRequest>> unprocessed = new HashMap<>();
final PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final Map<String, Collection<WriteRequest>> unprocessedTableToKeysMap = new HashMap<>();
final Map<String, AttributeValue> keyMap = new HashMap<>();
keyMap.put("hashS", string("h1"));
keyMap.put("rangeS", string("r1"));
final WriteRequest writeRequest = WriteRequest.builder().putRequest(PutRequest.builder().item(keyMap).build()).build();
unprocessedTableToKeysMap.put(stringHashStringRangeTableName, Collections.singletonList(writeRequest));
final BatchWriteItemResponse response = BatchWriteItemResponse.builder()
.unprocessedItems(unprocessedTableToKeysMap)
.build();
when(client.batchWriteItem(any(BatchWriteItemRequest.class))).thenReturn(response);
final TestRunner putRunner = createRunner();
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
import com.amazonaws.services.dynamodbv2.document.Item;
import org.apache.nifi.action.Component;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
@ -26,6 +25,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.math.BigDecimal;
import java.math.BigInteger;
@ -35,6 +35,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class RecordToItemConverterTest {
@Test
@ -77,41 +81,42 @@ class RecordToItemConverterTest {
values.put("choice", Integer.MAX_VALUE);
final Record record = new MapRecord(schema, values);
final Item item = new Item();
final Map<String, AttributeValue> item = new HashMap<>();
for (final RecordField schemaField : schema.getFields()) {
RecordToItemConverter.addField(record, item, schemaField.getDataType().getFieldType(), schemaField.getFieldName());
}
Assertions.assertEquals(Boolean.TRUE, item.get("boolean"));
assertEquals(bool(Boolean.TRUE), item.get("boolean"));
// Internally Item stores numbers as BigDecimal
Assertions.assertEquals(BigDecimal.valueOf(Short.MAX_VALUE), item.get("short"));
Assertions.assertEquals(BigDecimal.valueOf(Integer.MAX_VALUE), item.get("int"));
Assertions.assertEquals(BigDecimal.valueOf(Long.MAX_VALUE), item.get("long"));
Assertions.assertEquals(BigDecimal.valueOf(Byte.MAX_VALUE), item.get("byte"));
Assertions.assertEquals(new BigDecimal("12345678901234567890.123456789012345678901234567890"), item.get("decimal"));
Assertions.assertEquals(Float.valueOf(123.456F), ((BigDecimal) item.get("float")).floatValue(), 0.0001);
Assertions.assertEquals(Double.valueOf(1234.5678D), ((BigDecimal) item.get("double")).floatValue(), 0.0001);
assertEquals(number(Short.MAX_VALUE), item.get("short"));
assertEquals(number(Integer.MAX_VALUE), item.get("int"));
assertEquals(number(Long.MAX_VALUE), item.get("long"));
assertEquals(number(Byte.MAX_VALUE), item.get("byte"));
assertEquals(number(new BigDecimal("12345678901234567890.123456789012345678901234567890")), item.get("decimal"));
assertEquals(123.456F, Float.valueOf(item.get("float").n()).floatValue(), 0.0001);
assertEquals(1234.5678D, Float.valueOf(item.get("double").n()).doubleValue(), 0.0001);
Assertions.assertEquals(BigDecimal.valueOf(10), item.get("bigint"));
assertEquals(number(10), item.get("bigint"));
// DynamoDB uses string to represent time and date
Assertions.assertTrue(item.get("timestamp") instanceof String);
Assertions.assertTrue(item.get("date") instanceof String);
Assertions.assertTrue(item.get("time") instanceof String);
assertNotNull(item.get("timestamp").s());
assertNotNull(item.get("date").s());
assertNotNull(item.get("time").s());
// Character is unknown type for DynamoDB, as well as enum
Assertions.assertEquals("c", item.get("char"));
assertEquals(string("c"), item.get("char"));
// Enum is not supported in DynamoDB
Assertions.assertEquals(Component.Controller.name(), item.get("enum"));
assertEquals(string(Component.Controller.name()), item.get("enum"));
// DynamoDB uses lists and still keeps the payload datatype
Assertions.assertIterableEquals(Arrays.asList(new BigDecimal[] {BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.TEN}), (Iterable<?>) item.get("array"));
Assertions.assertIterableEquals(Arrays.asList(number(BigDecimal.ZERO), number(BigDecimal.ONE), number(BigDecimal.TEN)),
item.get("array").l());
// DynamoDB cannot handle choice, all values enveloped into choice are handled as strings
Assertions.assertEquals("2147483647", item.get("choice"));
assertEquals(string("2147483647"), item.get("choice"));
}
@Test
@ -132,16 +137,16 @@ class RecordToItemConverterTest {
starSystemValues.put("star", star);
final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
final Item item = new Item();
final Map<String, AttributeValue> item = new HashMap<>();
RecordToItemConverter.addField(starSystem, item, RecordFieldType.MAP, "star");
final Object result = item.get("star");
Assertions.assertTrue(result instanceof Map);
final Map<String, Object> resultMap = (Map<String, Object>) result;
Assertions.assertEquals(2, resultMap.size());
Assertions.assertEquals(false, resultMap.get("isDwarf"));
Assertions.assertEquals("G", resultMap.get("type"));
final AttributeValue result = item.get("star");
assertTrue(result.hasM());
final Map<String, AttributeValue> resultMap = result.m();
assertEquals(2, resultMap.size());
assertEquals(bool(false), resultMap.get("isDwarf"));
assertEquals(string("G"), resultMap.get("type"));
}
@Test
@ -165,19 +170,19 @@ class RecordToItemConverterTest {
starSystemValues.put("star", star);
final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
final Item item = new Item();
final Map<String, AttributeValue> item = new HashMap<>();
RecordToItemConverter.addField(starSystem, item, RecordFieldType.MAP, "star");
final Object result = item.get("star");
Assertions.assertTrue(result instanceof Map);
final Map<String, Object> resultMap = (Map<String, Object>) result;
Assertions.assertEquals(1, resultMap.size());
final Object starTypeResult = resultMap.get("starType");
Assertions.assertTrue(starTypeResult instanceof Map);
final Map<String, Object> starTypeResultMap = (Map<String, Object>) starTypeResult;
Assertions.assertEquals(false, starTypeResultMap.get("isDwarf"));
Assertions.assertEquals("G", starTypeResultMap.get("type"));
final AttributeValue result = item.get("star");
assertTrue(result.hasM());
final Map<String, AttributeValue> resultMap = result.m();
assertEquals(1, resultMap.size());
final AttributeValue starTypeResult = resultMap.get("starType");
assertTrue(starTypeResult.hasM());
final Map<String, AttributeValue> starTypeResultMap = starTypeResult.m();
assertEquals(bool(false), starTypeResultMap.get("isDwarf"));
assertEquals(string("G"), starTypeResultMap.get("type"));
}
@Test
@ -204,16 +209,16 @@ class RecordToItemConverterTest {
starSystemValues.put("star", star);
final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
final Item item = new Item();
final Map<String, AttributeValue> item = new HashMap<>();
RecordToItemConverter.addField(starSystem, item, RecordFieldType.RECORD, "star");
final Object result = item.get("star");
Assertions.assertTrue(result instanceof Map);
final Map<String, Object> resultMap = (Map<String, Object>) result;
Assertions.assertEquals(2, resultMap.size());
Assertions.assertEquals(false, resultMap.get("isDwarf"));
Assertions.assertEquals("G", resultMap.get("type"));
final AttributeValue result = item.get("star");
assertNotNull(result.m());
final Map<String, AttributeValue> resultMap = result.m();
assertEquals(2, resultMap.size());
assertEquals(bool(false), resultMap.get("isDwarf"));
assertEquals(string("G"), resultMap.get("type"));
}
@Test
@ -248,18 +253,30 @@ class RecordToItemConverterTest {
starSystemValues.put("star", star);
final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
final Item item = new Item();
final Map<String, AttributeValue> item = new HashMap<>();
RecordToItemConverter.addField(starSystem, item, RecordFieldType.RECORD, "star");
final Object result = item.get("star");
Assertions.assertTrue(result instanceof Map);
final Map<String, Object> resultMap = (Map<String, Object>) result;
Assertions.assertEquals(1, resultMap.size());
final Object fieldResult = resultMap.get("starType");
Assertions.assertTrue(fieldResult instanceof Map);
final Map<String, Object> fieldResultMap = (Map<String, Object>) fieldResult;
Assertions.assertEquals(false, fieldResultMap.get("isDwarf"));
Assertions.assertEquals("G", fieldResultMap.get("type"));
final AttributeValue result = item.get("star");
assertNotNull(result.m());
final Map<String, AttributeValue> resultMap = result.m();
assertEquals(1, resultMap.size());
final AttributeValue fieldResult = resultMap.get("starType");
Assertions.assertNotNull(fieldResult.m());
final Map<String, AttributeValue> fieldResultMap = fieldResult.m();
assertEquals(bool(false), fieldResultMap.get("isDwarf"));
assertEquals(string("G"), fieldResultMap.get("type"));
}
private static AttributeValue bool(final Boolean value) {
return AttributeValue.builder().bool(value).build();
}
private static AttributeValue string(final String value) {
return AttributeValue.builder().s(value).build();
}
private static AttributeValue number(final Number value) {
return AttributeValue.builder().n(String.valueOf(value)).build();
}
}