From 4b08cf116c852a0a969ba0bea0be0bc4811f20bb Mon Sep 17 00:00:00 2001 From: Matthew Formosa Date: Wed, 30 Oct 2019 14:55:33 +0100 Subject: [PATCH] NIFI-6824 - Handling NPE of header value when consuming from Kafka Signed-off-by: Pierre Villard This closes #3859. --- .../apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 5 +++-- .../apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 5 +++-- .../apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index bcba8ac9fb..62d696459d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -497,8 +497,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe for (final Header header : consumerRecord.headers()) { final String attributeName = header.key(); - if (headerNamePattern.matcher(attributeName).matches()) { - attributes.put(attributeName, new String(header.value(), headerCharacterSet)); + final byte[] attributeValue = header.value(); + if (headerNamePattern.matcher(attributeName).matches() && attributeValue != null) { + attributes.put(attributeName, new String(attributeValue, headerCharacterSet)); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 04176144f1..af969a876e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -497,8 +497,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe for (final Header header : consumerRecord.headers()) { final String attributeName = header.key(); - if (headerNamePattern.matcher(attributeName).matches()) { - attributes.put(attributeName, new String(header.value(), headerCharacterSet)); + final byte[] attributeValue = header.value(); + if (headerNamePattern.matcher(attributeName).matches() && attributeValue != null) { + attributes.put(attributeName, new String(attributeValue, headerCharacterSet)); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 77d53a979d..2674dd9dc3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -497,8 +497,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe for (final Header header : consumerRecord.headers()) { final String attributeName = header.key(); - if (headerNamePattern.matcher(attributeName).matches()) { - attributes.put(attributeName, new String(header.value(), headerCharacterSet)); + final byte[] attributeValue = header.value(); + if (headerNamePattern.matcher(attributeName).matches() && attributeValue != null) { + attributes.put(attributeName, new String(attributeValue, headerCharacterSet)); } }