diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 1f58ab3965..711c2bc6d4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -387,9 +387,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -418,7 +421,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } }); @@ -439,7 +445,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index de196480d1..2d893d8f6f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -396,9 +396,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -436,7 +439,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } }); @@ -460,7 +466,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 53e7a23752..43dfd17dfa 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -358,9 +358,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe FlowFile flowFile = session.create(); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); tracker.incrementRecordCount(1); - flowFile = session.write(flowFile, out -> { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -387,7 +390,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe if (useDemarcator) { out.write(demarcatorBytes); } - out.write(record.value()); + final byte[] value = record.value(); + if (value != null) { + out.write(record.value()); + } useDemarcator = true; } });