diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java new file mode 100644 index 0000000000..e5598200fa --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.AmazonWebServiceClient; + +/** + * This class provides the base class for Kinesis client processors + */ +public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient> + extends AbstractAWSCredentialsProviderProcessor<ClientType> { + + /** + * Kinesis put record response error message attribute + */ + public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message"; + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .displayName("Message Batch Size") + .name("message-batch-size") + .description("Batch size for messages (1-500).") + .defaultValue("250") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder() + .name("max-message-buffer-size") + .displayName("Max message buffer size (MB)") + .description("Maximum message buffer size in megabytes") + .defaultValue("1 MB") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .sensitive(false) + .build(); + + /** + * Maximum message size (1 MB) + */ + public static final int MAX_MESSAGE_SIZE = 1000 * 1024; + + /** Tag the FlowFile with the error-message attribute and route it to failure when it exceeds the maximum record size. */ + protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate, + final String streamName, String message) { + flowFileCandidate = session.putAttribute(flowFileCandidate, message, + "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE ); + session.transfer(flowFileCandidate, REL_FAILURE); + getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes", + new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE}); + return flowFileCandidate; + } + + protected List<FlowFile> filterMessagesByMaxSize(final
ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) { + List<FlowFile> flowFiles = new ArrayList<>(batchSize); + + long currentBufferSizeBytes = 0; + + for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) { + + FlowFile flowFileCandidate = session.get(); + if ( flowFileCandidate == null ) + break; + + if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) { + flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message); + continue; + } + + currentBufferSizeBytes += flowFileCandidate.getSize(); + + flowFiles.add(flowFileCandidate); + } + return flowFiles; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java index ddc6e6c15a..ca15653c4f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java @@ -19,7 +19,7 @@ package org.apache.nifi.processors.aws.kinesis.firehose; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; @@ -29,7 +29,7 @@ import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; /** * This class provides processor the base class for kinesis firehose */ -public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> { +public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisFirehoseClient> { public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() .name("Amazon Kinesis Firehose Delivery Stream Name") @@ -68,7 +68,7 @@ public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCreden } /** - * Create client using AWSCredentails + * Create client using AWSCredentials * * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead */ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java index f5c3b9f19e..8abc9650df 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java @@ -91,25 +91,8 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(); final String firehoseStreamName
= context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue(); - List<FlowFile> flowFiles = new ArrayList<>(batchSize); - - long currentBufferSizeBytes = 0; - - for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) { - - FlowFile flowFileCandidate = session.get(); - if ( flowFileCandidate == null ) - break; - - if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) { - flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, firehoseStreamName); - continue; - } - - currentBufferSizeBytes += flowFileCandidate.getSize(); - - flowFiles.add(flowFileCandidate); - } + List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName, + AWS_KINESIS_FIREHOSE_ERROR_MESSAGE); final AmazonKinesisFirehoseClient client = getClient(); @@ -172,14 +155,4 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { } } - protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate, - final String firehoseStreamName) { - flowFileCandidate = session.putAttribute(flowFileCandidate, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, - "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE ); - session.transfer(flowFileCandidate, REL_FAILURE); - getLogger().error("Failed to publish to kinesis firehose {} records {} because the size was greater than {} bytes", - new Object[]{firehoseStreamName, flowFileCandidate, MAX_MESSAGE_SIZE}); - return flowFileCandidate; - } - } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java new file mode 100644 index 0000000000..b7513af328 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.AmazonKinesisClient; + +/** + * This class provides the base class for Kinesis stream processors + */ +public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> { + + public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder() + .name("kinesis-stream-name") + .displayName("Amazon Kinesis Stream Name") + .description("The name of the Kinesis stream") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Create client using aws credentials provider. This is the preferred way for creating clients + */ + @Override + protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider"); + + return new AmazonKinesisClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentials + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials"); + + return new AmazonKinesisClient(credentials, config); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java new file mode 100644 index 0000000000..cafc82cfa2 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents of a FlowFile to a specified Amazon Kinesis stream. " + "In order to send data to Kinesis, the stream name has to be specified.") +@WritesAttributes({ + @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")}) +public class PutKinesisStream extends AbstractKinesisStreamProcessor { + + /** + * Kinesis put record response error code + */ + public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code"; + + public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id"; + + public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number"; + + public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder() + .displayName("Amazon Kinesis Stream Partition Key") + .name("amazon-kinesis-stream-partition-key") + .description("The partition key attribute.
If it is not set, a random value is used") + .expressionLanguageSupported(true) + .defaultValue("${kinesis.partition.key}") + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build(); + + public static final List properties = Collections.unmodifiableList( + Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, + AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT)); + + /** A random number generator for cases where partition key is not available */ + protected Random randomParitionKeyGenerator = new Random(); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(); + final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue(); + + List flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName, + AWS_KINESIS_ERROR_MESSAGE); + + final AmazonKinesisClient client = getClient(); + + try { + List records = new ArrayList<>(); + + List failedFlowFiles = new ArrayList<>(); + List successfulFlowFiles = new ArrayList<>(); + + // Prepare batch of records + for (int i = 0; i < flowFiles.size(); i++) { + FlowFile flowFile = flowFiles.get(i); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray())); + + String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY) + .evaluateAttributeExpressions(flowFiles.get(i)).getValue(); + + if ( ! StringUtils.isBlank(partitionKey) ) { + record.setPartitionKey(partitionKey); + } else { + record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt())); + } + + records.add(record); + } + + if ( records.size() > 0 ) { + + PutRecordsRequest putRecordRequest = new PutRecordsRequest(); + putRecordRequest.setStreamName(streamName); + putRecordRequest.setRecords(records); + PutRecordsResult results = client.putRecords(putRecordRequest); + + List responseEntries = results.getRecords(); + for (int i = 0; i < responseEntries.size(); i++ ) { + PutRecordsResultEntry entry = responseEntries.get(i); + FlowFile flowFile = flowFiles.get(i); + + Map attributes = new HashMap<>(); + attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId()); + attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber()); + + if ( ! 
+ if ( ! StringUtils.isBlank(entry.getErrorCode()) ) { + attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode()); + attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage()); + flowFile = session.putAllAttributes(flowFile, attributes); + failedFlowFiles.add(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + successfulFlowFiles.add(flowFile); + } + } + if ( failedFlowFiles.size() > 0 ) { + session.transfer(failedFlowFiles, REL_FAILURE); + getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles}); + } + if ( successfulFlowFiles.size() > 0 ) { + session.transfer(successfulFlowFiles, REL_SUCCESS); + getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles}); + } + records.clear(); + } + + } catch (final Exception exception) { + getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {}", new Object[]{exception, streamName, flowFiles}); + session.transfer(flowFiles, REL_FAILURE); + context.yield(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d8c18fc60d..df265c3cf5 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -25,3 +25,4 @@ org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose org.apache.nifi.processors.aws.dynamodb.GetDynamoDB org.apache.nifi.processors.aws.dynamodb.PutDynamoDB org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB +org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java new file mode 100644 index 0000000000..b195423483 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This test contains both unit and integration tests (integration tests are ignored by default). + * Running the integration tests may result in failures due to the provisioned capacity of the Kinesis stream, which depends on its number of shards. + * The following integration tests run successfully with 10 shards. If increasing shards is not a possibility, please reduce the size and + * number of messages in the integration tests based on the AWS Kinesis provisioning page calculations. + */ +public class ITPutKinesisStream { + + private TestRunner runner; + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutKinesisStream.class); + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream"); + runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + /** + * Comment out the ignore annotation to run this integration test (requires a credentials file) + */ + @Test + public void testIntegrationSuccess() throws Exception { + runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + + final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationWithFixedPartitionSuccess() throws Exception { + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "pfixed"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + + final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationWithDynamicPartitionSuccess() throws Exception { + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partition}"); + runner.assertValid(); + Map<String, String> properties = new HashMap<>(); + properties.put("partition", "px"); + runner.enqueue("test".getBytes(), properties); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + + final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + /** + * Comment out the ignore annotation to run this integration test (requires a credentials file) + */ + @Test + public void testIntegrationFailedBadStreamName() throws Exception { + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "bad-kstream"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); +
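+        // putRecords against a nonexistent stream throws, so the catch block routes the FlowFile to failure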
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1); + + } + + @Test + public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + } + + @Test + public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partitionKey}"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + Map<String, String> props = new HashMap<>(); + props.put("partitionKey", "p1"); + runner.enqueue(bytes, props); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + } + + @Test + public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "5"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testTwoMessageWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; +
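+        // each payload is exactly MAX_MESSAGE_SIZE; both records are polled in one run because the buffer limit is checked before each poll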
for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(2, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(3, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue("hello".getBytes()); + runner.enqueue(bytes); + runner.enqueue("there".getBytes()); + runner.run(1, true, true); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + + List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); + assertEquals(1, flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue("hello".getBytes()); + runner.enqueue(bytes); + runner.run(1, true, true); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + flowFile.assertContentEquals("hello".getBytes()); + } + + List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); +
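+        // the 2 MB record exceeds MAX_MESSAGE_SIZE, so it is tagged with aws.kinesis.error.message and routed to failure without being sent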
assertEquals(1, flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue("HelloWorld".getBytes()); + runner.run(1, true, true); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + flowFile.assertContentEquals("HelloWorld".getBytes()); + } + + List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); + assertEquals(1, flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + runner.enqueue("Hello".getBytes()); + runner.enqueue("World".getBytes()); + runner.run(1, true, true); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + flowFiles.get(0).assertContentEquals("Hello".getBytes()); + flowFiles.get(1).assertContentEquals("World".getBytes()); + + List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); + assertEquals(0, flowFilesFailed.size()); + } + + @Test + public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "5"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[10]; + for (int i = 0; i < bytes.length; i++) { +
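+            // tiny 10-byte payloads: all five records fit well under the record size and buffer limits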
bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 5); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(5, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.assertValid(); + byte [] bytes = new byte[10]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + assertEquals(2, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java new file mode 100644 index 0000000000..29ca4f0ee8 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import static org.junit.Assert.assertNotNull; + +import java.util.List; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestPutKinesisStream { + private TestRunner runner; + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutKinesisStream.class); + runner.setProperty(PutKinesisStream.ACCESS_KEY, "abcd"); + runner.setProperty(PutKinesisStream.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + @Test + public void testCustomValidateBatchSize1Valid() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "1"); + runner.assertValid(); + } + + @Test + public void testCustomValidateBatchSize500Valid() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "500"); + runner.assertValid(); + } + + @Test + public void testCustomValidateBatchSize501InValid() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "501"); + runner.assertNotValid(); + } + + @Test + public void testWithSizeGreaterThan1MB() { + runner.setProperty(PutKinesisStream.BATCH_SIZE, "1"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE + 1)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); + + assertNotNull(flowFiles.get(0).getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); + } +}