From 2d5b8c7267eca6e0f234eeaf9e3553bdfba75da9 Mon Sep 17 00:00:00 2001 From: gardellajuanpablo Date: Fri, 29 Sep 2017 10:49:21 -0300 Subject: [PATCH] NIFI-4330 ConsumeKafka* throw NullPointerException if Kafka message has a null value It is possible null values to be stored in Kafka topics. Fixed handle this scenario. Notice without this fix, the consumer is unable to consume more messages (at least without removing messages from the queue). --- .../kafka/pubsub/ConsumerLease.java | 19 ++++++++++++++----- .../kafka/pubsub/ConsumerLease.java | 19 ++++++++++++++----- .../kafka/pubsub/ConsumerLease.java | 14 ++++++++++---- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 1f58ab3965..711c2bc6d4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -387,9 +387,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -418,7 +421,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } }); @@ -439,7 +445,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index de196480d1..2d893d8f6f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -396,9 +396,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -436,7 +439,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } }); @@ -460,7 +466,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 53e7a23752..43dfd17dfa 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -358,9 +358,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -387,7 +390,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } });