NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors

This closes #224.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
mans2singh 2016-04-08 11:21:39 -04:00 committed by Aldrin Piri
parent 41583e6dc0
commit e3155a8a4b
14 changed files with 3019 additions and 1 deletions

View File

@ -31,6 +31,7 @@ import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -287,4 +288,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
@OnShutdown
public void onShutdown() {
if ( getClient() != null ) {
getClient().shutdown();
}
}
}

View File

@ -0,0 +1,338 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
/**
* Base class for Nifi dynamo db related processors
*
* @see DeleteDynamoDB
* @see PutDynamoDB
* @see GetDynamoDB
*/
public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonDynamoDBClient> {
public static final Relationship REL_UNPROCESSED = new Relationship.Builder().name("unprocessed")
.description("FlowFiles are routed to unprocessed relationship when DynamoDB is not able to process "
+ "all the items in the request. Typical reasons are insufficient table throughput capacity and exceeding the maximum bytes per request. "
+ "Unprocessed FlowFiles can be retried with a new request.").build();
public static final AllowableValue ALLOWABLE_VALUE_STRING = new AllowableValue("string");
public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new AllowableValue("number");
public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = "dynamodb.key.error.unprocessed";
public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = "dynmodb.range.key.value.error";
public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = "dynmodb.hash.key.value.error";
public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = "dynamodb.key.error.not.found";
public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = "dynamodb.error.exception.message";
public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
public static final String DYNAMODB_ERROR_MESSAGE = "dynamodb.error.message";
public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
public static final String DYNAMODB_ERROR_SERVICE = "dynamodb.error.service";
public static final String DYNAMODB_ERROR_RETRYABLE = "dynamodb.error.retryable";
public static final String DYNAMODB_ERROR_REQUEST_ID = "dynamodb.error.request.id";
public static final String DYNAMODB_ERROR_STATUS_CODE = "dynamodb.error.status.code";
public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = " dynamodb.item.hash.key.value";
public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = " dynamodb.item.range.key.value";
public static final String DYNAMODB_ITEM_IO_ERROR = "dynamodb.item.io.error";
public static final String AWS_DYNAMO_DB_ITEM_SIZE_ERROR = "dynamodb.item.size.error";
protected static final String DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE = "DynamoDB key not found : ";
public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
.name("Table Name")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The DynamoDB table name")
.build();
public static final PropertyDescriptor HASH_KEY_VALUE = new PropertyDescriptor.Builder()
.name("Hash Key Value")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The hash key value of the item")
.defaultValue("${dynamodb.item.hash.key.value}")
.build();
public static final PropertyDescriptor RANGE_KEY_VALUE = new PropertyDescriptor.Builder()
.name("Range Key Value")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${dynamodb.item.range.key.value}")
.build();
public static final PropertyDescriptor HASH_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
.name("Hash Key Value Type")
.required(true)
.description("The hash key value type of the item")
.defaultValue(ALLOWABLE_VALUE_STRING.getValue())
.allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
.build();
public static final PropertyDescriptor RANGE_KEY_VALUE_TYPE = new PropertyDescriptor.Builder()
.name("Range Key Value Type")
.required(true)
.description("The range key value type of the item")
.defaultValue(ALLOWABLE_VALUE_STRING.getValue())
.allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
.build();
public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
.name("Hash Key Name")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The hash key name of the item")
.build();
public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
.name("Range Key Name")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The range key name of the item")
.build();
public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
.name("Json Document attribute")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The Json document to be retrieved from the dynamodb item")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch items for each request (between 1 and 50)")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createLongValidator(1, 50, true))
.defaultValue("1")
.description("The items to be retrieved in one batch")
.build();
public static final PropertyDescriptor DOCUMENT_CHARSET = new PropertyDescriptor.Builder()
.name("Character set of document")
.description("Character set of data in the document")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.required(true)
.defaultValue(Charset.defaultCharset().name())
.build();
protected volatile DynamoDB dynamoDB;
public static final Set<Relationship> dynamoDBrelationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED)));
@Override
public Set<Relationship> getRelationships() {
return dynamoDBrelationships;
}
/**
* Create client using credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().debug("Creating client with credentials provider");
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentialsProvider, config);
return client;
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AmazonDynamoDBClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().debug("Creating client with aws credentials");
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(credentials, config);
return client;
}
protected Object getValue(ProcessContext context, PropertyDescriptor type, PropertyDescriptor value, FlowFile flowFile) {
if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
return context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
} else {
return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
}
}
protected Object getAttributeValue(ProcessContext context, PropertyDescriptor propertyType, AttributeValue value) {
if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) {
if ( value == null ) return null;
else return value.getS();
} else {
if ( value == null ) return null;
else return new BigDecimal(value.getN());
}
}
protected synchronized DynamoDB getDynamoDB() {
if ( dynamoDB == null )
dynamoDB = new DynamoDB(client);
return dynamoDB;
}
protected Object getValue(Map<String, AttributeValue> item, String keyName, String valueType) {
if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
AttributeValue val = item.get(keyName);
if ( val == null ) return val;
else return val.getS();
} else {
AttributeValue val = item.get(keyName);
if ( val == null ) return val;
else return val.getN();
}
}
protected List<FlowFile> processException(final ProcessSession session, List<FlowFile> flowFiles, Exception exception) {
List<FlowFile> failedFlowFiles = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
flowFile = session.putAttribute(flowFile, DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
failedFlowFiles.add(flowFile);
}
return failedFlowFiles;
}
protected List<FlowFile> processClientException(final ProcessSession session, List<FlowFile> flowFiles,
AmazonClientException exception) {
List<FlowFile> failedFlowFiles = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
Map<String,String> attributes = new HashMap<>();
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
}
return failedFlowFiles;
}
protected List<FlowFile> processServiceException(final ProcessSession session, List<FlowFile> flowFiles,
AmazonServiceException exception) {
List<FlowFile> failedFlowFiles = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
Map<String,String> attributes = new HashMap<>();
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() );
attributes.put(DYNAMODB_ERROR_TYPE, exception.getErrorType().name() );
attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() );
attributes.put(DYNAMODB_ERROR_STATUS_CODE, Integer.toString(exception.getStatusCode()) );
attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
attributes.put(DYNAMODB_ERROR_RETRYABLE, Boolean.toString(exception.isRetryable()));
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
}
return failedFlowFiles;
}
/**
* Send unhandled items to failure and remove the flow files from key to flow file map
* @param session used for sending the flow file
* @param keysToFlowFileMap - ItemKeys to flow file map
* @param hashKeyValue the items hash key value
* @param rangeKeyValue the items hash key value
*/
protected void sendUnprocessedToUnprocessedRelationship(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object hashKeyValue, Object rangeKeyValue) {
ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
session.transfer(flowFile,REL_UNPROCESSED);
getLogger().error("Unprocessed key " + itemKeys + " for flow file " + flowFile);
keysToFlowFileMap.remove(itemKeys);
}
protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object rangeKeyValue, ProcessSession session,
FlowFile flowFile) {
boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
boolean isRangeValueNull = rangeKeyValue == null;
boolean isConsistent = true;
if ( ! isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) {
isConsistent = false;
}
if ( isRangeNameBlank && ( ! isRangeValueNull && ! StringUtils.isBlank(rangeKeyValue.toString()))) {
isConsistent = false;
}
if ( ! isConsistent ) {
getLogger().error("Range key name '" + rangeKeyName + "' was not consistent with range value "
+ rangeKeyValue + "'" + flowFile);
flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+ "'/value '" + rangeKeyValue + "' inconsistency error");
session.transfer(flowFile, REL_FAILURE);
}
return isConsistent;
}
protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session,
FlowFile flowFile) {
boolean isConsistent = true;
if ( hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) {
getLogger().error("Hash key value '" + hashKeyValue + "' is required for flow file " + flowFile);
flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
+ "/value '" + hashKeyValue + "' inconsistency error");
session.transfer(flowFile, REL_FAILURE);
isConsistent = false;
}
return isConsistent;
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
public abstract class AbstractWriteDynamoDBProcessor extends AbstractDynamoDBProcessor {
/**
* Helper method to handle unprocessed items items
* @param session process session
* @param keysToFlowFileMap map of flow db primary key to flow file
* @param table dynamodb table
* @param hashKeyName the hash key name
* @param hashKeyValueType the hash key value
* @param rangeKeyName the range key name
* @param rangeKeyValueType range key value
* @param outcome the write outcome
*/
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
BatchWriteItemResult result = outcome.getBatchWriteItemResult();
// Handle unprocessed items
List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
for ( WriteRequest request : unprocessedItems) {
Map<String,AttributeValue> item = getRequestItem(request);
Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
}
/**
* Get the request item key and attribute value
* @param writeRequest write request
* @return Map of keys and values
*
* @see PutDynamoDB#getRequestItem(WriteRequest)
* @see DeleteDynamoDB#getRequestItem(WriteRequest)
*/
protected abstract Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest);
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
@SupportsBatching
@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"})
@CapabilityDescription("Deletes a document from DynamoDB based on hash and range key. The key can be string or number."
+ " The request requires all the primary keys for the operation (hash or hash and range key)")
@WritesAttributes({
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db status code")
})
@ReadsAttributes({
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value" ),
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value" ),
})
public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
final String table = context.getProperty(TABLE).getValue();
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
TableWriteItems tableWriteItems = new TableWriteItems(table);
for (FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
}
if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
continue;
}
if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
tableWriteItems.addHashOnlyPrimaryKeysToDelete(hashKeyName, hashKeyValue);
} else {
tableWriteItems.addHashAndRangePrimaryKeyToDelete(hashKeyName,
hashKeyValue, rangeKeyName, rangeKeyValue);
}
keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
}
if ( keysToFlowFileMap.isEmpty() ) {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
try {
BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
rangeKeyValueType, outcome);
// All non unprocessed items are successful
for (FlowFile flowFile : keysToFlowFileMap.values()) {
getLogger().debug("Successfully deleted item from dynamodb : " + table);
session.transfer(flowFile,REL_SUCCESS);
}
} catch(AmazonServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
}
}
/**
* {@inheritDoc}
*/
protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
return writeRequest.getDeleteRequest().getKey();
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
@SupportsBatching
@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "DynamoDB", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves a document from DynamoDB based on hash and range key. The key can be string or number."
+ "For any get request all the primary keys are required (hash or hash and range based on the table keys)."
+ "A Json Document ('Map') attribute of the DynamoDB item is read into the content of the FlowFile.")
@WritesAttributes({
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db status code")
})
@ReadsAttributes({
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value" ),
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value" ),
})
public class GetDynamoDB extends AbstractDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
.description("FlowFiles are routed to not found relationship if key not found in the table").build();
public static final Set<Relationship> getDynamoDBrelationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_UNPROCESSED, REL_NOT_FOUND)));
@Override
public Set<Relationship> getRelationships() {
return getDynamoDBrelationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
final String table = context.getProperty(TABLE).getValue();
TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
for (FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
}
if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) {
continue;
}
keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) {
tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue);
} else {
tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue);
}
}
if (keysToFlowFileMap.isEmpty()) {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
try {
BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes);
// Handle processed items and get the json document
List<Item> items = result.getTableItems().get(table);
for (Item item : items) {
ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName));
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
if ( item.get(jsonDocument) != null ) {
ByteArrayInputStream bais = new ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
flowFile = session.importFrom(bais, flowFile);
}
session.transfer(flowFile,REL_SUCCESS);
keysToFlowFileMap.remove(itemKeys);
}
// Handle unprocessed keys
Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys();
if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table);
List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys();
for (Map<String,AttributeValue> unprocessedKey : keys) {
Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
// Handle any remaining items
for (ItemKeys key : keysToFlowFileMap.keySet()) {
FlowFile flowFile = keysToFlowFileMap.get(key);
flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + key.toString() );
session.transfer(flowFile,REL_NOT_FOUND);
keysToFlowFileMap.remove(key);
}
} catch(AmazonServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch(Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
/**
* Utility class to keep a map of keys and flow files
*/
class ItemKeys {
protected Object hashKey = "";
protected Object rangeKey = "";
public ItemKeys(Object hashKey, Object rangeKey) {
if ( hashKey != null )
this.hashKey = hashKey;
if ( rangeKey != null )
this.rangeKey = rangeKey;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this,ToStringStyle.SHORT_PREFIX_STYLE);
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this, false);
}
@Override
public boolean equals(Object other) {
return EqualsBuilder.reflectionEquals(this, other, false);
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
@SupportsBatching
@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
@CapabilityDescription("Puts a document from DynamoDB based on hash and range key. The table can have either hash and range or hash key alone."
+ " Currently the keys supported are string and number and value can be json document. "
+ "In case of hash and range keys both key are required for the operation."
+ " The FlowFile content must be JSON. FlowFile content is mapped to the specified Json Document attribute in the DynamoDB item.")
@WritesAttributes({
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db error status code"),
@WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
})
@ReadsAttributes({
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items hash key value"),
@ReadsAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items range key value")
})
public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
/**
* Dyamodb max item size limit 400 kb
*/
public static final int DYNAMODB_MAX_ITEM_SIZE = 400 * 1024;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
final String table = context.getProperty(TABLE).getValue();
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
final String charset = context.getProperty(DOCUMENT_CHARSET).getValue();
TableWriteItems tableWriteItems = new TableWriteItems(table);
for (FlowFile flowFile : flowFiles) {
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) {
continue;
}
if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile)) {
continue;
}
if (!isDataValid(flowFile, jsonDocument)) {
flowFile = session.putAttribute(flowFile, AWS_DYNAMO_DB_ITEM_SIZE_ERROR, "Max size of item + attribute should be 400kb but was " + flowFile.getSize() + jsonDocument.length());
session.transfer(flowFile, REL_FAILURE);
continue;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
try {
if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) {
tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
.withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
} else {
tableWriteItems.addItemToPut(new Item().withKeyComponent(hashKeyName, hashKeyValue)
.withKeyComponent(rangeKeyName, rangeKeyValue)
.withJSON(jsonDocument, IOUtils.toString(baos.toByteArray(), charset)));
}
} catch (IOException ioe) {
getLogger().error("IOException while creating put item : " + ioe.getMessage());
flowFile = session.putAttribute(flowFile, DYNAMODB_ITEM_IO_ERROR, ioe.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile);
}
if (keysToFlowFileMap.isEmpty()) {
return;
}
final DynamoDB dynamoDB = getDynamoDB();
try {
BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
handleUnprocessedItems(session, keysToFlowFileMap, table, hashKeyName, hashKeyValueType, rangeKeyName,
rangeKeyValueType, outcome);
// Handle any remaining flowfiles
for (FlowFile flowFile : keysToFlowFileMap.values()) {
getLogger().debug("Successful posted items to dynamodb : " + table);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (AmazonServiceException exception) {
getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch (AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
} catch (Exception exception) {
getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
}
}
private boolean isDataValid(FlowFile flowFile, String jsonDocument) {
return (flowFile.getSize() + jsonDocument.length()) < DYNAMODB_MAX_ITEM_SIZE;
}
/**
* {@inheritDoc}
*/
protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
return writeRequest.getPutRequest().getItem();
}
}

