diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 242c917769..9b24bc7692 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -347,13 +347,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe private Collection getBundles() throws IOException { final List flowFiles = new ArrayList<>(); for (final BundleTracker tracker : bundleMap.values()) { - processBundle(tracker); - flowFiles.add(tracker.flowFile); + final boolean includeBundle = processBundle(tracker); + if (includeBundle) { + flowFiles.add(tracker.flowFile); + } } return flowFiles; } - private void processBundle(final BundleTracker bundle) throws IOException { + private boolean processBundle(final BundleTracker bundle) throws IOException { final RecordSetWriter writer = bundle.recordWriter; if (writer != null) { final WriteResult writeResult; @@ -364,6 +366,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe writer.close(); } + if (writeResult.getRecordCount() == 0) { + getProcessSession().remove(bundle.flowFile); + return false; + } + final Map attributes = new HashMap<>(); attributes.putAll(writeResult.getAttributes()); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); @@ -372,6 +379,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } populateAttributes(bundle); + return true; } private void writeData(final ProcessSession session, ConsumerRecord record, final TopicPartition topicPartition) { @@ -417,6 +425,35 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe bundleMap.put(bundleInfo, tracker); } + private void handleParseFailure(final ConsumerRecord consumerRecord, final ProcessSession session, final Exception cause) { + handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. " + + "Will route message as its own FlowFile to the 'parse.failure' relationship"); + } + + private void handleParseFailure(final ConsumerRecord consumerRecord, final ProcessSession session, final Exception cause, final String message) { + // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship + final Map attributes = new HashMap<>(); + attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset())); + attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition())); + attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); + + FlowFile failureFlowFile = session.create(); + failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); + + final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); + session.getProvenanceReporter().receive(failureFlowFile, transitUri); + + session.transfer(failureFlowFile, REL_PARSE_FAILURE); + + if (cause == null) { + logger.error(message); + } else { + logger.error(message, cause); + } + + session.adjustCounter("Parse Failures", 1, false); + } private void writeRecordData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile. @@ -433,24 +470,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger); record = reader.nextRecord(); } catch (final Exception e) { - // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship - final Map attributes = new HashMap<>(); - attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset())); - attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition())); - attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic()); + handleParseFailure(consumerRecord, session, e); + continue; + } - FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); - failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); - - final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic()); - session.getProvenanceReporter().receive(failureFlowFile, transitUri); - - session.transfer(failureFlowFile, REL_PARSE_FAILURE); - logger.error("Failed to parse message from Kafka using the configured Record Reader. " - + "Will route message as its own FlowFile to the 'parse.failure' relationship", e); - - session.adjustCounter("Parse Failures", 1, false); + if (record == null) { + handleParseFailure(consumerRecord, session, null); continue; } @@ -489,7 +514,14 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe writer = tracker.recordWriter; } - writer.write(record); + try { + writer.write(record); + } catch (final RuntimeException re) { + handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. " + + "Will route message as its own FlowFile to the 'parse.failure' relationship"); + continue; + } + tracker.incrementRecordCount(1L); }