diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java index c44e25e883..adb7a6f3af 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java @@ -55,11 +55,11 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriterFactory; @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API. " - + "The complementary NiFi processor for sending messages is PublishKafka_0_10. Please note that, at this time, the Processor assumes that " + + "The complementary NiFi processor for sending messages is PublishKafkaRecord_0_10. Please note that, at this time, the Processor assumes that " + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the " + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. " - + "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile.") + + "A 'record.count' attribute is added to indicate how many records are contained in the FlowFile.") @Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"}) @WritesAttributes({ @WritesAttribute(attribute = "record.count", description = "The number of records received"), diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 64af412cc3..8dc13f43ff 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import javax.xml.bind.DatatypeConverter; @@ -432,105 +433,99 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe bundleMap.put(bundleInfo, tracker); } - private void handleParseFailure(final ConsumerRecord consumerRecord, final ProcessSession session, final Exception cause) { - handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. " - + "Will route message as its own FlowFile to the 'parse.failure' relationship"); - } - private void handleParseFailure(final ConsumerRecord consumerRecord, final ProcessSession session, final Exception cause, final String message) { - // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship - final Map attributes = new HashMap<>(); - attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset())); - attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition())); - attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); - - FlowFile failureFlowFile = session.create(); - final byte[] value = consumerRecord.value(); - if (value != null) { - failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); - } - failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); - - final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); - session.getProvenanceReporter().receive(failureFlowFile, transitUri); - - session.transfer(failureFlowFile, REL_PARSE_FAILURE); - - if (cause == null) { - logger.error(message); - } else { - logger.error(message, cause); - } - - session.adjustCounter("Parse Failures", 1, false); - } - - private void writeRecordData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { + private void writeRecordData(final ProcessSession session, final List> messages, final TopicPartition topicPartition) { RecordSetWriter writer = null; + final BiConsumer, Exception> handleParseFailure = (consumerRecord, e) -> { + // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship + // And continue to the next message. + final Map attributes = new HashMap<>(); + attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset())); + attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition())); + attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic()); + + FlowFile failureFlowFile = session.create(); + failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); + + final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic()); + session.getProvenanceReporter().receive(failureFlowFile, transitUri); + + session.transfer(failureFlowFile, REL_PARSE_FAILURE); + logger.error("Failed to parse message from Kafka using the configured Record Reader. " + + "Will route message as its own FlowFile to the 'parse.failure' relationship", e); + + session.adjustCounter("Parse Failures", 1, false); + }; + try { - for (final ConsumerRecord consumerRecord : records) { - final Record record; + for (final ConsumerRecord consumerRecord : messages) { try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) { - final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger); - record = reader.nextRecord(); - } catch (final Exception e) { - handleParseFailure(consumerRecord, session, e); - continue; - } - if (record == null) { - handleParseFailure(consumerRecord, session, null); - continue; - } + final RecordReader reader; + final Record firstRecord; - // Determine the bundle for this record. - final RecordSchema recordSchema = record.getSchema(); - final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema); - - BundleTracker tracker = bundleMap.get(bundleInfo); - if (tracker == null) { - FlowFile flowFile = session.create(); - final OutputStream rawOut = session.write(flowFile); - - final RecordSchema writeSchema; try { - writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema); + reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger); + firstRecord = reader.nextRecord(); } catch (final Exception e) { - logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); - - try { - rollback(topicPartition); - } catch (final Exception rollbackException) { - logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); - } - - yield(); - throw new ProcessException(e); + handleParseFailure.accept(consumerRecord, e); + continue; } - writer = writerFactory.createWriter(logger, writeSchema, rawOut); - writer.beginRecordSet(); + if (firstRecord == null) { + // If the message doesn't contain any record, do nothing. + continue; + } - tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer); - tracker.updateFlowFile(flowFile); - bundleMap.put(bundleInfo, tracker); - } else { - writer = tracker.recordWriter; + // Determine the bundle for this record. + final RecordSchema recordSchema = firstRecord.getSchema(); + final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema); + + BundleTracker tracker = bundleMap.get(bundleInfo); + if (tracker == null) { + FlowFile flowFile = session.create(); + final OutputStream rawOut = session.write(flowFile); + + final RecordSchema writeSchema; + try { + writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema); + } catch (final Exception e) { + logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); + + try { + rollback(topicPartition); + } catch (final Exception rollbackException) { + logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); + } + + yield(); + throw new ProcessException(e); + } + + writer = writerFactory.createWriter(logger, writeSchema, rawOut); + writer.beginRecordSet(); + + tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer); + tracker.updateFlowFile(flowFile); + bundleMap.put(bundleInfo, tracker); + } else { + writer = tracker.recordWriter; + } + + try { + for (Record record = firstRecord; record != null; record = reader.nextRecord()) { + writer.write(record); + tracker.incrementRecordCount(1L); + session.adjustCounter("Records Received", 1, false); + } + } catch (Exception e) { + // Transfer it to 'parse failure' and continue to the next message. + handleParseFailure.accept(consumerRecord, e); + } } - - try { - writer.write(record); - } catch (final RuntimeException re) { - handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. " - + "Will route message as its own FlowFile to the 'parse.failure' relationship"); - continue; - } - - tracker.incrementRecordCount(1L); } - - session.adjustCounter("Records Received", records.size(), false); } catch (final Exception e) { logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);