View File

@ -21,3 +21,6 @@ org.apache.nifi.processors.aws.sqs.PutSQS
org.apache.nifi.processors.aws.sqs.DeleteSQS
org.apache.nifi.processors.aws.lambda.PutLambda
org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB

View File

@ -0,0 +1,368 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class DeleteDynamoDBTest {
protected DeleteDynamoDB deleteDynamoDB;
protected BatchWriteItemResult result = new BatchWriteItemResult();
BatchWriteItemOutcome outcome;
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Test
public void testStringHashStringRangeDeleteOnlyHashFailure() {
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
}
}
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMock() {
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
}
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
DeleteRequest delete = new DeleteRequest();
delete.addKeyEntry("hashS", new AttributeValue("h1"));
delete.addKeyEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(delete);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
@Test
public void testStringHashStringRangeDeleteNoHashValueFailure() {
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangeDeleteOnlyHashWithRangeValueNoRangeNameFailure() {
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangeDeleteOnlyHashWithRangeNameNoRangeValueFailure() {
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangeDeleteNonExistentHashSuccess() {
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "nonexistent");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
}
@Test
public void testStringHashStringRangeDeleteNonExistentRangeSuccess() {
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "nonexistent");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
}
@Test
public void testStringHashStringRangeDeleteThrowsServiceException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonServiceException("serviceException");
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeDeleteThrowsClientException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonClientException("clientException");
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeDeleteThrowsRuntimeException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new RuntimeException("runtimeException");
}
};
deleteDynamoDB = new DeleteDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);
deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
}

