diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index b2665aeda0..ac24e1f4b9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -171,6 +171,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final ConsumerRecords records = kafkaConsumer.poll(10); lastPollEmpty = records.count() == 0; processRecords(records); + } catch (final ProcessException pe) { + throw pe; } catch (final Throwable t) { this.poison(); throw t; @@ -405,11 +407,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } private void writeRecordData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { + if (records.isEmpty()) { + return; + } + FlowFile flowFile = session.create(); try { final RecordSetWriter writer; try { - writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new byte[0])); + writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value())); } catch (final Exception e) { logger.error( "Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "