NIFI-4786 Allow Expression Evaluation to Kinesis/Firehose Stream Name

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2409.
dorian.bugeja 2018-01-17 11:22:39 +01:00 committed by James Wing
parent c4e2ac7cda
commit 8bdc2910e1
6 changed files with 127 additions and 82 deletions

AbstractBaseKinesisProcessor.java

@@ -64,16 +64,16 @@ public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebS
     public static final int MAX_MESSAGE_SIZE = 1000 * 1024;

     protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
-            final String streamName, String message) {
+            String message) {
         flowFileCandidate = session.putAttribute(flowFileCandidate, message,
             "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
         session.transfer(flowFileCandidate, REL_FAILURE);
-        getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes",
-                new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE});
+        getLogger().error("Failed to publish to kinesis records {} because the size was greater than {} bytes",
+                new Object[]{flowFileCandidate, MAX_MESSAGE_SIZE});
         return flowFileCandidate;
     }

-    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) {
+    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, String message) {
         List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);

         long currentBufferSizeBytes = 0;
@@ -85,7 +85,7 @@ public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebS
                     break;

                 if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
-                    flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message);
+                    flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, message);
                     continue;
                 }

AbstractKinesisFirehoseProcessor.java

@@ -34,7 +34,7 @@ public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKines
     public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
             .name("Amazon Kinesis Firehose Delivery Stream Name")
             .description("The name of kinesis firehose delivery stream")
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
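Enabling Expression Language on the descriptor only relaxes property validation; the processor still has to resolve the property against each FlowFile, which PutKinesisFirehose and PutKinesisStream do later in this commit. A minimal sketch of that evaluation step, assuming the onTrigger() variables from the hunks below:

    // Before this change the property was read once per onTrigger() call:
    //     context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
    // With Expression Language enabled it is resolved per FlowFile, so an
    // expression such as ${firehose.stream} (illustrative attribute name)
    // can pick a different delivery stream for every FlowFile:
    final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME)
            .evaluateAttributeExpressions(flowFile)
            .getValue();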

AbstractKinesisStreamProcessor.java

@@ -35,7 +35,7 @@ public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesis
            .name("kinesis-stream-name")
            .displayName("Amazon Kinesis Stream Name")
            .description("The name of Kinesis Stream")
-           .expressionLanguageSupported(false)
+           .expressionLanguageSupported(true)
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

pom.xml

@@ -64,6 +64,16 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
+        <!-- Test Dependencies for testing interaction with AWS -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.6.6</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

PutKinesisFirehose.java

@@ -89,16 +89,14 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
-        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
-        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName,
-                AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+
+        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
+        HashMap<String, List<Record>> recordHash = new HashMap<String, List<Record>>();

         final AmazonKinesisFirehoseClient client = getClient();

         try {
-            List<Record> records = new ArrayList<>();
             List<FlowFile> failedFlowFiles = new ArrayList<>();
             List<FlowFile> successfulFlowFiles = new ArrayList<>();
@@ -106,28 +104,45 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
             for (int i = 0; i < flowFiles.size(); i++) {
                 FlowFile flowFile = flowFiles.get(i);

+                final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 session.exportTo(flowFile, baos);
-                records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
-            }

-            if (records.size() > 0) {
+                if (recordHash.containsKey(firehoseStreamName) == false) {
+                    recordHash.put(firehoseStreamName, new ArrayList<>());
+                }
+                if (hashFlowFiles.containsKey(firehoseStreamName) == false) {
+                    hashFlowFiles.put(firehoseStreamName, new ArrayList<>());
+                }
+
+                hashFlowFiles.get(firehoseStreamName).add(flowFile);
+                recordHash.get(firehoseStreamName).add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
+            }
+
+            for (Map.Entry<String, List<Record>> entryRecord : recordHash.entrySet()) {
+                String streamName = entryRecord.getKey();
+                List<Record> records = entryRecord.getValue();
+
+                if (records.size() > 0) {
                     // Send the batch
                     PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
-                    putRecordBatchRequest.setDeliveryStreamName(firehoseStreamName);
+                    putRecordBatchRequest.setDeliveryStreamName(streamName);
                     putRecordBatchRequest.setRecords(records);
                     PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);

                     // Separate out the successful and failed flow files
                     List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
                     for (int i = 0; i < responseEntries.size(); i++ ) {
                         PutRecordBatchResponseEntry entry = responseEntries.get(i);
-                        FlowFile flowFile = flowFiles.get(i);
+                        FlowFile flowFile = hashFlowFiles.get(streamName).get(i);

                         Map<String,String> attributes = new HashMap<>();
                         attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
                         flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
-                        if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
+                        if (StringUtils.isBlank(entry.getErrorCode()) == false) {
                             attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
                             attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, entry.getErrorMessage());
                             flowFile = session.putAllAttributes(flowFile, attributes);
@@ -137,15 +152,18 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
                             successfulFlowFiles.add(flowFile);
                         }
                     }
+
+                    recordHash.get(streamName).clear();
+                    records.clear();
+                }
+            }
+
             if (failedFlowFiles.size() > 0) {
                 session.transfer(failedFlowFiles, REL_FAILURE);
-                getLogger().error("Failed to publish to kinesis firehose {} records {}", new Object[]{firehoseStreamName, failedFlowFiles});
+                getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{failedFlowFiles});
             }
+
             if (successfulFlowFiles.size() > 0) {
                 session.transfer(successfulFlowFiles, REL_SUCCESS);
-                getLogger().info("Successfully published to kinesis firehose {} records {}", new Object[]{firehoseStreamName, successfulFlowFiles});
+                getLogger().info("Successfully published to kinesis firehose {}", new Object[]{successfulFlowFiles});
             }
-
-                records.clear();
-            }
         } catch (final Exception exception) {
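Read as a standalone sketch, the new PutKinesisFirehose loop buckets records by the stream name evaluated from each FlowFile, then issues one PutRecordBatchRequest per delivery stream. The following is an illustrative restatement (using computeIfAbsent for brevity), not the committed code, and it assumes the onTrigger() variables (context, session, flowFiles, client) from the hunks above:

    // Bucket outgoing records by the per-FlowFile delivery stream name.
    Map<String, List<Record>> recordsByStream = new HashMap<>();
    for (FlowFile flowFile : flowFiles) {
        String streamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME)
                .evaluateAttributeExpressions(flowFile).getValue();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFile, baos);
        recordsByStream.computeIfAbsent(streamName, k -> new ArrayList<>())
                .add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
    }

    // One batch request per stream; the response entries are index-aligned
    // with the records (and therefore the FlowFiles) sent for that stream.
    for (Map.Entry<String, List<Record>> entry : recordsByStream.entrySet()) {
        PutRecordBatchRequest request = new PutRecordBatchRequest()
                .withDeliveryStreamName(entry.getKey())
                .withRecords(entry.getValue());
        PutRecordBatchResult result = client.putRecordBatch(request);
        // Inspect result.getRequestResponses() for per-record success/failure.
    }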

PutKinesisStream.java

@@ -94,15 +94,16 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
-        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
-        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
-                AWS_KINESIS_ERROR_MESSAGE);
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_ERROR_MESSAGE);
+
+        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
+        HashMap<String, List<PutRecordsRequestEntry>> recordHash = new HashMap<String, List<PutRecordsRequestEntry>>();

         final AmazonKinesisClient client = getClient();

         try {
-            List<PutRecordsRequestEntry> records = new ArrayList<>();
             List<FlowFile> failedFlowFiles = new ArrayList<>();
             List<FlowFile> successfulFlowFiles = new ArrayList<>();
@@ -111,6 +112,8 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
             for (int i = 0; i < flowFiles.size(); i++) {
                 FlowFile flowFile = flowFiles.get(i);

+                String streamName = context.getProperty(KINESIS_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();;
+
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 session.exportTo(flowFile, baos);
                 PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
@@ -118,14 +121,26 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
                 String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
                     .evaluateAttributeExpressions(flowFiles.get(i)).getValue();

-                if ( ! StringUtils.isBlank(partitionKey) ) {
+                if (StringUtils.isBlank(partitionKey) == false) {
                     record.setPartitionKey(partitionKey);
                 } else {
                     record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
                 }

-                records.add(record);
-            }
+                if (recordHash.containsKey(streamName) == false) {
+                    recordHash.put(streamName, new ArrayList<>());
+                }
+                if (hashFlowFiles.containsKey(streamName) == false) {
+                    hashFlowFiles.put(streamName, new ArrayList<>());
+                }
+
+                hashFlowFiles.get(streamName).add(flowFile);
+                recordHash.get(streamName).add(record);
+            }
+
+            for (Map.Entry<String, List<PutRecordsRequestEntry>> entryRecord : recordHash.entrySet()) {
+                String streamName = entryRecord.getKey();
+                List<PutRecordsRequestEntry> records = entryRecord.getValue();

                 if (records.size() > 0) {
@@ -137,13 +152,13 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
                     List<PutRecordsResultEntry> responseEntries = results.getRecords();
                     for (int i = 0; i < responseEntries.size(); i++ ) {
                         PutRecordsResultEntry entry = responseEntries.get(i);
-                        FlowFile flowFile = flowFiles.get(i);
+                        FlowFile flowFile = hashFlowFiles.get(streamName).get(i);

                         Map<String,String> attributes = new HashMap<>();
                         attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
                         attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
-                        if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
+                        if (StringUtils.isBlank(entry.getErrorCode()) == false) {
                             attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
                             attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
                             flowFile = session.putAllAttributes(flowFile, attributes);
@@ -153,22 +168,24 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
                             successfulFlowFiles.add(flowFile);
                         }
                     }
-                    if ( failedFlowFiles.size() > 0 ) {
-                        session.transfer(failedFlowFiles, REL_FAILURE);
-                        getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
-                    }
-                    if ( successfulFlowFiles.size() > 0 ) {
-                        session.transfer(successfulFlowFiles, REL_SUCCESS);
-                        getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
-                    }
+
+                    recordHash.get(streamName).clear();
                     records.clear();
                 }
+            }
+
+            if ( failedFlowFiles.size() > 0 ) {
+                session.transfer(failedFlowFiles, REL_FAILURE);
+                getLogger().error("Failed to publish to kinesis records {}", new Object[]{failedFlowFiles});
+            }
+
+            if ( successfulFlowFiles.size() > 0 ) {
+                session.transfer(successfulFlowFiles, REL_SUCCESS);
+                getLogger().debug("Successfully published to kinesis records {}", new Object[]{successfulFlowFiles});
+            }
         } catch (final Exception exception) {
-            getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{exception, streamName, flowFiles});
+            getLogger().error("Failed to publish due to exception {} flowfiles {} ", new Object[]{exception, flowFiles});
             session.transfer(flowFiles, REL_FAILURE);
             context.yield();
         }
     }
 }
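From the flow side, the stream name property can now hold an Expression Language reference that is evaluated per FlowFile, so a single PutKinesisStream (or PutKinesisFirehose) instance can feed several streams. Below is a hypothetical configuration sketch using the NiFi mock framework; the kinesis.stream attribute name is only an example, and a real run would also need AWS credentials and region properties to be set:

    import java.util.Collections;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Each FlowFile names its own target stream through the "kinesis.stream" attribute.
    TestRunner runner = TestRunners.newTestRunner(PutKinesisStream.class);
    runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "${kinesis.stream}");
    runner.enqueue("payload-a".getBytes(), Collections.singletonMap("kinesis.stream", "stream-a"));
    runner.enqueue("payload-b".getBytes(), Collections.singletonMap("kinesis.stream", "stream-b"));
    runner.run(2);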