NIFI-4330 ConsumeKafka* throw NullPointerException if Kafka message has a null value

It is possible null values to be stored in Kafka topics. Fixed handle this scenario.
Notice without this fix, the consumer is unable to consume more messages (at least
without removing messages from the queue).
This commit is contained in:
gardellajuanpablo 2017-09-29 10:49:21 -03:00
parent feaf44b623
commit 2d5b8c7267
3 changed files with 38 additions and 14 deletions

View File

@ -387,9 +387,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1); tracker.incrementRecordCount(1);
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> { flowFile = session.write(flowFile, out -> {
out.write(record.value()); out.write(value);
}); });
}
tracker.updateFlowFile(flowFile); tracker.updateFlowFile(flowFile);
populateAttributes(tracker); populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS); session.transfer(tracker.flowFile, REL_SUCCESS);
@ -418,7 +421,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
if (useDemarcator) { if (useDemarcator) {
out.write(demarcatorBytes); out.write(demarcatorBytes);
} }
final byte[] value = record.value();
if (value != null) {
out.write(record.value()); out.write(record.value());
}
useDemarcator = true; useDemarcator = true;
} }
}); });
@ -439,7 +445,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
FlowFile failureFlowFile = session.create(); FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); final byte[] value = consumerRecord.value();
if (value != null) {
failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
}
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());

View File

@ -396,9 +396,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1); tracker.incrementRecordCount(1);
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> { flowFile = session.write(flowFile, out -> {
out.write(record.value()); out.write(value);
}); });
}
tracker.updateFlowFile(flowFile); tracker.updateFlowFile(flowFile);
populateAttributes(tracker); populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS); session.transfer(tracker.flowFile, REL_SUCCESS);
@ -436,7 +439,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
if (useDemarcator) { if (useDemarcator) {
out.write(demarcatorBytes); out.write(demarcatorBytes);
} }
final byte[] value = record.value();
if (value != null) {
out.write(record.value()); out.write(record.value());
}
useDemarcator = true; useDemarcator = true;
} }
}); });
@ -460,7 +466,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile failureFlowFile = session.create(); FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); final byte[] value = consumerRecord.value();
if (value != null) {
failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
}
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());

View File

@ -358,9 +358,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1); tracker.incrementRecordCount(1);
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> { flowFile = session.write(flowFile, out -> {
out.write(record.value()); out.write(value);
}); });
}
tracker.updateFlowFile(flowFile); tracker.updateFlowFile(flowFile);
populateAttributes(tracker); populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS); session.transfer(tracker.flowFile, REL_SUCCESS);
@ -387,7 +390,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
if (useDemarcator) { if (useDemarcator) {
out.write(demarcatorBytes); out.write(demarcatorBytes);
} }
final byte[] value = record.value();
if (value != null) {
out.write(record.value()); out.write(record.value());
}
useDemarcator = true; useDemarcator = true;
} }
}); });