mirror of https://github.com/apache/nifi.git
Nifi-1540 - AWS Kinesis Streams put processor
This closes #239. Signed-off-by: James Wing <jvwing@gmail.com>
This commit is contained in:
parent
6d5f4777c5
commit
af27179322
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.kinesis;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonWebServiceClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides processor the base class for kinesis client
|
||||||
|
*/
|
||||||
|
public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
|
||||||
|
extends AbstractAWSCredentialsProviderProcessor<ClientType> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kinesis put record response error message
|
||||||
|
*/
|
||||||
|
public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
|
||||||
|
|
||||||
|
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.displayName("Message Batch Size")
|
||||||
|
.name("message-batch-size")
|
||||||
|
.description("Batch size for messages (1-500).")
|
||||||
|
.defaultValue("250")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.createLongValidator(1, 500, true))
|
||||||
|
.sensitive(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
|
||||||
|
.name("max-message-buffer-size")
|
||||||
|
.displayName("Max message buffer size (MB)")
|
||||||
|
.description("Max message buffer size in Mega-bytes")
|
||||||
|
.defaultValue("1 MB")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
|
.sensitive(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max buffer size 1 MB
|
||||||
|
*/
|
||||||
|
public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
|
||||||
|
|
||||||
|
protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
|
||||||
|
final String streamName, String message) {
|
||||||
|
flowFileCandidate = session.putAttribute(flowFileCandidate, message,
|
||||||
|
"record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
|
||||||
|
session.transfer(flowFileCandidate, REL_FAILURE);
|
||||||
|
getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes",
|
||||||
|
new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE});
|
||||||
|
return flowFileCandidate;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) {
|
||||||
|
List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
|
||||||
|
|
||||||
|
long currentBufferSizeBytes = 0;
|
||||||
|
|
||||||
|
for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
|
||||||
|
|
||||||
|
FlowFile flowFileCandidate = session.get();
|
||||||
|
if ( flowFileCandidate == null )
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
|
||||||
|
flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
currentBufferSizeBytes += flowFileCandidate.getSize();
|
||||||
|
|
||||||
|
flowFiles.add(flowFileCandidate);
|
||||||
|
}
|
||||||
|
return flowFiles;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,7 @@ package org.apache.nifi.processors.aws.kinesis.firehose;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
|
||||||
|
|
||||||
import com.amazonaws.ClientConfiguration;
|
import com.amazonaws.ClientConfiguration;
|
||||||
import com.amazonaws.auth.AWSCredentials;
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
@ -29,7 +29,7 @@ import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
|
||||||
/**
|
/**
|
||||||
* This class provides processor the base class for kinesis firehose
|
* This class provides processor the base class for kinesis firehose
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
|
public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisFirehoseClient> {
|
||||||
|
|
||||||
public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
|
||||||
.name("Amazon Kinesis Firehose Delivery Stream Name")
|
.name("Amazon Kinesis Firehose Delivery Stream Name")
|
||||||
|
@ -68,7 +68,7 @@ public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCreden
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create client using AWSCredentials
|
* Create client using AWSCredentails
|
||||||
*
|
*
|
||||||
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
|
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -91,25 +91,8 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
|
||||||
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
|
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
|
||||||
final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
|
final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
|
||||||
|
|
||||||
List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
|
List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName,
|
||||||
|
AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
|
||||||
long currentBufferSizeBytes = 0;
|
|
||||||
|
|
||||||
for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
|
|
||||||
|
|
||||||
FlowFile flowFileCandidate = session.get();
|
|
||||||
if ( flowFileCandidate == null )
|
|
||||||
break;
|
|
||||||
|
|
||||||
if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
|
|
||||||
flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, firehoseStreamName);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
currentBufferSizeBytes += flowFileCandidate.getSize();
|
|
||||||
|
|
||||||
flowFiles.add(flowFileCandidate);
|
|
||||||
}
|
|
||||||
|
|
||||||
final AmazonKinesisFirehoseClient client = getClient();
|
final AmazonKinesisFirehoseClient client = getClient();
|
||||||
|
|
||||||
|
@ -172,14 +155,4 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
|
|
||||||
final String firehoseStreamName) {
|
|
||||||
flowFileCandidate = session.putAttribute(flowFileCandidate, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE,
|
|
||||||
"record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
|
|
||||||
session.transfer(flowFileCandidate, REL_FAILURE);
|
|
||||||
getLogger().error("Failed to publish to kinesis firehose {} records {} because the size was greater than {} bytes",
|
|
||||||
new Object[]{firehoseStreamName, flowFileCandidate, MAX_MESSAGE_SIZE});
|
|
||||||
return flowFileCandidate;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.kinesis.stream;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides processor the base class for kinesis client
|
||||||
|
*/
|
||||||
|
public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("kinesis-stream-name")
|
||||||
|
.displayName("Amazon Kinesis Stream Name")
|
||||||
|
.description("The name of Kinesis Stream")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create client using aws credentials provider. This is the preferred way for creating clients
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
|
||||||
|
getLogger().info("Creating client using aws credentials provider");
|
||||||
|
|
||||||
|
return new AmazonKinesisClient(credentialsProvider, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create client using AWSCredentails
|
||||||
|
*
|
||||||
|
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||||
|
getLogger().info("Creating client using aws credentials");
|
||||||
|
|
||||||
|
return new AmazonKinesisClient(credentials, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,174 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.kinesis.stream;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||||
|
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
|
||||||
|
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
|
||||||
|
import com.amazonaws.services.kinesis.model.PutRecordsResult;
|
||||||
|
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
|
||||||
|
|
||||||
|
@SupportsBatching
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@Tags({"amazon", "aws", "kinesis", "put", "stream"})
|
||||||
|
@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. "
|
||||||
|
+ "In order to send data to Kinesis, the stream name has to be specified.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
|
||||||
|
@WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
|
||||||
|
@WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
|
||||||
|
@WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
|
||||||
|
public class PutKinesisStream extends AbstractKinesisStreamProcessor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kinesis put record response error code
|
||||||
|
*/
|
||||||
|
public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
|
||||||
|
|
||||||
|
public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
|
||||||
|
|
||||||
|
public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
|
||||||
|
|
||||||
|
public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
|
||||||
|
.displayName("Amazon Kinesis Stream Partition Key")
|
||||||
|
.name("amazon-kinesis-stream-partition-key")
|
||||||
|
.description("The partition key attribute. If it is not set, a random value is used")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.defaultValue("${kinesis.partition.key}")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
|
||||||
|
|
||||||
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
|
Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
|
||||||
|
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT));
|
||||||
|
|
||||||
|
/** A random number generator for cases where partition key is not available */
|
||||||
|
protected Random randomParitionKeyGenerator = new Random();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||||
|
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
|
||||||
|
final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
|
||||||
|
|
||||||
|
List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
|
||||||
|
AWS_KINESIS_ERROR_MESSAGE);
|
||||||
|
|
||||||
|
final AmazonKinesisClient client = getClient();
|
||||||
|
|
||||||
|
try {
|
||||||
|
List<PutRecordsRequestEntry> records = new ArrayList<>();
|
||||||
|
|
||||||
|
List<FlowFile> failedFlowFiles = new ArrayList<>();
|
||||||
|
List<FlowFile> successfulFlowFiles = new ArrayList<>();
|
||||||
|
|
||||||
|
// Prepare batch of records
|
||||||
|
for (int i = 0; i < flowFiles.size(); i++) {
|
||||||
|
FlowFile flowFile = flowFiles.get(i);
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
session.exportTo(flowFile, baos);
|
||||||
|
PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
|
||||||
|
|
||||||
|
String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
|
||||||
|
.evaluateAttributeExpressions(flowFiles.get(i)).getValue();
|
||||||
|
|
||||||
|
if ( ! StringUtils.isBlank(partitionKey) ) {
|
||||||
|
record.setPartitionKey(partitionKey);
|
||||||
|
} else {
|
||||||
|
record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
|
||||||
|
}
|
||||||
|
|
||||||
|
records.add(record);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( records.size() > 0 ) {
|
||||||
|
|
||||||
|
PutRecordsRequest putRecordRequest = new PutRecordsRequest();
|
||||||
|
putRecordRequest.setStreamName(streamName);
|
||||||
|
putRecordRequest.setRecords(records);
|
||||||
|
PutRecordsResult results = client.putRecords(putRecordRequest);
|
||||||
|
|
||||||
|
List<PutRecordsResultEntry> responseEntries = results.getRecords();
|
||||||
|
for (int i = 0; i < responseEntries.size(); i++ ) {
|
||||||
|
PutRecordsResultEntry entry = responseEntries.get(i);
|
||||||
|
FlowFile flowFile = flowFiles.get(i);
|
||||||
|
|
||||||
|
Map<String,String> attributes = new HashMap<>();
|
||||||
|
attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
|
||||||
|
attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
|
||||||
|
|
||||||
|
if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
|
||||||
|
attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
|
||||||
|
attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
failedFlowFiles.add(flowFile);
|
||||||
|
} else {
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
successfulFlowFiles.add(flowFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ( failedFlowFiles.size() > 0 ) {
|
||||||
|
session.transfer(failedFlowFiles, REL_FAILURE);
|
||||||
|
getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
|
||||||
|
}
|
||||||
|
if ( successfulFlowFiles.size() > 0 ) {
|
||||||
|
session.transfer(successfulFlowFiles, REL_SUCCESS);
|
||||||
|
getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
|
||||||
|
}
|
||||||
|
records.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (final Exception exception) {
|
||||||
|
getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{exception, streamName, flowFiles});
|
||||||
|
session.transfer(flowFiles, REL_FAILURE);
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -25,3 +25,4 @@ org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
|
||||||
org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
|
org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
|
||||||
org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
|
org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
|
||||||
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
|
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
|
||||||
|
org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
|
||||||
|
|
|
@ -0,0 +1,430 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.kinesis.stream;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test contains both unit and integration test (integration tests are ignored by default).
|
||||||
|
* Running integration tests may result in failures due to provisioned capacity of Kinesis stream based on number of shards.
|
||||||
|
* The following integration tests run successfully with 10 shards. If increasing shards is not a possiblity, please reduce the size and
|
||||||
|
* number of messages in the integration tests based AWS Kinesis provisioning pages calculations.
|
||||||
|
*/
|
||||||
|
public class ITPutKinesisStream {
|
||||||
|
|
||||||
|
private TestRunner runner;
|
||||||
|
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutKinesisStream.class);
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream");
|
||||||
|
runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
runner = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comment out ignore for integration tests (requires creds files)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntegrationSuccess() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("test".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
|
||||||
|
out.assertContentEquals("test".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIntegrationWithFixedPartitionSuccess() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "pfixed");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("test".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
|
||||||
|
out.assertContentEquals("test".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIntegrationWithDynamicPartitionSuccess() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${parition}");
|
||||||
|
runner.assertValid();
|
||||||
|
Map<String,String> properties = new HashMap<>();
|
||||||
|
properties.put("partition", "px");
|
||||||
|
runner.enqueue("test".getBytes(), properties);
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
|
||||||
|
out.assertContentEquals("test".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comment out ignore for integration tests (requires creds files)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIntegrationFailedBadStreamName() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "bad-kstream");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("test".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(1,flowFiles.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partitionKey}");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
Map<String,String> props = new HashMap<>();
|
||||||
|
props.put("partitionKey", "p1");
|
||||||
|
runner.enqueue(bytes,props);
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(1,flowFiles.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "5");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(2, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(3,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue("hello".getBytes());
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue("there".getBytes());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
|
||||||
|
assertEquals(1,flowFilesFailed.size());
|
||||||
|
for (MockFlowFile flowFileFailed : flowFilesFailed) {
|
||||||
|
assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue("hello".getBytes());
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(1,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
flowFile.assertContentEquals("hello".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
|
||||||
|
assertEquals(1,flowFilesFailed.size());
|
||||||
|
for (MockFlowFile flowFileFailed : flowFilesFailed) {
|
||||||
|
assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue("HelloWorld".getBytes());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(1,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
flowFile.assertContentEquals("HelloWorld".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
|
||||||
|
assertEquals(1,flowFilesFailed.size());
|
||||||
|
for (MockFlowFile flowFileFailed : flowFilesFailed) {
|
||||||
|
assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
runner.enqueue("Hello".getBytes());
|
||||||
|
runner.enqueue("World".getBytes());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
flowFiles.get(0).assertContentEquals("Hello".getBytes());
|
||||||
|
flowFiles.get(1).assertContentEquals("World".getBytes());
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
|
||||||
|
assertEquals(0,flowFilesFailed.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "5");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[10];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 5);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(5,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
|
||||||
|
runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[10];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.enqueue(bytes.clone());
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
|
||||||
|
assertEquals(2,flowFiles.size());
|
||||||
|
for (MockFlowFile flowFile : flowFiles) {
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
|
||||||
|
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.kinesis.stream;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestPutKinesisStream {
|
||||||
|
private TestRunner runner;
|
||||||
|
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutKinesisStream.class);
|
||||||
|
runner.setProperty(PutKinesisStream.ACCESS_KEY, "abcd");
|
||||||
|
runner.setProperty(PutKinesisStream.SECRET_KEY, "secret key");
|
||||||
|
runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream");
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
runner = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateBatchSize1Valid() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "1");
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateBatchSize500Valid() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "500");
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateBatchSize501InValid() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "501");
|
||||||
|
runner.assertNotValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithSizeGreaterThan1MB() {
|
||||||
|
runner.setProperty(PutKinesisStream.BATCH_SIZE, "1");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE + 1)];
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
bytes[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(bytes);
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
|
||||||
|
|
||||||
|
assertNotNull(flowFiles.get(0).getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue