NIFI-1495 Adding support for AWS Kinesis Firehose

This closes #213

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
mans2singh 2016-03-16 09:46:38 -04:00 committed by Aldrin Piri
parent 8ed64c9083
commit 19bc5ba999
5 changed files with 750 additions and 0 deletions

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.firehose;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
/**
* This class provides processor the base class for kinesis firehose
*/
public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
.name("Amazon Kinesis Firehose Delivery Stream Name")
.description("The name of kinesis firehose delivery stream")
.expressionLanguageSupported(false)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("Batch size for messages (1-500).")
.defaultValue("250")
.required(false)
.addValidator(StandardValidators.createLongValidator(1, 500, true))
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
.name("Max message buffer size")
.description("Max message buffer")
.defaultValue("1 MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.sensitive(false)
.build();
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider");
return new AmazonKinesisFirehoseClient(credentialsProvider, config);
}
/**
* Create client using AWSCredentails
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials");
return new AmazonKinesisFirehoseClient(credentials, config);
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.firehose;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
+ "In order to send data to firehose, the firehose delivery stream name has to be specified.")
@WritesAttributes({
@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
/**
* Kinesis put record response error message
*/
public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
/**
* Kinesis put record response error code
*/
public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
/**
* Kinesis put record response record id
*/
public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
PROXY_HOST,PROXY_HOST_PORT));
/**
* Max buffer size 1 MB
*/
public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
long currentBufferSizeBytes = 0;
for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
FlowFile flowFileCandidate = session.get();
if ( flowFileCandidate == null )
break;
if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, firehoseStreamName);
continue;
}
currentBufferSizeBytes += flowFileCandidate.getSize();
flowFiles.add(flowFileCandidate);
}
final AmazonKinesisFirehoseClient client = getClient();
try {
List<Record> records = new ArrayList<>();
List<FlowFile> failedFlowFiles = new ArrayList<>();
List<FlowFile> successfulFlowFiles = new ArrayList<>();
// Prepare batch of records
for (int i = 0; i < flowFiles.size(); i++) {
FlowFile flowFile = flowFiles.get(i);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
}
if ( records.size() > 0 ) {
// Send the batch
PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
putRecordBatchRequest.setDeliveryStreamName(firehoseStreamName);
putRecordBatchRequest.setRecords(records);
PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
// Separate out the successful and failed flow files
List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
for (int i = 0; i < responseEntries.size(); i++ ) {
PutRecordBatchResponseEntry entry = responseEntries.get(i);
FlowFile flowFile = flowFiles.get(i);
Map<String,String> attributes = new HashMap<>();
attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, entry.getErrorMessage());
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
successfulFlowFiles.add(flowFile);
}
}
if ( failedFlowFiles.size() > 0 ) {
session.transfer(failedFlowFiles, REL_FAILURE);
getLogger().error("Failed to publish to kinesis firehose {} records {}", new Object[]{firehoseStreamName, failedFlowFiles});
}
if ( successfulFlowFiles.size() > 0 ) {
session.transfer(successfulFlowFiles, REL_SUCCESS);
getLogger().info("Successfully published to kinesis firehose {} records {}", new Object[]{firehoseStreamName, successfulFlowFiles});
}
records.clear();
}
} catch (final Exception exception) {
getLogger().error("Failed to publish to kinesis firehose {} with exception {}", new Object[]{flowFiles, exception});
session.transfer(flowFiles, REL_FAILURE);
context.yield();
}
}
protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
final String firehoseStreamName) {
flowFileCandidate = session.putAttribute(flowFileCandidate, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE,
"record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
session.transfer(flowFileCandidate, REL_FAILURE);
getLogger().error("Failed to publish to kinesis firehose {} records {} because the size was greater than {} bytes",
new Object[]{firehoseStreamName, flowFileCandidate, MAX_MESSAGE_SIZE});
return flowFileCandidate;
}
}

View File

@ -20,3 +20,4 @@ org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS org.apache.nifi.processors.aws.sqs.PutSQS
org.apache.nifi.processors.aws.sqs.DeleteSQS org.apache.nifi.processors.aws.sqs.DeleteSQS
org.apache.nifi.processors.aws.lambda.PutLambda org.apache.nifi.processors.aws.lambda.PutLambda
org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose

View File

@ -0,0 +1,402 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.firehose;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.List;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This test contains both unit and integration test (integration tests are ignored by default)
*/
public class ITPutKinesisFirehose {
private TestRunner runner;
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
/**
* Comment out ignore for integration tests (requires creds files)
*/
@Test
public void testIntegrationSuccess() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
runner.enqueue("test".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
out.assertContentEquals("test".getBytes());
}
/**
* Comment out ignore for integration tests (requires creds files)
*/
@Test
public void testIntegrationFailedBadStreamName() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "bad-firehose-s3-test");
runner.assertValid();
runner.enqueue("test".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_FAILURE, 1);
}
@Test
public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(1,flowFiles.size());
}
@Test
public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "5");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.enqueue(bytes.clone());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.enqueue(bytes.clone());
runner.run(2, true, true);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(3,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue("hello".getBytes());
runner.enqueue(bytes);
runner.enqueue("there".getBytes());
runner.run(1, true, true);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
assertEquals(1,flowFilesFailed.size());
for (MockFlowFile flowFileFailed : flowFilesFailed) {
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
}
}
@Test
public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue("hello".getBytes());
runner.enqueue(bytes);
runner.run(1, true, true);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(1,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
flowFile.assertContentEquals("hello".getBytes());
}
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
assertEquals(1,flowFilesFailed.size());
for (MockFlowFile flowFileFailed : flowFilesFailed) {
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
}
}
@Test
public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue("HelloWorld".getBytes());
runner.run(1, true, true);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(1,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
flowFile.assertContentEquals("HelloWorld".getBytes());
}
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
assertEquals(1,flowFilesFailed.size());
for (MockFlowFile flowFileFailed : flowFilesFailed) {
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
}
}
@Test
public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
runner.enqueue("Hello".getBytes());
runner.enqueue("World".getBytes());
runner.run(1, true, true);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
flowFiles.get(0).assertContentEquals("Hello".getBytes());
flowFiles.get(1).assertContentEquals("World".getBytes());
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
assertEquals(0,flowFilesFailed.size());
}
@Test
public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "5");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.enqueue(bytes.clone());
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[10];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.enqueue(bytes.clone());
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 5);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(5,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
@Test
public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "testkinesis");
runner.assertValid();
byte [] bytes = new byte[10];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.enqueue(bytes.clone());
runner.enqueue(bytes);
runner.enqueue(bytes.clone());
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
assertEquals(2,flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.firehose;
import static org.junit.Assert.assertNotNull;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPutKinesisFirehose {
private TestRunner runner;
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testCustomValidateBatchSize1Valid() {
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
runner.assertValid();
}
@Test
public void testCustomValidateBatchSize500Valid() {
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
runner.assertValid();
}
@Test
public void testCustomValidateBatchSize501InValid() {
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
runner.assertNotValid();
}
@Test
public void testWithSizeGreaterThan1MB() {
runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
runner.assertValid();
byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE + 1)];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
assertNotNull(flowFiles.get(0).getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
}
}