diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 8dc13f43ff..26562b9e81 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -446,7 +446,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
             FlowFile failureFlowFile = session.create();
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            if (consumerRecord.value() != null) {
+                failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            }
             failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
             final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
@@ -461,7 +463,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         try {
             for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
                     final RecordReader reader;
                     final Record firstRecord;
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 97eed177a1..058778811e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -512,69 +512,69 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
                 final Map<String, String> attributes = getAttributes(consumerRecord);
 
-                final Record record;
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
 
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
-
-                // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
-                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
-                BundleTracker tracker = bundleMap.get(bundleInfo);
-                if (tracker == null) {
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-
-                    final OutputStream rawOut = session.write(flowFile);
-
-                    final RecordSchema writeSchema;
                     try {
-                        writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                        reader = readerFactory.createRecordReader(attributes, in, logger);
                     } catch (final Exception e) {
-                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
-                        try {
-                            rollback(topicPartition);
-                        } catch (final Exception rollbackException) {
-                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                        }
-
-                        yield();
-                        throw new ProcessException(e);
+                        handleParseFailure(consumerRecord, session, e);
+                        continue;
                     }
 
-                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                    writer.beginRecordSet();
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        // Determine the bundle for this record.
+                        final RecordSchema recordSchema = record.getSchema();
+                        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
 
-                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                    tracker.updateFlowFile(flowFile);
-                    bundleMap.put(bundleInfo, tracker);
-                } else {
-                    writer = tracker.recordWriter;
+                        BundleTracker tracker = bundleMap.get(bundleInfo);
+                        if (tracker == null) {
+                            FlowFile flowFile = session.create();
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+
+                            final OutputStream rawOut = session.write(flowFile);
+
+                            final RecordSchema writeSchema;
+                            try {
+                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                            } catch (final Exception e) {
+                                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                try {
+                                    rollback(topicPartition);
+                                } catch (final Exception rollbackException) {
+                                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                                }
+
+                                yield();
+                                throw new ProcessException(e);
+                            }
+
+                            writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                            writer.beginRecordSet();
+
+                            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                            tracker.updateFlowFile(flowFile);
+                            bundleMap.put(bundleInfo, tracker);
+                        } else {
+                            writer = tracker.recordWriter;
+                        }
+
+                        try {
+                            writer.write(record);
+                        } catch (final RuntimeException re) {
+                            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                            continue;
+                        }
+
+                        tracker.incrementRecordCount(1L);
+                        session.adjustCounter("Records Received", records.size(), false);
+                    }
                 }
-
-                try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
-                }
-
-                tracker.incrementRecordCount(1L);
             }
-
-            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
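
For context, a minimal sketch (not part of the patch) of the null-value guard both files now apply: a Kafka tombstone message carries a null value, so the body is swapped for an empty byte array before being wrapped in a stream. The class and method names below are illustrative only, not code from ConsumerLease.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class NullSafeValueSketch {
        // Hypothetical handler showing the same guard the patch introduces.
        void handle(final ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            // Tombstone records have a null value; substitute an empty body instead of hitting an NPE.
            final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
            try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
                // hand the stream to the configured RecordReader, as the patched code does
            }
        }
    }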