View File

@ -0,0 +1,530 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult;
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
import com.amazonaws.util.json.JSONException;
import com.amazonaws.util.json.JSONObject;
public class GetDynamoDBTest {
protected GetDynamoDB getDynamoDB;
protected BatchGetItemOutcome outcome;
protected BatchGetItemResult result = new BatchGetItemResult();
private HashMap unprocessed;
@Before
public void setUp() {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
unprocessed.put(stringHashStringRangeTableName, kaa);
result.withUnprocessedKeys(unprocessed);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
}
};
getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Test
public void testStringHashStringRangeGetUnprocessed() {
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
}
}
@Test
public void testStringHashStringRangeGetJsonObjectNull() {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
result.withUnprocessedKeys(unprocessed);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
item.put("j1",null);
item.put("hashS", new AttributeValue("h1"));
item.put("rangeS", new AttributeValue("r1"));
items.add(item);
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
}
};
getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
assertNull(flowFile.getContentClaim());
}
}
@Test
public void testStringHashStringRangeGetJsonObjectValid() throws IOException, JSONException {
outcome = new BatchGetItemOutcome(result);
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
map.put("rangeS", new AttributeValue("r1"));
kaa.withKeys(map);
unprocessed = new HashMap<>();
result.withUnprocessedKeys(unprocessed);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
String jsonDocument = new JSONObject().put("name", "john").toString();
item.put("j1",new AttributeValue(jsonDocument));
item.put("hashS", new AttributeValue("h1"));
item.put("rangeS", new AttributeValue("r1"));
items.add(item);
responses.put("StringHashStringRangeTable", items);
result.withResponses(responses);
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return outcome;
}
};
getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
}
@Test
public void testStringHashStringRangeGetThrowsServiceException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new AmazonServiceException("serviceException");
}
};
final GetDynamoDB getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeGetThrowsRuntimeException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new RuntimeException("runtimeException");
}
};
final GetDynamoDB getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeGetThrowsClientException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
throw new AmazonClientException("clientException");
}
};
final GetDynamoDB getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangeGetNotFound() {
result.clearResponsesEntries();
result.clearUnprocessedKeysEntries();
final BatchGetItemOutcome notFoundOutcome = new BatchGetItemOutcome(result);
Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>();
List<Map<String,AttributeValue>> items = new ArrayList<>();
responses.put(stringHashStringRangeTableName, items);
result.withResponses(responses);
final DynamoDB notFoundMockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) {
return notFoundOutcome;
}
};
final GetDynamoDB getDynamoDB = new GetDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return notFoundMockDynamoDB;
}
};
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND));
}
}
@Test
public void testStringHashStringRangeGetOnlyHashFailure() {
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
}
}
@Test
public void testStringHashStringRangeGetNoHashValueFailure() {
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangeGetOnlyHashWithRangeValueNoRangeNameFailure() {
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangeGetOnlyHashWithRangeNameNoRangeValueFailure() {
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
// Incorporated test from James W
@Test
public void testStringHashStringNoRangeGetUnprocessed() {
unprocessed.clear();
KeysAndAttributes kaa = new KeysAndAttributes();
Map<String,AttributeValue> map = new HashMap<>();
map.put("hashS", new AttributeValue("h1"));
kaa.withKeys(map);
unprocessed.put(stringHashStringRangeTableName, kaa);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
}
}
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertNotNull;
import java.io.FileInputStream;
import java.util.ArrayList;
import org.apache.nifi.flowfile.FlowFile;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DeleteTableResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
public class ITAbstractDynamoDBTest {
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
protected static DynamoDB dynamoDB;
protected static AmazonDynamoDBClient amazonDynamoDBClient;
protected static String stringHashStringRangeTableName = "StringHashStringRangeTable";
protected static String numberHashNumberRangeTableName = "NumberHashNumberRangeTable";
protected static String numberHashOnlyTableName = "NumberHashOnlyTable";
protected final static String REGION = "us-west-2";
@BeforeClass
public static void beforeClass() throws Exception {
FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
final PropertiesCredentials credentials = new PropertiesCredentials(fis);
amazonDynamoDBClient = new AmazonDynamoDBClient(credentials);
dynamoDB = new DynamoDB(amazonDynamoDBClient);
amazonDynamoDBClient.setRegion(Region.getRegion(Regions.US_WEST_2));
ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions
.add(new AttributeDefinition().withAttributeName("hashS").withAttributeType("S"));
attributeDefinitions
.add(new AttributeDefinition().withAttributeName("rangeS").withAttributeType("S"));
ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("hashS").withKeyType(KeyType.HASH));
keySchema.add(new KeySchemaElement().withAttributeName("rangeS").withKeyType(KeyType.RANGE));
CreateTableRequest request = new CreateTableRequest()
.withTableName(stringHashStringRangeTableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(5L)
.withWriteCapacityUnits(6L));
Table stringHashStringRangeTable = dynamoDB.createTable(request);
stringHashStringRangeTable.waitForActive();
attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions
.add(new AttributeDefinition().withAttributeName("hashN").withAttributeType("N"));
attributeDefinitions
.add(new AttributeDefinition().withAttributeName("rangeN").withAttributeType("N"));
keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH));
keySchema.add(new KeySchemaElement().withAttributeName("rangeN").withKeyType(KeyType.RANGE));
request = new CreateTableRequest()
.withTableName(numberHashNumberRangeTableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(5L)
.withWriteCapacityUnits(6L));
Table numberHashNumberRangeTable = dynamoDB.createTable(request);
numberHashNumberRangeTable.waitForActive();
attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions
.add(new AttributeDefinition().withAttributeName("hashN").withAttributeType("N"));
keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH));
request = new CreateTableRequest()
.withTableName(numberHashOnlyTableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(5L)
.withWriteCapacityUnits(6L));
Table numberHashOnlyTable = dynamoDB.createTable(request);
numberHashOnlyTable.waitForActive();
}
protected static void validateServiceExceptionAttribute(FlowFile flowFile) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE));
}
@AfterClass
public static void afterClass() {
DeleteTableResult result = amazonDynamoDBClient.deleteTable(stringHashStringRangeTableName);
result = amazonDynamoDBClient.deleteTable(numberHashNumberRangeTableName);
result = amazonDynamoDBClient.deleteTable(numberHashOnlyTableName);
}
}

