NIFI-3962: This closes #1846. Updated ConsumerLease to better handle batching of messages into a single FlowFile, in the same way that demarcated data is handled

Mark Payne 2017-05-23 15:08:37 -04:00 committed by joewitt
parent fb925fc182
commit 2c751a8e5b
1 changed file with 116 additions and 80 deletions


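The batching approach the commit message describes can be sketched in a minimal, self-contained form: in-progress bundles are keyed by topic/partition plus record schema, so messages arriving across successive poll() calls are appended to a single FlowFile instead of each poll producing its own FlowFile. The names SchemaRef and BundleKey below are hypothetical stand-ins for RecordSchema and BundleInformation, and the Integer values stand in for BundleTracker; this is an illustration, not the NiFi code itself.

// Minimal sketch of keying in-progress bundles by (topic, partition, schema).
// SchemaRef and BundleKey are hypothetical stand-ins for RecordSchema and
// BundleInformation; the Integer values stand in for BundleTracker.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BundleBatchingSketch {

    // Stand-in for RecordSchema: equality by name only.
    static final class SchemaRef {
        final String name;
        SchemaRef(final String name) { this.name = name; }
        @Override public int hashCode() { return name.hashCode(); }
        @Override public boolean equals(final Object o) {
            return o instanceof SchemaRef && ((SchemaRef) o).name.equals(name);
        }
    }

    // Stand-in for BundleInformation: value equality over (topic, partition, schema).
    static final class BundleKey {
        final String topic;
        final int partition;
        final SchemaRef schema;
        BundleKey(final String topic, final int partition, final SchemaRef schema) {
            this.topic = topic;
            this.partition = partition;
            this.schema = schema;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition, schema); }
        @Override public boolean equals(final Object o) {
            if (!(o instanceof BundleKey)) {
                return false;
            }
            final BundleKey other = (BundleKey) o;
            return partition == other.partition
                    && Objects.equals(topic, other.topic)
                    && Objects.equals(schema, other.schema);
        }
    }

    public static void main(final String[] args) {
        final Map<BundleKey, Integer> bundleMap = new HashMap<>();

        // Two successive "polls" deliver records for the same partition and schema;
        // because the keys are equal by value, both land in the same bundle.
        bundleMap.merge(new BundleKey("events", 0, new SchemaRef("user")), 1, Integer::sum);
        bundleMap.merge(new BundleKey("events", 0, new SchemaRef("user")), 1, Integer::sum);

        // A record with a different schema starts its own bundle (and, in NiFi, its own FlowFile).
        bundleMap.merge(new BundleKey("events", 0, new SchemaRef("audit")), 1, Integer::sum);

        System.out.println(bundleMap.size()); // 2 bundles: (events, 0, user) and (events, 0, audit)
    }
}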
@@ -23,15 +23,16 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
@@ -56,7 +57,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -79,11 +79,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
private long leaseStartNanos = -1;
private boolean lastPollEmpty = false;
private int totalFlowFiles = 0;
private int totalMessages = 0;
ConsumerLease(
final long maxWaitMillis,
@@ -115,7 +115,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
uncommittedOffsetsMap.clear();
leaseStartNanos = -1;
lastPollEmpty = false;
totalFlowFiles = 0;
totalMessages = 0;
}
/**
@@ -206,6 +206,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
kafkaConsumer.commitSync(offsetsMap);
resetInternalState();
return true;
} catch (final IOException ioe) {
poison();
logger.error("Failed to finish writing out FlowFile bundle", ioe);
throw new ProcessException(ioe);
} catch (final KafkaException kex) {
poison();
logger.warn("Duplicates are likely as we were able to commit the process"
@@ -253,7 +257,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
return false;
} else {
return totalFlowFiles < 15000;//admittedly a magic number - good candidate for processor property
return totalMessages < 1000;//admittedly a magic number - good candidate for processor property
}
}
@@ -315,12 +319,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
totalFlowFiles += messages.size();
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
}
totalMessages += messages.size();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
}
});
@@ -340,15 +344,36 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
}
private Collection<FlowFile> getBundles() {
private Collection<FlowFile> getBundles() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
populateAttributes(tracker);
processBundle(tracker);
flowFiles.add(tracker.flowFile);
}
return flowFiles;
}
private void processBundle(final BundleTracker bundle) throws IOException {
final RecordSetWriter writer = bundle.recordWriter;
if (writer != null) {
final WriteResult writeResult;
try {
writeResult = writer.finishRecordSet();
} finally {
writer.close();
}
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
}
populateAttributes(bundle);
}
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
@@ -364,7 +389,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final boolean demarcateFirstRecord;
BundleTracker tracker = bundleMap.get(topicPartition);
final BundleInformation bundleInfo = new BundleInformation(topicPartition, null);
BundleTracker tracker = bundleMap.get(bundleInfo);
FlowFile flowFile;
if (tracker == null) {
tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
@@ -388,39 +414,22 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
});
tracker.updateFlowFile(flowFile);
bundleMap.put(topicPartition, tracker);
bundleMap.put(bundleInfo, tracker);
}
private void rollback(final TopicPartition topicPartition) {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
final long offset = offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
}
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
if (records.isEmpty()) {
return;
}
final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
// In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
// We don't want to create a new FlowFile for each record that we receive, so we will just create
// a "temporary flowfile" that will be removed in the finally block below and use that to pass to
// the createRecordReader method.
final FlowFile tempFlowFile = session.create();
try {
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
while (itr.hasNext()) {
final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
final InputStream in = new ByteArrayInputStream(consumerRecord.value());
RecordSetWriter writer = null;
try {
for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
final Record record;
try {
try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
record = reader.nextRecord();
} catch (final Exception e) {
@@ -445,10 +454,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
continue;
}
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
if (tuple == null) {
BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
final OutputStream rawOut = session.write(flowFile);
@@ -468,19 +479,32 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
writer.beginRecordSet();
tuple = new Tuple<>(flowFile, writer);
writers.put(recordSchema, tuple);
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}
final RecordSetWriter writer = tuple.getValue();
writer.write(record);
tracker.incrementRecordCount(1L);
}
session.adjustCounter("Records Received", records.size(), false);
} catch (final Exception e) {
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
try {
if (writer != null) {
writer.close();
}
} catch (final Exception ioe) {
logger.warn("Failed to close Record Writer", ioe);
}
try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
@@ -491,50 +515,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} finally {
session.remove(tempFlowFile);
}
for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
FlowFile flowFile = tuple.getKey();
final RecordSetWriter writer = tuple.getValue();
final WriteResult writeResult;
try {
writeResult = writer.finishRecordSet();
writer.close();
} catch (final Exception e) {
logger.error("Failed to finish writing records to Content Repository", e);
try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
throw new ProcessException(e);
}
final int recordCount = writeResult.getRecordCount();
if (recordCount > 0) {
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put("record.count", String.valueOf(recordCount));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
flowFile = session.putAllAttributes(flowFile, attributes);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
session.adjustCounter("Records Received", recordCount, false);
session.transfer(flowFile, REL_SUCCESS);
} else {
session.remove(flowFile);
}
}
}
private void rollback(final TopicPartition topicPartition) {
OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
if (offsetAndMetadata == null) {
offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
final long offset = offsetAndMetadata.offset();
kafkaConsumer.seek(topicPartition, offset);
}
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
@@ -544,7 +539,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
// Add a record.count attribute to remain consistent with other record-oriented processors. If not
// reading/writing records, then use "kafka.count" attribute.
if (tracker.recordWriter == null) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
} else {
kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
}
}
final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
@@ -559,13 +560,19 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final int partition;
final String topic;
final String key;
final RecordSetWriter recordWriter;
FlowFile flowFile;
long totalRecords = 0;
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
this(initialRecord, topicPartition, keyEncoding, null);
}
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
}
@@ -579,4 +586,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
private static class BundleInformation {
private final TopicPartition topicPartition;
private final RecordSchema schema;
public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema) {
this.topicPartition = topicPartition;
this.schema = schema;
}
@Override
public int hashCode() {
return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode());
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof BundleInformation)) {
return false;
}
final BundleInformation other = (BundleInformation) obj;
return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema);
}
}
}
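For comparison only (not part of the commit): BundleInformation's hand-rolled hashCode could equivalently be written with java.util.Objects, which the file already imports; either form is consistent with equals, which is all the bundleMap HashMap lookups require.

@Override
public int hashCode() {
    // Equivalent formulation; Objects.hash handles a null schema just as the original does.
    return Objects.hash(topicPartition, schema);
}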