diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index effd2e4cdb..ee6b1ff03a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -23,15 +23,16 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
 
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.DatatypeConverter;
@@ -56,7 +57,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -79,11 +79,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private boolean poisoned = false;
     //used for tracking demarcated flowfiles to their TopicPartition so we can append
     //to them on subsequent poll calls
-    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
     private long leaseStartNanos = -1;
     private boolean lastPollEmpty = false;
-    private int totalFlowFiles = 0;
+    private int totalMessages = 0;
 
     ConsumerLease(
             final long maxWaitMillis,
@@ -115,7 +115,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         uncommittedOffsetsMap.clear();
         leaseStartNanos = -1;
         lastPollEmpty = false;
-        totalFlowFiles = 0;
+        totalMessages = 0;
     }
 
     /**
@@ -206,6 +206,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             kafkaConsumer.commitSync(offsetsMap);
             resetInternalState();
             return true;
+        } catch (final IOException ioe) {
+            poison();
+            logger.error("Failed to finish writing out FlowFile bundle", ioe);
+            throw new ProcessException(ioe);
         } catch (final KafkaException kex) {
             poison();
             logger.warn("Duplicates are likely as we were able to commit the process"
@@ -253,7 +257,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
             return false;
         } else {
-            return totalFlowFiles < 15000; //admittedly a magic number - good candidate for processor property
+            return totalMessages < 1000; //admittedly a magic number - good candidate for processor property
         }
     }
 
@@ -315,12 +319,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 } else if (readerFactory != null && writerFactory != null) {
                     writeRecordData(getProcessSession(), messages, partition);
                 } else {
-                    totalFlowFiles += messages.size();
                     messages.stream().forEach(message -> {
                         writeData(getProcessSession(), message, partition);
                     });
                 }
 
+                totalMessages += messages.size();
                 uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
             }
         });
@@ -340,15 +344,36 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
     }
 
-    private Collection<FlowFile> getBundles() {
+    private Collection<FlowFile> getBundles() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         for (final BundleTracker tracker : bundleMap.values()) {
-            populateAttributes(tracker);
+            processBundle(tracker);
             flowFiles.add(tracker.flowFile);
         }
         return flowFiles;
     }
 
+    private void processBundle(final BundleTracker bundle) throws IOException {
+        final RecordSetWriter writer = bundle.recordWriter;
+        if (writer != null) {
+            final WriteResult writeResult;
+
+            try {
+                writeResult = writer.finishRecordSet();
+            } finally {
+                writer.close();
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+            bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+        }
+
+        populateAttributes(bundle);
+    }
+
     private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
         FlowFile flowFile = session.create();
         final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
@@ -364,7 +389,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final boolean demarcateFirstRecord;
-        BundleTracker tracker = bundleMap.get(topicPartition);
+        final BundleInformation bundleInfo = new BundleInformation(topicPartition, null);
+        BundleTracker tracker = bundleMap.get(bundleInfo);
         FlowFile flowFile;
         if (tracker == null) {
             tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
@@ -388,39 +414,22 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             }
         });
         tracker.updateFlowFile(flowFile);
-        bundleMap.put(topicPartition, tracker);
+        bundleMap.put(bundleInfo, tracker);
     }
 
-    private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
-
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
-    }
-
     private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        if (records.isEmpty()) {
-            return;
-        }
-
-        final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
-
         // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
         // We don't want to create a new FlowFile for each record that we receive, so we will just create
         // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
         // the createRecordReader method.
         final FlowFile tempFlowFile = session.create();
-        try {
-            final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
-            while (itr.hasNext()) {
-                final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
-                final InputStream in = new ByteArrayInputStream(consumerRecord.value());
-
+        RecordSetWriter writer = null;
+        try {
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
                 final Record record;
-                try {
+                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
                     final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
                     record = reader.nextRecord();
                 } catch (final Exception e) {
@@ -445,10 +454,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     continue;
                 }
 
+                // Determine the bundle for this record.
                 final RecordSchema recordSchema = record.getSchema();
+                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
 
-                Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
-                if (tuple == null) {
+                BundleTracker tracker = bundleMap.get(bundleInfo);
+                if (tracker == null) {
                     FlowFile flowFile = session.create();
 
                     final OutputStream rawOut = session.write(flowFile);
@@ -468,19 +479,32 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         throw new ProcessException(e);
                     }
 
-                    final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
+                    writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
                     writer.beginRecordSet();
 
-                    tuple = new Tuple<>(flowFile, writer);
-                    writers.put(recordSchema, tuple);
+                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                    tracker.updateFlowFile(flowFile);
+                    bundleMap.put(bundleInfo, tracker);
+                } else {
+                    writer = tracker.recordWriter;
                 }
 
-                final RecordSetWriter writer = tuple.getValue();
                 writer.write(record);
+                tracker.incrementRecordCount(1L);
             }
+
+            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+            try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } catch (final Exception ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
             try {
                 rollback(topicPartition);
             } catch (final Exception rollbackException) {
@@ -491,50 +515,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         } finally {
             session.remove(tempFlowFile);
         }
-
-        for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
-            FlowFile flowFile = tuple.getKey();
-            final RecordSetWriter writer = tuple.getValue();
-
-            final WriteResult writeResult;
-            try {
-                writeResult = writer.finishRecordSet();
-                writer.close();
-            } catch (final Exception e) {
-                logger.error("Failed to finish writing records to Content Repository", e);
-                try {
-                    rollback(topicPartition);
-                } catch (final Exception rollbackException) {
-                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                }
-                throw new ProcessException(e);
-            }
-
-            final int recordCount = writeResult.getRecordCount();
-            if (recordCount > 0) {
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.putAll(writeResult.getAttributes());
-                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                attributes.put("record.count", String.valueOf(recordCount));
-
-                attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
-                flowFile = session.putAllAttributes(flowFile, attributes);
-
-                final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
-                session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
-
-                session.adjustCounter("Records Received", recordCount, false);
-                session.transfer(flowFile, REL_SUCCESS);
-            } else {
-                session.remove(flowFile);
-            }
-        }
     }
 
+    private void rollback(final TopicPartition topicPartition) {
+        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+        if (offsetAndMetadata == null) {
+            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+        }
+
+        final long offset = offsetAndMetadata.offset();
+        kafkaConsumer.seek(topicPartition, offset);
+    }
+
+
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
@@ -544,7 +539,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+            // reading/writing records, then use the "kafka.count" attribute.
+            if (tracker.recordWriter == null) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            } else {
+                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+            }
         }
         final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
         final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
@@ -559,13 +560,19 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final int partition;
         final String topic;
         final String key;
+        final RecordSetWriter recordWriter;
        FlowFile flowFile;
         long totalRecords = 0;
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this(initialRecord, topicPartition, keyEncoding, null);
+        }
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
             this.initialOffset = initialRecord.offset();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
+            this.recordWriter = recordWriter;
             this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
         }
 
@@ -579,4 +586,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     }
 
+    private static class BundleInformation {
+        private final TopicPartition topicPartition;
+        private final RecordSchema schema;
+
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema) {
+            this.topicPartition = topicPartition;
+            this.schema = schema;
+        }
+
+        @Override
+        public int hashCode() {
+            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode());
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof BundleInformation)) {
+                return false;
+            }
+            final BundleInformation other = (BundleInformation) obj;
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema);
+        }
+    }
 }
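
Reviewer note, not part of the patch: the core of this change is that bundleMap is now keyed on BundleInformation, i.e. the pair (TopicPartition, RecordSchema), instead of on the TopicPartition alone, so a record writer can be reused across poll() calls for records that share both a partition and a schema. The standalone sketch below uses simplified stand-in types (plain Strings rather than the Kafka/NiFi classes) to show the grouping behavior the composite key buys: records from the same partition but with different schemas land in different bundles, while the demarcated path passes a null schema and therefore still collapses to one bundle per partition.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BundleKeyDemo {

    static final class Key {
        private final String topicPartition; // stand-in for org.apache.kafka.common.TopicPartition
        private final String schema;         // stand-in for org.apache.nifi.serialization.record.RecordSchema

        Key(final String topicPartition, final String schema) {
            this.topicPartition = topicPartition;
            this.schema = schema;
        }

        @Override
        public int hashCode() {
            // Same shape as BundleInformation.hashCode(): tolerates a null schema.
            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode());
        }

        @Override
        public boolean equals(final Object obj) {
            if (!(obj instanceof Key)) {
                return false;
            }
            final Key other = (Key) obj;
            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema);
        }
    }

    public static void main(String[] args) {
        final Map<Key, String> bundles = new HashMap<>();
        bundles.put(new Key("events-0", "schema-v1"), "bundle A");
        bundles.put(new Key("events-0", "schema-v2"), "bundle B");      // same partition, new schema -> new bundle
        bundles.put(new Key("events-0", null), "demarcated bundle");    // demarcated data carries a null schema
        bundles.put(new Key("events-0", "schema-v1"), "still bundle A"); // replaces; does not grow the map

        System.out.println(bundles.size()); // prints 3
    }
}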
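
Reviewer note, not part of the patch: processBundle() finishes the record set inside a try block and closes the writer in a finally block, so the writer is released even when finishRecordSet() fails; the resulting IOException then propagates out of getBundles() into commit(), where the new catch poisons the lease. The toy example below (Writer is a hypothetical stand-in, not NiFi's RecordSetWriter) demonstrates that close() survives the failure path. One trade-off of a plain try/finally, as opposed to try-with-resources, is that an exception thrown by close() would mask the original finishRecordSet() failure.

import java.io.IOException;

public class FinishThenCloseDemo {

    interface Writer {
        int finishRecordSet() throws IOException;
        void close() throws IOException;
    }

    static int finishBundle(final Writer writer) throws IOException {
        final int recordCount;
        try {
            recordCount = writer.finishRecordSet(); // may throw
        } finally {
            writer.close(); // runs on both the success and the failure path
        }
        return recordCount;
    }

    public static void main(String[] args) {
        final Writer failing = new Writer() {
            @Override
            public int finishRecordSet() throws IOException {
                throw new IOException("simulated content-repository failure");
            }

            @Override
            public void close() {
                System.out.println("close() still ran"); // proves the finally block fired
            }
        };

        try {
            finishBundle(failing);
        } catch (final IOException e) {
            System.out.println("caller sees: " + e.getMessage());
        }
    }
}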
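
Reviewer note, not part of the patch: rollback() is moved below writeRecordData() here but its logic is unchanged: rewind the partition to the first un-committed offset so the next poll() re-delivers everything since the last successful commit. The sketch below restates it against the plain Kafka 0.10 consumer API; the null guard at the end is this sketch's own addition, since Consumer.committed() returns null for a partition that has never had a committed offset, a case the method as written would answer with a NullPointerException.

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class RollbackSketch {

    private RollbackSketch() {
    }

    static void rollback(final Consumer<byte[], byte[]> consumer,
                         final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap,
                         final TopicPartition topicPartition) {
        // Prefer the offset we were about to commit; fall back to the broker's committed offset.
        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
        if (offsetAndMetadata == null) {
            offsetAndMetadata = consumer.committed(topicPartition);
        }

        if (offsetAndMetadata != null) {
            // Re-deliver everything from the first un-committed offset on the next poll().
            consumer.seek(topicPartition, offsetAndMetadata.offset());
        }
    }
}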