View File

@ -0,0 +1,428 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore
public class ITPutGetDeleteGetDynamoDBTest extends ITAbstractDynamoDBTest {
@Test
public void testStringHashStringRangePutGetDeleteGetSuccess() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE);
deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
deleteRunner.setProperty(DeleteDynamoDB.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeS");
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1");
deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "r1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1);
flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals("", new String(flowFile.toByteArray()));
}
// Final check after delete
final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunnerAfterDelete.enqueue(new byte[] {});
getRunnerAfterDelete.run(1);
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
for (MockFlowFile flowFile : flowFiles) {
String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutDeleteWithHashOnlyFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE);
deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
deleteRunner.setProperty(DeleteDynamoDB.TABLE, stringHashStringRangeTableName);
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS");
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1");
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_FAILURE, 1);
flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
validateServiceExceptionAttribute(flowFile);
assertEquals("", new String(flowFile.toByteArray()));
}
}
@Test
public void testStringHashStringRangePutGetWithHashOnlyKeyFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
validateServiceExceptionAttribute(flowFile);
assertEquals("", new String(flowFile.toByteArray()));
}
}
@Test
public void testNumberHashOnlyPutGetDeleteGetSuccess() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, numberHashOnlyTableName);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashN");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "40");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"age\":40}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashOnlyTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE);
deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
deleteRunner.setProperty(DeleteDynamoDB.TABLE, numberHashOnlyTableName);
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN");
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40");
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1);
flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals("", new String(flowFile.toByteArray()));
}
// Final check after delete
final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashOnlyTableName);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunnerAfterDelete.enqueue(new byte[] {});
getRunnerAfterDelete.run(1);
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
for (MockFlowFile flowFile : flowFiles) {
String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
}
}
@Test
public void testNumberHashNumberRangePutGetDeleteGetSuccess() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, numberHashNumberRangeTableName);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashN");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeN");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "40");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "50");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"40\":\"50\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashNumberRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeN");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "50");
getRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1);
flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
assertEquals(document, new String(flowFile.toByteArray()));
}
final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE);
deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
deleteRunner.setProperty(DeleteDynamoDB.TABLE, numberHashNumberRangeTableName);
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN");
deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeN");
deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40");
deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "50");
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
deleteRunner.enqueue(new byte[] {});
deleteRunner.run(1);
deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1);
flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals("", new String(flowFile.toByteArray()));
}
// Final check after delete
final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashNumberRangeTableName);
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeN");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "50");
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
getRunnerAfterDelete.enqueue(new byte[] {});
getRunnerAfterDelete.run(1);
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
for (MockFlowFile flowFile : flowFiles) {
String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class ItemKeysTest {
@Test
public void testHashNullRangeNullEquals() {
ItemKeys ik1 = new ItemKeys(null, null);
ItemKeys ik2 = new ItemKeys(null, null);
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
}
@Test
public void testHashNotNullRangeNullEquals() {
ItemKeys ik1 = new ItemKeys("abc", null);
ItemKeys ik2 = new ItemKeys("abc", null);
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
}
@Test
public void testHashNullRangeNotNullEquals() {
ItemKeys ik1 = new ItemKeys(null, "ab");
ItemKeys ik2 = new ItemKeys(null, "ab");
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
}
@Test
public void testHashNotNullRangeNotNullEquals() {
ItemKeys ik1 = new ItemKeys("abc", "pqr");
ItemKeys ik2 = new ItemKeys("abc", "pqr");
assertEquals(ik1, ik2);
assertEquals(ik1.hashCode(), ik2.hashCode());
assertEquals(ik1.toString(), ik2.toString());
}
@Test
public void testHashNotNullRangeNotNullForOtherNotEquals() {
ItemKeys ik1 = new ItemKeys(null, "ab");
ItemKeys ik2 = new ItemKeys("ab", null);
assertFalse(ik1.equals(ik2));
}
}

View File

@ -0,0 +1,460 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.dynamodb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
public class PutDynamoDBTest {
protected PutDynamoDB putDynamoDB;
protected BatchWriteItemResult result = new BatchWriteItemResult();
BatchWriteItemOutcome outcome;
@Before
public void setUp() {
outcome = new BatchWriteItemOutcome(result);
result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
return outcome;
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
}
@Test
public void testStringHashStringRangePutOnlyHashFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"hello\": 2}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
}
}
@Test
public void testStringHashStringRangePutNoHashValueFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"hello\": 2}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangePutOnlyHashWithRangeValueNoRangeNameFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");
putRunner.enqueue(new byte[] {});
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangePutOnlyHashWithRangeNameNoRangeValueFailure() {
final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
putRunner.enqueue(new byte[] {});
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testStringHashStringRangePutSuccessfulWithMock() {
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
}
@Test
public void testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize1() {
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
for (int i = 0; i < item.length; i++) {
item[i] = 'a';
}
String document2 = new String(item);
putRunner.enqueue(document2.getBytes());
putRunner.run(2,true,true);
List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFilesFailed) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
}
List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesSuccessful) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
}
@Test
public void testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize5() {
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.BATCH_SIZE, "5");
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
for (int i = 0; i < item.length; i++) {
item[i] = 'a';
}
String document2 = new String(item);
putRunner.enqueue(document2.getBytes());
putRunner.run(1);
List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFilesFailed) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
}
List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesSuccessful) {
System.out.println(flowFile.getAttributes());
assertEquals(document, new String(flowFile.toByteArray()));
}
}
@Test
public void testStringHashStringRangePutFailedWithItemSizeGreaterThan400Kb() {
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
for (int i = 0; i < item.length; i++) {
item[i] = 'a';
}
String document = new String(item);
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
assertEquals(1,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
assertEquals(item.length,flowFile.getSize());
}
}
@Test
public void testStringHashStringRangePutThrowsServiceException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonServiceException("serviceException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutThrowsClientException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new AmazonClientException("clientException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutThrowsRuntimeException() {
final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
@Override
public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
throw new RuntimeException("runtimeException");
}
};
putDynamoDB = new PutDynamoDB() {
@Override
protected DynamoDB getDynamoDB() {
return mockDynamoDB;
}
};
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document");
String document = "{\"name\":\"john\"}";
putRunner.enqueue(document.getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
}
}
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
Map<String, List<WriteRequest>> unprocessed =
new HashMap<String, List<WriteRequest>>();
PutRequest put = new PutRequest();
put.addItemEntry("hashS", new AttributeValue("h1"));
put.addItemEntry("rangeS", new AttributeValue("r1"));
WriteRequest write = new WriteRequest(put);
List<WriteRequest> writes = new ArrayList<>();
writes.add(write);
unprocessed.put(stringHashStringRangeTableName, writes);
result.setUnprocessedItems(unprocessed);
final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
putRunner.run(1);
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);
}
}