From 8bdc2910e1e0da4e49dd45123327b4b1d37ca640 Mon Sep 17 00:00:00 2001
From: "dorian.bugeja"
Date: Wed, 17 Jan 2018 11:22:39 +0100
Subject: [PATCH] NIFI-4786 Allow Expression Evaluation to Kinesis/Firehose
 Stream Name

Signed-off-by: James Wing

This closes #2409.
---
 .../kinesis/AbstractBaseKinesisProcessor.java | 12 +--
 .../AbstractKinesisFirehoseProcessor.java     |  2 +-
 .../AbstractKinesisStreamProcessor.java       |  2 +-
 .../nifi-aws-processors/pom.xml               | 10 ++
 .../kinesis/firehose/PutKinesisFirehose.java  | 92 +++++++++++--------
 .../aws/kinesis/stream/PutKinesisStream.java  | 91 ++++++++++--------
 6 files changed, 127 insertions(+), 82 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
index e5598200fa..a43359e5c2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
@@ -64,16 +64,16 @@ public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
 
-    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) {
+    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, String message) {
         List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
 
         long currentBufferSizeBytes = 0;
@@ -85,7 +85,7 @@ public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
             if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
-                flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message);
+                flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, message);
                 continue;
             }
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
index ca15653c4f..bf91e9bd8f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
@@ -34,7 +34,7 @@ public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisFirehoseClient> {
     public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
             .name("Amazon Kinesis Firehose Delivery Stream Name")
             .description("The name of kinesis firehose delivery stream")
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
index b7513af328..79c8dedded 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
@@ -35,7 +35,7 @@ public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
             .name("kinesis-stream-name")
             .displayName("Amazon Kinesis Stream Name")
             .description("The name of Kinesis Stream")
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
            .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
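Flipping expressionLanguageSupported(true) on the two stream-name descriptors above is what lets the concrete processors below resolve the stream name against each FlowFile's attributes instead of reading it once per trigger. A minimal sketch of the evaluation pattern this relies on (the helper class, method name, and the ${kinesis.stream} attribute are illustrative, not part of the patch):

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;

    class StreamNameResolver {
        // With expressionLanguageSupported(true), a configured value such as
        // "${kinesis.stream}" is evaluated against each FlowFile's attributes;
        // a plain literal value still passes through unchanged.
        static String resolve(ProcessContext context, PropertyDescriptor streamNameProperty, FlowFile flowFile) {
            return context.getProperty(streamNameProperty)
                    .evaluateAttributeExpressions(flowFile)
                    .getValue();
        }
    }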
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 29129fd53a..e6fee000e9 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -64,6 +64,16 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
+
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.6.6</version>
+            <scope>test</scope>
+        </dependency>
+
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
index 62aa244dba..8c8856d666 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
@@ -89,16 +89,14 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
 
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
-        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
-        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName,
-                AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+
+        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
+        HashMap<String, List<Record>> recordHash = new HashMap<String, List<Record>>();
 
         final AmazonKinesisFirehoseClient client = getClient();
 
         try {
-            List<Record> records = new ArrayList<>();
-
             List<FlowFile> failedFlowFiles = new ArrayList<>();
             List<FlowFile> successfulFlowFiles = new ArrayList<>();
@@ -106,46 +104,66 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
             for (int i = 0; i < flowFiles.size(); i++) {
                 FlowFile flowFile = flowFiles.get(i);
 
+                final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 session.exportTo(flowFile, baos);
-                records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
+
+                if (recordHash.containsKey(firehoseStreamName) == false) {
+                    recordHash.put(firehoseStreamName, new ArrayList<>());
+                }
+
+                if (hashFlowFiles.containsKey(firehoseStreamName) == false) {
+                    hashFlowFiles.put(firehoseStreamName, new ArrayList<>());
+                }
+
+                hashFlowFiles.get(firehoseStreamName).add(flowFile);
+                recordHash.get(firehoseStreamName).add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
             }
 
-            if ( records.size() > 0 ) {
-                // Send the batch
-                PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
-                putRecordBatchRequest.setDeliveryStreamName(firehoseStreamName);
-                putRecordBatchRequest.setRecords(records);
-                PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
+            for (Map.Entry<String, List<Record>> entryRecord : recordHash.entrySet()) {
+                String streamName = entryRecord.getKey();
+                List<Record> records = entryRecord.getValue();
 
-                // Separate out the successful and failed flow files
-                List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
-                for (int i = 0; i < responseEntries.size(); i++ ) {
-                    PutRecordBatchResponseEntry entry = responseEntries.get(i);
-                    FlowFile flowFile = flowFiles.get(i);
+                if (records.size() > 0) {
+                    // Send the batch
+                    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
+                    putRecordBatchRequest.setDeliveryStreamName(streamName);
+                    putRecordBatchRequest.setRecords(records);
+                    PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
 
-                    Map<String,String> attributes = new HashMap<>();
-                    attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
-                    flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
-                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
-                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
-                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, entry.getErrorMessage());
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                        failedFlowFiles.add(flowFile);
-                    } else {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                        successfulFlowFiles.add(flowFile);
+                    // Separate out the successful and failed flow files
+                    List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
+                    for (int i = 0; i < responseEntries.size(); i++ ) {
+
+                        PutRecordBatchResponseEntry entry = responseEntries.get(i);
+                        FlowFile flowFile = hashFlowFiles.get(streamName).get(i);
+
+                        Map<String,String> attributes = new HashMap<>();
+                        attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
+                        flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
+                        if (StringUtils.isBlank(entry.getErrorCode()) == false) {
+                            attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
+                            attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, entry.getErrorMessage());
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+                            failedFlowFiles.add(flowFile);
+                        } else {
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+                            successfulFlowFiles.add(flowFile);
+                        }
                     }
+                    recordHash.get(streamName).clear();
+                    records.clear();
                 }
-                if ( failedFlowFiles.size() > 0 ) {
-                    session.transfer(failedFlowFiles, REL_FAILURE);
-                    getLogger().error("Failed to publish to kinesis firehose {} records {}", new Object[]{firehoseStreamName, failedFlowFiles});
-                }
-                if ( successfulFlowFiles.size() > 0 ) {
-                    session.transfer(successfulFlowFiles, REL_SUCCESS);
-                    getLogger().info("Successfully published to kinesis firehose {} records {}", new Object[]{firehoseStreamName, successfulFlowFiles});
-                }
-                records.clear();
+            }
+
+            if (failedFlowFiles.size() > 0) {
+                session.transfer(failedFlowFiles, REL_FAILURE);
+                getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{failedFlowFiles});
+            }
+            if (successfulFlowFiles.size() > 0) {
+                session.transfer(successfulFlowFiles, REL_SUCCESS);
+                getLogger().info("Successfully published to kinesis firehose {}", new Object[]{successfulFlowFiles});
+            }
         } catch (final Exception exception) {
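The rewritten PutKinesisFirehose above batches records per evaluated delivery stream and issues one PutRecordBatch call per stream, keeping each stream's FlowFiles in a parallel list so that response entries line up with FlowFiles by index. A standalone sketch of the same grouping pattern, using Map.computeIfAbsent where the patch uses containsKey/put (types and names simplified, not part of the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class StreamBatcher {
        // Group payloads by resolved stream name; insertion order is preserved
        // within each stream's list, which is what lets batch responses be
        // matched back to their FlowFiles by index.
        static Map<String, List<byte[]>> groupByStream(List<byte[]> payloads, List<String> streamNames) {
            Map<String, List<byte[]>> batches = new HashMap<>();
            for (int i = 0; i < payloads.size(); i++) {
                batches.computeIfAbsent(streamNames.get(i), k -> new ArrayList<>())
                       .add(payloads.get(i));
            }
            return batches;
        }
    }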
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
index 7694fd4441..c6d79e0d79 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
@@ -94,15 +94,16 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
 
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
-        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
-        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
-                AWS_KINESIS_ERROR_MESSAGE);
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_ERROR_MESSAGE);
+
+        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
+        HashMap<String, List<PutRecordsRequestEntry>> recordHash = new HashMap<String, List<PutRecordsRequestEntry>>();
 
         final AmazonKinesisClient client = getClient();
 
+
         try {
-            List<PutRecordsRequestEntry> records = new ArrayList<>();
 
             List<FlowFile> failedFlowFiles = new ArrayList<>();
             List<FlowFile> successfulFlowFiles = new ArrayList<>();
@@ -111,64 +112,80 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
             for (int i = 0; i < flowFiles.size(); i++) {
                 FlowFile flowFile = flowFiles.get(i);
 
+                String streamName = context.getProperty(KINESIS_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 session.exportTo(flowFile, baos);
                 PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
 
                 String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
-                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
+                        .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
 
-                if ( ! StringUtils.isBlank(partitionKey) ) {
+                if (StringUtils.isBlank(partitionKey) == false) {
                     record.setPartitionKey(partitionKey);
                 } else {
                     record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
                 }
 
-                records.add(record);
+                if (recordHash.containsKey(streamName) == false) {
+                    recordHash.put(streamName, new ArrayList<>());
+                }
+                if (hashFlowFiles.containsKey(streamName) == false) {
+                    hashFlowFiles.put(streamName, new ArrayList<>());
+                }
+
+                hashFlowFiles.get(streamName).add(flowFile);
+                recordHash.get(streamName).add(record);
             }
 
-            if ( records.size() > 0 ) {
+            for (Map.Entry<String, List<PutRecordsRequestEntry>> entryRecord : recordHash.entrySet()) {
+                String streamName = entryRecord.getKey();
+                List<PutRecordsRequestEntry> records = entryRecord.getValue();
 
-                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
-                putRecordRequest.setStreamName(streamName);
-                putRecordRequest.setRecords(records);
-                PutRecordsResult results = client.putRecords(putRecordRequest);
+                if (records.size() > 0) {
 
-                List<PutRecordsResultEntry> responseEntries = results.getRecords();
-                for (int i = 0; i < responseEntries.size(); i++ ) {
-                    PutRecordsResultEntry entry = responseEntries.get(i);
-                    FlowFile flowFile = flowFiles.get(i);
+                    PutRecordsRequest putRecordRequest = new PutRecordsRequest();
+                    putRecordRequest.setStreamName(streamName);
+                    putRecordRequest.setRecords(records);
+                    PutRecordsResult results = client.putRecords(putRecordRequest);
 
-                    Map<String,String> attributes = new HashMap<>();
-                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
-                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
+                    List<PutRecordsResultEntry> responseEntries = results.getRecords();
+                    for (int i = 0; i < responseEntries.size(); i++ ) {
+                        PutRecordsResultEntry entry = responseEntries.get(i);
+                        FlowFile flowFile = hashFlowFiles.get(streamName).get(i);
 
-                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
-                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
-                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                        failedFlowFiles.add(flowFile);
-                    } else {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                        successfulFlowFiles.add(flowFile);
+                        Map<String,String> attributes = new HashMap<>();
+                        attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
+                        attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
+
+                        if (StringUtils.isBlank(entry.getErrorCode()) == false) {
+                            attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
+                            attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+                            failedFlowFiles.add(flowFile);
+                        } else {
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+                            successfulFlowFiles.add(flowFile);
+                        }
                     }
                 }
-                if ( failedFlowFiles.size() > 0 ) {
-                    session.transfer(failedFlowFiles, REL_FAILURE);
-                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
-                }
-                if ( successfulFlowFiles.size() > 0 ) {
-                    session.transfer(successfulFlowFiles, REL_SUCCESS);
-                    getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
-                }
+                recordHash.get(streamName).clear();
                 records.clear();
             }
 
+            if ( failedFlowFiles.size() > 0 ) {
+                session.transfer(failedFlowFiles, REL_FAILURE);
+                getLogger().error("Failed to publish to kinesis records {}", new Object[]{failedFlowFiles});
+            }
+            if ( successfulFlowFiles.size() > 0 ) {
+                session.transfer(successfulFlowFiles, REL_SUCCESS);
records {}", new Object[]{successfulFlowFiles}); + } + } catch (final Exception exception) { - getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{exception, streamName, flowFiles}); + getLogger().error("Failed to publish due to exception {} flowfiles {} ", new Object[]{exception, flowFiles}); session.transfer(flowFiles, REL_FAILURE); context.yield(); } } - }