NIFI-2732 Ensure the session and consumer are aligned and have a registered rebalance listener. Make consumption far more memory- and process-efficient; fix an extraneous getBundles call

This closes #987.

Signed-off-by: Bryan Bende <bbende@apache.org>
joewitt 2016-08-31 15:25:12 +10:00 committed by Bryan Bende
parent 088125451b
commit 7a451935a5
14 changed files with 1429 additions and 1889 deletions
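For context on the rebalance-listener part of this change (see ConsumerLease.onPartitionsRevoked and ConsumerPool.obtainConsumer in the diffs below): registering the listener in the same subscribe() call that binds the consumer to its topics is what lets offsets be committed before Kafka hands partitions to another consumer. The following is a minimal, standalone sketch of that pattern against the plain Kafka 0.10 client; the broker address, group id, and topic name are placeholders, and the commit-on-revoke body is deliberately simpler than the commit's ConsumerLease.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RebalanceAwareConsumerSketch {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");            // placeholder group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final Map<TopicPartition, OffsetAndMetadata> uncommitted = new HashMap<>();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // The listener is registered in the same call that subscribes the consumer,
            // so a rebalance can never occur without the commit hook being in place.
            consumer.subscribe(Collections.singletonList("sketch-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                    // Commit what we have before Kafka hands these partitions to another
                    // consumer; otherwise the new assignee re-reads the same records.
                    if (!uncommitted.isEmpty()) {
                        consumer.commitSync(uncommitted);
                        uncommitted.clear();
                    }
                }

                @Override
                public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                    // Nothing to do; everything gathered so far was committed on revoke.
                }
            });

            final ConsumerRecords<byte[], byte[]> records = consumer.poll(10);
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // Process the record, then remember the next offset to commit for its partition.
                uncommitted.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1L));
            }
            if (!uncommitted.isEmpty()) {
                consumer.commitSync(uncommitted);
            }
        }
    }
}

In the commit itself the lease object is the rebalance listener, so the same instance that tracks uncommitted offsets is the one Kafka calls back during poll().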


@ -25,13 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -53,7 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10 Consumer API. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
@ -63,7 +57,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
public class ConsumeKafka_0_10 extends AbstractProcessor {
private static final long FIVE_MB = 5L * 1024L * 1024L;
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@ -136,6 +124,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
@ -145,6 +134,20 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
.name("max-uncommit-offset-wait")
.displayName("Max Uncommitted Time")
.description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ "than when we're not as there is much less for us to keep track of in memory.")
.required(false)
.defaultValue("1 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@ -153,7 +156,6 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
private volatile byte[] demarcatorBytes = null;
private volatile ConsumerPool consumerPool = null;
static {
@ -165,6 +167,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
}
@ -179,16 +182,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return DESCRIPTORS;
}
@OnScheduled
public void prepareProcessing(final ProcessContext context) {
this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
}
@OnStopped
public void close() {
demarcatorBytes = null;
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
@ -215,9 +210,21 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return pool;
}
return consumerPool = createConsumerPool(context, getLogger());
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@ -225,212 +232,40 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
topics.add(trimmedName);
}
}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
}
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return new ConsumerPool(maxLeases, topics, props, log);
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final long startTimeNanos = System.nanoTime();
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
try (final ConsumerLease lease = pool.obtainConsumer()) {
try {
if (lease == null) {
context.yield();
return;
}
final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
if (!foundData) {
session.rollback();
return;
}
writeSessionData(context, session, partitionRecordMap, startTimeNanos);
//At-least once commit handling (if order is reversed it is at-most once)
session.commit();
commitOffsets(lease, partitionRecordMap);
} catch (final KafkaException ke) {
lease.poison();
getLogger().error("Problem while accessing kafka consumer " + ke, ke);
try (final ConsumerLease lease = pool.obtainConsumer(session)) {
if (lease == null) {
context.yield();
session.rollback();
return;
}
}
}
private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.forEach((entry) -> {
long maxOffset = entry.getValue().stream()
.mapToLong(record -> record.offset())
.max()
.getAsLong();
partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
});
lease.commitOffsets(partOffsetMap);
}
private void writeSessionData(
final ProcessContext context, final ProcessSession session,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
final long startTimeNanos) {
if (demarcatorBytes != null) {
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.forEach(entry -> {
writeData(context, session, entry.getValue(), startTimeNanos);
});
} else {
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.flatMap(entry -> entry.getValue().stream())
.forEach(record -> {
writeData(context, session, Collections.singletonList(record), startTimeNanos);
});
}
}
private String encodeKafkaKey(final byte[] key, final String encoding) {
if (key == null) {
return null;
}
if (HEX_ENCODING.getValue().equals(encoding)) {
return DatatypeConverter.printHexBinary(key);
} else if (UTF8_ENCODING.getValue().equals(encoding)) {
return new String(key, StandardCharsets.UTF_8);
} else {
return null; // won't happen because it is guaranteed by the Allowable Values
}
}
private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final String offset = String.valueOf(firstRecord.offset());
final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
final String topic = firstRecord.topic();
final String partition = String.valueOf(firstRecord.partition());
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> {
boolean useDemarcator = false;
for (final ConsumerRecord<byte[], byte[]> record : records) {
if (useDemarcator) {
out.write(demarcatorBytes);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
out.write(record.value());
useDemarcator = true;
}
});
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
if (keyValue != null && records.size() == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
}
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
if (records.size() > 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
}
flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(
context.getProperty(SECURITY_PROTOCOL).getValue(),
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
topic);
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
session.transfer(flowFile, REL_SUCCESS);
}
/**
* Populates the given partitionRecordMap with new records until we poll
* that returns no records or until we have enough data. It is important to
* ensure we keep items grouped by their topic and partition so that when we
* bundle them we bundle them intelligently and so that we can set offsets
* properly even across multiple poll calls.
*/
private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
final long startNanos = System.nanoTime();
boolean foundData = false;
ConsumerRecords<byte[], byte[]> records;
final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
do {
records = lease.poll();
for (final TopicPartition partition : records.partitions()) {
List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
if (currList == null) {
currList = new ArrayList<>();
partitionRecordMap.put(partition, currList);
}
currList.addAll(records.records(partition));
if (currList.size() > 0) {
foundData = true;
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
}
//If we received data and we still want to get more
} while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
return foundData;
}
/**
* Determines if we have enough data as-is and should move on.
*
* @return true if we've been gathering for more than 500 ms or if we're
* demarcating and have more than 50 flowfiles worth or if we're per message
* and have more than 2000 flowfiles or if totalMessageSize is greater than
* two megabytes; false otherwise
*
* Implementation note: 500 millis and 5 MB are magic numbers. These may
* need to be tuned. They get at how often offsets will get committed to
* kafka relative to how many records will get buffered into memory in a
* poll call before writing to repos.
*/
private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final int maxRecords, final long startTimeNanos) {
final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
if (durationMillis > 500) {
return true;
}
int topicPartitionsFilled = 0;
int totalRecords = 0;
long totalRecordSize = 0;
for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
if (!recordList.isEmpty()) {
topicPartitionsFilled++;
}
totalRecords += recordList.size();
for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
totalRecordSize += rec.value().length;
}
}
if (demarcatorBytes != null && demarcatorBytes.length > 0) {
return topicPartitionsFilled > 50;
} else if (totalRecordSize > FIVE_MB) {
return true;
} else {
return totalRecords > maxRecords;
}
}
}
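Condensed, the new onTrigger above is: obtain a lease bound to the session, poll while the lease says to continue, then commit or yield. The following is a self-contained toy model of just that control flow; the Lease interface, the scheduled flag, and yieldProcessor() are illustrative stand-ins for the NiFi and Kafka types, not code from this commit.

public class LeaseLoopSketch {

    // Hypothetical stand-in for ConsumerLease: only the calls onTrigger relies on.
    interface Lease extends AutoCloseable {
        boolean continuePolling();
        void poll();
        boolean commit();   // false means nothing was gathered this round
        @Override
        void close();
    }

    static boolean scheduled = true; // stand-in for this.isScheduled()

    static void onTrigger(final Lease lease) {
        if (lease == null) {
            yieldProcessor();            // no consumer available right now
            return;
        }
        try (Lease l = lease) {
            while (scheduled && l.continuePolling()) {
                l.poll();                // gathers records, writes flowfiles, tracks offsets
            }
            if (scheduled && !l.commit()) {
                yieldProcessor();        // nothing gathered, so back off instead of spinning
            }
        }
    }

    static void yieldProcessor() {
        System.out.println("yield");
    }

    public static void main(String[] args) {
        onTrigger(new Lease() {
            int polls = 0;
            public boolean continuePolling() { return polls < 3; }
            public void poll() { polls++; }
            public boolean commit() { return polls > 0; }
            public void close() { System.out.println("lease closed / returned to pool"); }
        });
    }
}

The real method additionally catches KafkaException and other throwables and logs them before the lease is closed, as shown in the merged hunk above.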


@ -17,11 +17,27 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread at a time.
*/
public interface ConsumerLease extends Closeable {
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
private final long maxWaitMillis;
private final Consumer<byte[], byte[]> kafkaConsumer;
private final ComponentLog logger;
private final byte[] demarcatorBytes;
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
private long leaseStartNanos = -1;
private boolean lastPollEmpty = false;
private int totalFlowFiles = 0;
ConsumerLease(
final long maxWaitMillis,
final Consumer<byte[], byte[]> kafkaConsumer,
final byte[] demarcatorBytes,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.logger = logger;
}
/**
* Executes a poll on the underlying Kafka Consumer.
*
* @return ConsumerRecords retrieved in the poll.
* @throws KafkaException if issue occurs talking to underlying resource.
* clears out internal state elements excluding session and consumer as
* those are managed by the pool itself
*/
ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
private void resetInternalState() {
bundleMap.clear();
uncommittedOffsetsMap.clear();
leaseStartNanos = -1;
lastPollEmpty = false;
totalFlowFiles = 0;
}
/**
* Kafka will call this method whenever it is about to rebalance the
* consumers for the given partitions. We'll simply take this to mean that
* we need to quickly commit what we've got and will return the consumer to
* the pool. This method will be called during the poll() method call of
* this class and will be called by the same thread calling poll according
* to the Kafka API docs. After this method executes the session and kafka
* offsets are committed and this lease is closed.
*
* @param partitions partitions being reassigned
*/
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
//force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
commit();
}
/**
* This will be called by Kafka when the rebalance has completed. We don't
* need to do anything with this information other than optionally log it as
* by this point we've committed what we've got and moved on.
*
* @param partitions topic partition set being reassigned
*/
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
}
/**
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
*/
void poll() {
/**
* Implementation note: If too much time (30 secs?) passes between Kafka
* poll calls, whether spent in our own record processing, in subsequent
* poll calls, or in the commit, we can run into a situation where the
* commit will succeed on the session but fail when committing offsets.
* This is apparently different from the Kafka scenario of electing to
* rebalance for other reasons, but in this case it is due to a session
* timeout. Kafka KIP-62 appears to aim at offering more control over
* the meaning of the various timeouts. If we do run into this case it
* could result in duplicates.
*/
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final Throwable t) {
this.poison();
throw t;
}
}
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
* kafka client to collect more data from Kafka before committing the
* offsets.
*
* @param offsets offsets
* @throws KafkaException if issue occurs talking to underlying resource.
* If false then we didn't do anything and the caller should probably
* yield; if true then we committed new data
*
*/
void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
boolean commit() {
if (uncommittedOffsetsMap.isEmpty()) {
resetInternalState();
return false;
}
try {
/**
* Committing the nifi session then the offsets means we have an at
* least once guarantee here. If we reversed the order we'd have at
* most once.
*/
final Collection<FlowFile> bundledFlowFiles = getBundles();
if (!bundledFlowFiles.isEmpty()) {
getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
}
getProcessSession().commit();
kafkaConsumer.commitSync(uncommittedOffsetsMap);
resetInternalState();
return true;
} catch (final KafkaException kex) {
poison();
logger.warn("Duplicates are likely as we were able to commit the process"
+ " session but received an exception from Kafka while committing"
+ " offsets.");
throw kex;
} catch (final Throwable t) {
poison();
throw t;
}
}
/**
* Notifies that this lease is poisoned and should not be reused.
* Indicates whether we should continue polling for data. If we are not
* writing data with a demarcator then we're writing individual flow files
* per Kafka message, therefore we must be very mindful of memory usage for
* the flow file objects (not their content) being held in memory. The
* content of kafka messages will be written to the content repository
* immediately upon each poll call but we must still be mindful of how much
* memory can be used in each poll call. We will indicate that we should
* stop polling if our last poll call produced no new results, or if we've
* been polling and processing data longer than the specified maximum
* polling time, or if we have reached our specified max flow file limit, or if a
* rebalance has been initiated for one of the partitions we're watching;
* otherwise true.
*
* @return true if should keep polling; false otherwise
*/
void poison();
boolean continuePolling() {
//stop if the last poll produced no new data
if (lastPollEmpty) {
return false;
}
//stop if we've gone past our desired max uncommitted wait time
if (leaseStartNanos < 0) {
leaseStartNanos = System.nanoTime();
}
final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
if (durationMillis > maxWaitMillis) {
return false;
}
//stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
return false;
} else {
return totalFlowFiles < 15000; //admittedly a magic number - good candidate for a processor property
}
}
/**
* Notifies that this lease is to be returned. The pool may optionally reuse
* this lease with another client. No further references by the caller
* should occur after calling close.
* Indicates that the underlying session and consumer should be immediately
* considered invalid. Once closed the session will be rolled back and the
* pool should destroy the underlying consumer. This is useful if due to
* external reasons, such as the processor no longer being scheduled, this
* lease should be terminated immediately.
*/
private void poison() {
poisoned = true;
}
/**
* @return true if this lease has been poisoned; false otherwise
*/
boolean isPoisoned() {
return poisoned;
}
/**
* Abstract method that is intended to be extended by the pool that created
* this ConsumerLease object. It should ensure that the session given to
* create this lease is rolled back and that the underlying Kafka consumer
* is either returned to the pool for continued use or destroyed if this
* lease has been poisoned. It can only be called once. Calling it more than
* once can result in undefined and non-threadsafe behavior.
*/
@Override
void close();
public void close() {
resetInternalState();
}
public abstract ProcessSession getProcessSession();
private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
records.partitions().stream().forEach(partition -> {
List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
if (!messages.isEmpty()) {
//update maximum offset map for this topic partition
long maxOffset = messages.stream()
.mapToLong(record -> record.offset())
.max()
.getAsLong();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
//write records to content repository and session
if (demarcatorBytes == null) {
totalFlowFiles += messages.size();
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
} else {
writeData(getProcessSession(), messages, partition);
}
}
});
}
private static String encodeKafkaKey(final byte[] key, final String encoding) {
if (key == null) {
return null;
}
if (HEX_ENCODING.getValue().equals(encoding)) {
return DatatypeConverter.printHexBinary(key);
} else if (UTF8_ENCODING.getValue().equals(encoding)) {
return new String(key, StandardCharsets.UTF_8);
} else {
return null; // won't happen because it is guaranteed by the Allowable Values
}
}
private Collection<FlowFile> getBundles() {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
populateAttributes(tracker);
flowFiles.add(tracker.flowFile);
}
return flowFiles;
}
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1);
flowFile = session.write(flowFile, out -> {
out.write(record.value());
});
tracker.updateFlowFile(flowFile);
populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS);
}
private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final boolean demarcateFirstRecord;
BundleTracker tracker = bundleMap.get(topicPartition);
FlowFile flowFile;
if (tracker == null) {
tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
flowFile = session.create();
tracker.updateFlowFile(flowFile);
demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
} else {
demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
}
flowFile = tracker.flowFile;
tracker.incrementRecordCount(records.size());
flowFile = session.append(flowFile, out -> {
boolean useDemarcator = demarcateFirstRecord;
for (final ConsumerRecord<byte[], byte[]> record : records) {
if (useDemarcator) {
out.write(demarcatorBytes);
}
out.write(record.value());
useDemarcator = true;
}
});
tracker.updateFlowFile(flowFile);
bundleMap.put(topicPartition, tracker);
}
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
}
final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
tracker.updateFlowFile(newFlowFile);
}
private static class BundleTracker {
final long initialOffset;
final int partition;
final String topic;
final String key;
FlowFile flowFile;
long totalRecords = 0;
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
this.initialOffset = initialRecord.offset();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
}
private void incrementRecordCount(final long count) {
totalRecords += count;
}
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}
}
}
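A key memory property of the demarcated path above is that the lease keeps at most one open bundle per TopicPartition and appends to it across poll calls, so what must be tracked in memory scales with the number of partitions rather than the number of messages. Below is a stripped-down model of that append behavior with an in-memory buffer standing in for the NiFi content repository; the BundleBuffer type and the string partition key are illustrative only.

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DemarcatedBundleSketch {

    // Hypothetical stand-in for a BundleTracker: one flowfile's content plus its record count.
    static final class BundleBuffer {
        final ByteArrayOutputStream content = new ByteArrayOutputStream();
        long totalRecords = 0;
    }

    private final byte[] demarcator;
    private final Map<String, BundleBuffer> bundleMap = new HashMap<>(); // key = topic/partition

    DemarcatedBundleSketch(final byte[] demarcator) {
        this.demarcator = demarcator;
    }

    // Mirrors writeData(session, records, topicPartition): demarcate between records, and
    // demarcate before the first record only if the bundle already holds earlier records.
    void append(final String topicPartition, final List<byte[]> records) {
        final BundleBuffer bundle = bundleMap.computeIfAbsent(topicPartition, tp -> new BundleBuffer());
        boolean useDemarcator = bundle.totalRecords > 0;
        for (final byte[] record : records) {
            if (useDemarcator) {
                bundle.content.write(demarcator, 0, demarcator.length);
            }
            bundle.content.write(record, 0, record.length);
            useDemarcator = true;
        }
        bundle.totalRecords += records.size();
    }

    public static void main(String[] args) {
        final DemarcatedBundleSketch sketch = new DemarcatedBundleSketch("\n".getBytes(StandardCharsets.UTF_8));
        // Two successive "polls" for the same partition land in the same bundle.
        sketch.append("foo-1", Arrays.asList("Hello-1".getBytes(StandardCharsets.UTF_8),
                "Hello-2".getBytes(StandardCharsets.UTF_8)));
        sketch.append("foo-1", Arrays.asList("Hello-3".getBytes(StandardCharsets.UTF_8)));
        // Prints the three messages separated by the demarcator: one bundle per topic/partition.
        System.out.println(new String(sketch.bundleMap.get("foo-1").content.toByteArray(), StandardCharsets.UTF_8));
    }
}

In the commit the equivalent state lives in BundleTracker and the session's append call, and the bundles are transferred and cleared whenever commit() runs.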


@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.processor.ProcessSession;
/**
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
*/
public class ConsumerPool implements Closeable {
private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
private final int maxLeases;
private final Queue<ConsumerLease> consumerLeases;
private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics;
private final Map<String, Object> kafkaProperties;
private final long maxWaitMillis;
private final ComponentLog logger;
private final byte[] demarcatorBytes;
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
private final AtomicLong productivePollCountRef = new AtomicLong();
private final AtomicLong unproductivePollCountRef = new AtomicLong();
/**
* Creates a pool of KafkaConsumer objects that will grow up to the maximum
* indicated leases. Consumers are lazily initialized.
* indicated threads from the given context. Consumers are lazily
* initialized. We may elect to not create up to the maximum number of
* configured consumers if the broker reported lag time for all topics is
* below a certain threshold.
*
* @param maxLeases maximum number of active leases in the pool
* @param topics the topics to consume from
* @param kafkaProperties the properties for each consumer
* @param maxConcurrentLeases max allowable consumers at once
* @param demarcator bytes to use as demarcator between messages; null or
* empty means no demarcator
* @param kafkaProperties properties to use to initialize kafka consumers
* @param topics the topics to subscribe to
* @param maxWaitMillis maximum time to wait for a given lease to acquire
* data before committing
* @param keyEncoding the encoding to use for the key of a kafka message if
* found
* @param securityProtocol the security protocol used
* @param bootstrapServers the bootstrap servers
* @param logger the logger to report any errors/warnings
*/
public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
this.maxLeases = maxLeases;
if (maxLeases <= 0) {
throw new IllegalArgumentException("Max leases value must be greather than zero.");
}
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
final Map<String, String> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
if (topics == null || topics.isEmpty()) {
throw new IllegalArgumentException("Must have a list of one or more topics");
}
this.topics = topics;
this.kafkaProperties = new HashMap<>(kafkaProperties);
this.consumerLeases = new ArrayDeque<>();
this.demarcatorBytes = demarcator;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = Collections.unmodifiableList(topics);
}
/**
* Obtains a consumer from the pool if one is available
* Obtains a consumer from the pool if one is available or lazily
* initializes a new one if deemed necessary.
*
* @return consumer from the pool
* @throws IllegalArgumentException if pool already contains
* @param session the session with which the consumer lease will be
* associated
* @return consumer to use or null if not available or necessary
*/
public ConsumerLease obtainConsumer() {
final ConsumerLease lease;
final int activeLeases;
synchronized (this) {
lease = consumerLeases.poll();
activeLeases = activeLeaseCount.get();
}
if (lease == null && activeLeases >= maxLeases) {
logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
return null;
public ConsumerLease obtainConsumer(final ProcessSession session) {
SimpleConsumerLease lease = pooledLeases.poll();
if (lease == null) {
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
/**
* For now return a new consumer lease. But we could later elect to
* have this return null if we determine the broker indicates that
* the lag time on all topics being monitored is sufficiently low.
* For now we should encourage conservative use of threads because
* having too many means we'll have at best useless threads sitting
* around doing frequent network calls and at worst consumers sitting
* idle, which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
/**
* This subscription tightly couples the lease to the given
* consumer. They cannot be separated from then on.
*/
consumer.subscribe(topics, lease);
}
lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet();
return (lease == null) ? createConsumer() : lease;
}
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
private ConsumerLease createConsumer() {
final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
try {
kafkaConsumer.subscribe(topics);
} catch (final KafkaException kex) {
try {
kafkaConsumer.close();
consumerClosedCountRef.incrementAndGet();
} catch (final Exception ex) {
consumerClosedCountRef.incrementAndGet();
//ignore
}
throw kex;
}
final ConsumerLease lease = new ConsumerLease() {
private volatile boolean poisoned = false;
private volatile boolean closed = false;
@Override
public ConsumerRecords<byte[], byte[]> poll() {
if (poisoned) {
throw new KafkaException("The consumer is poisoned and should no longer be used");
}
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
if (records.isEmpty()) {
unproductivePollCountRef.incrementAndGet();
} else {
productivePollCountRef.incrementAndGet();
}
return records;
} catch (final KafkaException kex) {
logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
poison();
close();
throw kex;
}
}
@Override
public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (poisoned) {
throw new KafkaException("The consumer is poisoned and should no longer be used");
}
try {
kafkaConsumer.commitSync(offsets);
} catch (final KafkaException kex) {
logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
poison();
close();
throw kex;
}
}
@Override
public void close() {
if (closed) {
return;
}
if (poisoned || activeLeaseCount.get() > maxLeases) {
closeConsumer(kafkaConsumer);
activeLeaseCount.decrementAndGet();
closed = true;
} else {
final boolean added;
synchronized (ConsumerPool.this) {
added = consumerLeases.offer(this);
}
if (!added) {
closeConsumer(kafkaConsumer);
activeLeaseCount.decrementAndGet();
}
}
}
@Override
public void poison() {
poisoned = true;
}
};
activeLeaseCount.incrementAndGet();
return lease;
}
/**
* Closes all consumers in the pool. Can be safely recalled.
* Exposed as protected method for easier unit testing
*
* @return consumer
* @throws KafkaException if unable to subscribe to the given topics
*/
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
/**
* Closes all consumers in the pool. Can be safely called repeatedly.
*/
@Override
public void close() {
final List<ConsumerLease> leases = new ArrayList<>();
synchronized (this) {
ConsumerLease lease = null;
while ((lease = consumerLeases.poll()) != null) {
leases.add(lease);
}
}
for (final ConsumerLease lease : leases) {
lease.poison();
lease.close();
}
final List<SimpleConsumerLease> leases = new ArrayList<>();
pooledLeases.drainTo(leases);
leases.stream().forEach((lease) -> {
lease.close(true);
});
}
private void closeConsumer(final Consumer consumer) {
consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
} catch (Exception e) {
@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
try {
consumer.close();
consumerClosedCountRef.incrementAndGet();
} catch (Exception e) {
consumerClosedCountRef.incrementAndGet();
logger.warn("Failed while closing " + consumer, e);
}
}
PoolStats getPoolStats() {
return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
}
private class SimpleConsumerLease extends ConsumerLease {
private final Consumer<byte[], byte[]> consumer;
private volatile ProcessSession session;
private volatile boolean closedConsumer;
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
this.consumer = consumer;
}
void setProcessSession(final ProcessSession session) {
this.session = session;
}
@Override
public ProcessSession getProcessSession() {
return session;
}
@Override
public void close() {
super.close();
close(false);
}
public void close(final boolean forceClose) {
if (closedConsumer) {
return;
}
super.close();
if (session != null) {
session.rollback();
setProcessSession(null);
}
if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
closedConsumer = true;
closeConsumer(consumer);
}
}
}
static final class PoolStats {
@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
final long consumerCreatedCount;
final long consumerClosedCount;
final long leasesObtainedCount;
final long productivePollCount;
final long unproductivePollCount;
PoolStats(
final long consumerCreatedCount,
final long consumerClosedCount,
final long leasesObtainedCount,
final long productivePollCount,
final long unproductivePollCount
final long leasesObtainedCount
) {
this.consumerCreatedCount = consumerCreatedCount;
this.consumerClosedCount = consumerClosedCount;
this.leasesObtainedCount = leasesObtainedCount;
this.productivePollCount = productivePollCount;
this.unproductivePollCount = unproductivePollCount;
}
@Override
public String toString() {
return "Created Consumers [" + consumerCreatedCount + "]\n"
+ "Closed Consumers [" + consumerClosedCount + "]\n"
+ "Leases Obtained [" + leasesObtainedCount + "]\n"
+ "Productive Polls [" + productivePollCount + "]\n"
+ "Unproductive Polls [" + unproductivePollCount + "]\n";
+ "Leases Obtained [" + leasesObtainedCount + "]\n";
}
}
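The lease-return contract described above is that close() puts the consumer back on the bounded queue unless the lease is poisoned, the close is forced, or the queue is already full, in which case the underlying consumer is destroyed. Below is a self-contained model of just that decision; PooledThing and its println calls are illustrative stand-ins for SimpleConsumerLease wrapping a real KafkaConsumer and ProcessSession.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LeaseReturnSketch {

    // Hypothetical stand-in for SimpleConsumerLease: tracks poisoning and whether the
    // underlying resource has already been torn down.
    static final class PooledThing {
        private final BlockingQueue<PooledThing> home;
        private volatile boolean poisoned = false;
        private boolean destroyed = false;

        PooledThing(final BlockingQueue<PooledThing> home) {
            this.home = home;
        }

        void poison() {
            poisoned = true;
        }

        // Mirrors SimpleConsumerLease.close(forceClose): destroy the consumer when poisoned,
        // forced, or the pool is already full; otherwise return it to the pool.
        void close(final boolean forceClose) {
            if (destroyed) {
                return;
            }
            if (forceClose || poisoned || !home.offer(this)) {
                destroyed = true;
                System.out.println("consumer destroyed");
            } else {
                System.out.println("consumer returned to pool");
            }
        }
    }

    public static void main(String[] args) {
        final BlockingQueue<PooledThing> pool = new ArrayBlockingQueue<>(2);
        final PooledThing healthy = new PooledThing(pool);
        final PooledThing broken = new PooledThing(pool);

        healthy.close(false);  // prints "consumer returned to pool"
        broken.poison();
        broken.close(false);   // prints "consumer destroyed": poisoned leases never go back
    }
}

The real close also rolls back the ProcessSession before making this decision, so a lease handed back to the pool never carries uncommitted session state.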


@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
static final String KAFKA_KEY = "kafka.key";
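With the key-encoding allowable values now shared from KafkaProcessorUtils, both the consume and publish processors describe the kafka.key attribute the same way ConsumerLease.encodeKafkaKey produces it: uppercase hex via DatatypeConverter or a UTF-8 string. A minimal illustration follows, assuming a Java 8 runtime where javax.xml.bind is available (as in the code above); the sample key bytes are arbitrary.

import java.nio.charset.StandardCharsets;
import javax.xml.bind.DatatypeConverter;

public class KeyEncodingSketch {
    public static void main(String[] args) {
        final byte[] key = "my-key".getBytes(StandardCharsets.UTF_8);

        // "hex": arbitrary binary rendered as uppercase hexadecimal characters
        System.out.println(DatatypeConverter.printHexBinary(key)); // 6D792D6B6579

        // "utf-8": the same bytes interpreted as a UTF-8 string
        System.out.println(new String(key, StandardCharsets.UTF_8)); // my-key
    }
}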


@ -55,6 +55,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -104,10 +106,6 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random partitions.");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name")


@ -16,104 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
static class MockConsumerPool extends ConsumerPool {
final int actualMaxLeases;
final List<String> actualTopics;
final Map<String, String> actualKafkaProperties;
boolean throwKafkaExceptionOnPoll = false;
boolean throwKafkaExceptionOnCommit = false;
Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
boolean wasConsumerLeasePoisoned = false;
boolean wasConsumerLeaseClosed = false;
boolean wasPoolClosed = false;
public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
super(maxLeases, topics, kafkaProperties, null);
actualMaxLeases = maxLeases;
actualTopics = topics;
actualKafkaProperties = kafkaProperties;
}
@Override
public ConsumerLease obtainConsumer() {
return new ConsumerLease() {
@Override
public ConsumerRecords<byte[], byte[]> poll() {
if (throwKafkaExceptionOnPoll) {
throw new KafkaException("i planned to fail");
}
final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
return (records == null) ? ConsumerRecords.empty() : records;
}
@Override
public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (throwKafkaExceptionOnCommit) {
throw new KafkaException("i planned to fail");
}
actualCommitOffsets = offsets;
}
@Override
public void poison() {
wasConsumerLeasePoisoned = true;
}
@Override
public void close() {
wasConsumerLeaseClosed = true;
}
};
}
@Override
public void close() {
wasPoolClosed = true;
}
void resetState() {
throwKafkaExceptionOnPoll = false;
throwKafkaExceptionOnCommit = false;
nextPlannedRecordsQueue = null;
nextExpectedCommitOffsets = null;
wasConsumerLeasePoisoned = false;
wasConsumerLeaseClosed = false;
wasPoolClosed = false;
}
Consumer<byte[], byte[]> mockConsumer = null;
ConsumerLease mockLease = null;
ConsumerPool mockConsumerPool = null;
@Before
public void setup() {
mockConsumer = mock(Consumer.class);
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@ -174,31 +106,14 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
mockPool.nextPlannedRecordsQueue.add(secondRecs);
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@ -207,69 +122,29 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
} else {
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
}
//asert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test
public void validateGetLotsOfMessages() throws Exception {
String groupName = "validateGetLotsOfMessages";
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
final byte[][] firstPassValues = new byte[10010][1];
for (final byte[] value : firstPassValues) {
value[0] = 0x12;
}
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
mockPool.nextPlannedRecordsQueue.add(secondRecs);
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@ -278,352 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
assertEquals(1, mockPool.actualCommitOffsets.size());
assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
//asert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockLease, times(2)).continuePolling();
verify(mockLease, times(1)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final byte[] rawRecord : rawRecords) {
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
final byte[] key = entry.getKey();
final byte[] rawRecord = entry.getValue();
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
for (final ConsumerRecords<byte[], byte[]> rec : records) {
rec.partitions().stream().forEach((part) -> {
final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
if (map.get(part) != null) {
throw new IllegalStateException("already have that topic/partition in the record map");
}
map.put(part, conRecs);
});
}
return new ConsumerRecords<>(map);
}
@Test
public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
String groupName = "validateGetAllMessagesWithProvidedDemarcator";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues),
createConsumerRecords("bar", 1, 1L, secondPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(2, flowFiles.size());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
//assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
}
@Test
public void validatePollException() throws Exception {
String groupName = "validatePollException";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
mockPool.throwKafkaExceptionOnPoll = true;
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
runner.run(1, true);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(0, flowFiles.size());
assertNull(null, mockPool.actualCommitOffsets);
//assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertTrue(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
@Test
public void validateCommitOffsetException() throws Exception {
String groupName = "validateCommitOffsetException";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
mockPool.throwKafkaExceptionOnCommit = true;
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
runner.run(1, true);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(1, flowFiles.size());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
//assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertTrue(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
assertNull(null, mockPool.actualCommitOffsets);
}
@Test
public void validateUtf8Key() {
String groupName = "validateGetAllMessages";
final Map<byte[], byte[]> rawRecords = new HashMap<>();
rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
rawRecords.put(new byte[0], "Hello-2".getBytes());
rawRecords.put(null, "Hello-3".getBytes());
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
//assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
@Test
public void validateHexKey() {
String groupName = "validateGetAllMessages";
final Map<byte[], byte[]> rawRecords = new HashMap<>();
rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
rawRecords.put(new byte[0], "Hello-2".getBytes());
rawRecords.put(null, "Hello-3".getBytes());
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
//assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
}

View File

@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
ProcessSession mockSession = null;
ProvenanceReporter mockReporter = null;
ConsumerPool testPool = null;
ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
mockSession = mock(ProcessSession.class);
mockReporter = mock(ProvenanceReporter.class);
when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
testPool = new ConsumerPool(
1,
null,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
"utf-8",
"ssl",
"localhost",
logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
testDemarcatedPool = new ConsumerPool(
1,
"--demarcator--".getBytes(StandardCharsets.UTF_8),
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
"utf-8",
"ssl",
"localhost",
logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
}
@Test
public void validatePoolSimpleCreateClose() throws Exception {
final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
try (final ConsumerLease lease = testPool.obtainConsumer()) {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(4, stats.leasesObtainedCount);
}
@Test
public void validatePoolSimpleCreatePollClose() throws Exception {
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commit();
}
testPool.close();
verify(mockSession, times(3)).create();
verify(mockSession, times(1)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
assertEquals(1, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer()) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
assertEquals(10000, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@Test
public void validatePoolBatchCreatePollClose() throws Exception {
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commit();
}
testDemarcatedPool.close();
verify(mockSession, times(1)).create();
verify(mockSession, times(1)).commit();
final PoolStats stats = testDemarcatedPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
}
@Test
public void validatePoolConsumerFails() throws Exception {
final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try {
lease.poll();
fail();
} catch (final KafkaException ke) {
}
};
when(consumer.poll(anyInt())).thenThrow(new KafkaException());
try (final ConsumerLease lease = testPool.obtainConsumer()) {
lease.poll();
fail();
} catch (final KafkaException ke) {
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
assertEquals(0, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@SuppressWarnings({"rawtypes", "unchecked"})
static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final byte[] rawRecord : rawRecords) {
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
}

View File

@ -358,7 +358,7 @@ public class PublishKafkaTest {
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");

View File

@ -25,13 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -53,17 +46,18 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time"
+ " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9"})
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
public class ConsumeKafka extends AbstractProcessor {
private static final long TWO_MB = 2L * 1024L * 1024L;
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@ -136,6 +124,7 @@ public class ConsumeKafka extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
@ -145,6 +134,20 @@ public class ConsumeKafka extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
.name("max-uncommit-offset-wait")
.displayName("Max Uncommitted Time")
.description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ "than when we're not as there is much less for us to keep track of in memory.")
.required(false)
.defaultValue("1 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
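The description above frames a throughput-versus-duplication trade-off between commit frequency and poll size. A minimal tuning sketch in the style of the tests earlier in this diff follows; the values are illustrative, and GROUP_ID and AUTO_OFFSET_RESET are assumed to exist on ConsumeKafka just as their ConsumeKafka_0_10 counterparts are used in the tests above.
// Illustrative tuning sketch, not part of this commit: commit offsets at most every
// 5 seconds and cap each poll at 10000 records. Larger values raise throughput but
// widen the window of potential duplicates after a rebalance or JVM restart.
final TestRunner runner = TestRunners.newTestRunner(new ConsumeKafka());
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo");
runner.setProperty(ConsumeKafka.GROUP_ID, "example-group");
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.MAX_POLL_RECORDS, "10000");
runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "5 secs");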
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@ -153,7 +156,6 @@ public class ConsumeKafka extends AbstractProcessor {
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
private volatile byte[] demarcatorBytes = null;
private volatile ConsumerPool consumerPool = null;
static {
@ -165,6 +167,7 @@ public class ConsumeKafka extends AbstractProcessor {
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
}
@ -179,16 +182,8 @@ public class ConsumeKafka extends AbstractProcessor {
return DESCRIPTORS;
}
@OnScheduled
public void prepareProcessing(final ProcessContext context) {
this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
}
@OnStopped
public void close() {
demarcatorBytes = null;
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
@ -215,9 +210,21 @@ public class ConsumeKafka extends AbstractProcessor {
return pool;
}
return consumerPool = createConsumerPool(context, getLogger());
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafka.TOPICS).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@ -225,213 +232,40 @@ public class ConsumeKafka extends AbstractProcessor {
topics.add(trimmedName);
}
}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
}
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return new ConsumerPool(maxLeases, topics, props, log);
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final long startTimeNanos = System.nanoTime();
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
try (final ConsumerLease lease = pool.obtainConsumer()) {
try {
if (lease == null) {
context.yield();
return;
}
final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
if (!foundData) {
session.rollback();
return;
}
writeSessionData(context, session, partitionRecordMap, startTimeNanos);
//At-least once commit handling (if order is reversed it is at-most once)
session.commit();
commitOffsets(lease, partitionRecordMap);
} catch (final KafkaException ke) {
lease.poison();
getLogger().error("Problem while accessing kafka consumer " + ke, ke);
try (final ConsumerLease lease = pool.obtainConsumer(session)) {
if (lease == null) {
context.yield();
session.rollback();
return;
}
}
}
private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.forEach((entry) -> {
long maxOffset = entry.getValue().stream()
.mapToLong(record -> record.offset())
.max()
.getAsLong();
partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
});
lease.commitOffsets(partOffsetMap);
}
private void writeSessionData(
final ProcessContext context, final ProcessSession session,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
final long startTimeNanos) {
if (demarcatorBytes != null) {
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.forEach(entry -> {
writeData(context, session, entry.getValue(), startTimeNanos);
});
} else {
partitionRecordMap.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.flatMap(entry -> entry.getValue().stream())
.forEach(record -> {
writeData(context, session, Collections.singletonList(record), startTimeNanos);
});
}
}
private String encodeKafkaKey(final byte[] key, final String encoding) {
if (key == null) {
return null;
}
if (HEX_ENCODING.getValue().equals(encoding)) {
return DatatypeConverter.printHexBinary(key);
} else if (UTF8_ENCODING.getValue().equals(encoding)) {
return new String(key, StandardCharsets.UTF_8);
} else {
return null; // won't happen because it is guaranteed by the Allowable Values
}
}
private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final String offset = String.valueOf(firstRecord.offset());
final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
final String topic = firstRecord.topic();
final String partition = String.valueOf(firstRecord.partition());
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> {
boolean useDemarcator = false;
for (final ConsumerRecord<byte[], byte[]> record : records) {
if (useDemarcator) {
out.write(demarcatorBytes);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
out.write(record.value());
useDemarcator = true;
}
});
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
if (keyValue != null && records.size() == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
}
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
if (records.size() > 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
}
flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(
context.getProperty(SECURITY_PROTOCOL).getValue(),
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
topic);
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
session.transfer(flowFile, REL_SUCCESS);
}
/**
* Populates the given partitionRecordMap with new records until we poll
* that returns no records or until we have enough data. It is important to
* ensure we keep items grouped by their topic and partition so that when we
* bundle them we bundle them intelligently and so that we can set offsets
* properly even across multiple poll calls.
*/
private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
final long startNanos = System.nanoTime();
boolean foundData = false;
ConsumerRecords<byte[], byte[]> records;
final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
do {
records = lease.poll();
for (final TopicPartition partition : records.partitions()) {
List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
if (currList == null) {
currList = new ArrayList<>();
partitionRecordMap.put(partition, currList);
}
currList.addAll(records.records(partition));
if (currList.size() > 0) {
foundData = true;
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
}
//If we received data and we still want to get more
} while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
return foundData;
}
/**
* Determines if we have enough data as-is and should move on.
*
* @return true if we've been gathering for more than 500 ms, if we're
* demarcating and have filled more than 50 topic/partition bundles, if we're
* per-message and have gathered more than the configured max poll records,
* or if the total record size is greater than two megabytes; false otherwise
*
* Implementation note: 500 millis and 2 MB are magic numbers. These may
* need to be tuned. They get at how often offsets will get committed to
* kafka relative to how many records will get buffered into memory in a
* poll call before writing to repos.
*/
private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) {
final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
if (durationMillis > 500) {
return true;
}
int topicPartitionsFilled = 0;
int totalRecords = 0;
long totalRecordSize = 0;
for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
if (!recordList.isEmpty()) {
topicPartitionsFilled++;
}
totalRecords += recordList.size();
for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
totalRecordSize += rec.value().length;
}
}
if (demarcatorBytes != null && demarcatorBytes.length > 0) {
return topicPartitionsFilled > 50;
} else if (totalRecordSize > TWO_MB) {
return true;
} else {
return totalRecords > maxRecords;
}
}
}

View File

@ -17,11 +17,27 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread at a time.
*/
public interface ConsumerLease extends Closeable {
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
private final long maxWaitMillis;
private final Consumer<byte[], byte[]> kafkaConsumer;
private final ComponentLog logger;
private final byte[] demarcatorBytes;
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
private long leaseStartNanos = -1;
private boolean lastPollEmpty = false;
private int totalFlowFiles = 0;
ConsumerLease(
final long maxWaitMillis,
final Consumer<byte[], byte[]> kafkaConsumer,
final byte[] demarcatorBytes,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.logger = logger;
}
/**
* Executes a poll on the underlying Kafka Consumer.
*
* @return ConsumerRecords retrieved in the poll.
* @throws KafkaException if issue occurs talking to underlying resource.
* clears out internal state elements excluding session and consumer as
* those are managed by the pool itself
*/
ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
private void resetInternalState() {
bundleMap.clear();
uncommittedOffsetsMap.clear();
leaseStartNanos = -1;
lastPollEmpty = false;
totalFlowFiles = 0;
}
/**
* Kafka will call this method whenever it is about to rebalance the
* consumers for the given partitions. We'll simply take this to mean that
* we need to quickly commit what we've got and will return the consumer to
* the pool. This method will be called during the poll() method call of
* this class and will be called by the same thread calling poll according
* to the Kafka API docs. After this method executes the session and kafka
* offsets are committed and this lease is closed.
*
* @param partitions partitions being reassigned
*/
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
//force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
commit();
}
/**
* This will be called by Kafka when the rebalance has completed. We don't
* need to do anything with this information other than optionally log it as
* by this point we've committed what we've got and moved on.
*
* @param partitions topic partition set being reassigned
*/
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
}
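The two callbacks above only fire if this lease is registered as the rebalance listener when the consumer subscribes. A minimal wiring sketch follows; the helper method name is hypothetical, and the real subscription happens in the pool code outside this hunk.
// Hypothetical wiring sketch: subscribe with the lease itself as the
// ConsumerRebalanceListener so that onPartitionsRevoked/onPartitionsAssigned
// above are invoked from inside kafkaConsumer.poll() on the polling thread.
static void subscribeWithLease(final Consumer<byte[], byte[]> consumer,
        final List<String> topics, final ConsumerLease lease) {
    consumer.subscribe(topics, lease);
}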
/**
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
*/
void poll() {
/**
* Implementation note: If too much time (30 secs?) passes between kafka
* poll calls, including our own record processing before any subsequent
* poll call or the commit, we can run into a situation where the commit
* will succeed on the session but fail when committing offsets. This is
* apparently different than the Kafka scenario of electing to rebalance
* for other reasons, but in this case it is due to a session timeout. It
* appears Kafka KIP-62 aims to offer more control over the meaning of
* various timeouts. If we do run into this case it could result in
* duplicates.
*/
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
lastPollEmpty = records.count() == 0;
processRecords(records);
} catch (final Throwable t) {
this.poison();
throw t;
}
}
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
* kafka client to collect more data from Kafka before committing the
* offsets.
*
* @param offsets offsets
* @throws KafkaException if issue occurs talking to underlying resource.
* if false then we didn't do anything and should probably yield; if true
* then we committed new data
*
*/
void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
boolean commit() {
if (uncommittedOffsetsMap.isEmpty()) {
resetInternalState();
return false;
}
try {
/**
* Committing the nifi session then the offsets means we have an at
* least once guarantee here. If we reversed the order we'd have at
* most once.
*/
final Collection<FlowFile> bundledFlowFiles = getBundles();
if (!bundledFlowFiles.isEmpty()) {
getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
}
getProcessSession().commit();
kafkaConsumer.commitSync(uncommittedOffsetsMap);
resetInternalState();
return true;
} catch (final KafkaException kex) {
poison();
logger.warn("Duplicates are likely as we were able to commit the process"
+ " session but received an exception from Kafka while committing"
+ " offsets.");
throw kex;
} catch (final Throwable t) {
poison();
throw t;
}
}
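To make the ordering argument in the comments above concrete, the sketch below shows the at-most-once alternative that commit() deliberately avoids. This is contrast-only illustration code assuming access to the same fields of this class, not anything this commit ships.
// At-most-once ordering (NOT what commit() above does): Kafka offsets advance before
// the NiFi session commits, so a failure between the two calls drops the already
// acknowledged records instead of duplicating them on the next poll.
private void commitAtMostOnceSketch() {
    kafkaConsumer.commitSync(uncommittedOffsetsMap);
    getProcessSession().transfer(getBundles(), REL_SUCCESS);
    getProcessSession().commit();
    resetInternalState();
}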
/**
* Notifies that this lease is poisoned and should not be reused.
* Indicates whether we should continue polling for data. If we are not
* writing data with a demarcator then we're writing individual flow files
* per kafka message, and therefore we must be very mindful of memory usage for
* the flow file objects (not their content) being held in memory. The
* content of kafka messages will be written to the content repository
* immediately upon each poll call but we must still be mindful of how much
* memory can be used in each poll call. We will indicate that we should
* stop polling if our last poll call produced no new results, if we've been
* polling and processing data longer than the specified maximum polling
* time, if we have reached our specified max flow file limit, or if a
* rebalance has been initiated for one of the partitions we're watching;
* otherwise true.
*
* @return true if should keep polling; false otherwise
*/
void poison();
boolean continuePolling() {
//stop if the last poll produced no new data
if (lastPollEmpty) {
return false;
}
//stop if we've gone past our desired max uncommitted wait time
if (leaseStartNanos < 0) {
leaseStartNanos = System.nanoTime();
}
final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
if (durationMillis > maxWaitMillis) {
return false;
}
//stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
return false;
} else {
return totalFlowFiles < 15000;//admittedly a magic number - good candidate for processor property
}
}
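For orientation, the loop the processor drives against continuePolling(), poll(), and commit() can be condensed from the onTrigger changes shown earlier in this diff; the wrapper method and its parameters are hypothetical, and it assumes it lives on the processor so that isScheduled() is in scope.
// Condensed consume loop: poll while the lease says it is safe to keep buffering,
// then commit; a false commit() means nothing was gathered, so the processor yields.
void consumeOnce(final ConsumerPool pool, final ProcessContext context, final ProcessSession session) {
    try (final ConsumerLease lease = pool.obtainConsumer(session)) {
        if (lease == null) {
            context.yield();
            return;
        }
        while (isScheduled() && lease.continuePolling()) {
            lease.poll();
        }
        if (isScheduled() && !lease.commit()) {
            context.yield();
        }
    }
}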
/**
* Notifies that this lease is to be returned. The pool may optionally reuse
* this lease with another client. No further references by the caller
* should occur after calling close.
* Indicates that the underlying session and consumer should be immediately
* considered invalid. Once closed the session will be rolled back and the
* pool should destroy the underlying consumer. This is useful if, due to
* external reasons such as the processor no longer being scheduled, this
* lease should be terminated immediately.
*/
private void poison() {
poisoned = true;
}
/**
* @return true if this lease has been poisoned; false otherwise
*/
boolean isPoisoned() {
return poisoned;
}
/**
* Abstract method that is intended to be extended by the pool that created
* this ConsumerLease object. It should ensure that the session given to
* create this lease is rolled back and that the underlying kafka consumer
* is either returned to the pool for continued use or destroyed if this
* lease has been poisoned. It can only be called once. Calling it more than
* once can result in undefined and non-threadsafe behavior.
*/
@Override
void close();
public void close() {
resetInternalState();
}
public abstract ProcessSession getProcessSession();
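To make the overriding contract described above concrete, a hypothetical pool-owned subclass is sketched below. The class name and the recycle/destroy details are assumptions for illustration only; the commit's actual subclass lives in ConsumerPool and is not shown in this hunk.
// Hypothetical subclass a pool might hand out: it pins a ProcessSession to the
// lease and, on close, rolls the session back and decides the consumer's fate.
final class ExamplePooledLease extends ConsumerLease {

    private final ProcessSession session;

    ExamplePooledLease(final long maxWaitMillis, final Consumer<byte[], byte[]> consumer,
            final byte[] demarcatorBytes, final String keyEncoding, final String securityProtocol,
            final String bootstrapServers, final ComponentLog logger, final ProcessSession session) {
        super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
        this.session = session;
    }

    @Override
    public ProcessSession getProcessSession() {
        return session;
    }

    @Override
    public void close() {
        super.close();       // clears bundle and offset tracking state
        session.rollback();  // contract: anything uncommitted on the session is rolled back
        // a real pool would now recycle the consumer, or destroy it if isPoisoned() is true
    }
}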
private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
records.partitions().stream().forEach(partition -> {
List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
if (!messages.isEmpty()) {
//update maximum offset map for this topic partition
long maxOffset = messages.stream()
.mapToLong(record -> record.offset())
.max()
.getAsLong();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
//write records to content repository and session
if (demarcatorBytes == null) {
totalFlowFiles += messages.size();
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
} else {
writeData(getProcessSession(), messages, partition);
}
}
});
}
private static String encodeKafkaKey(final byte[] key, final String encoding) {
if (key == null) {
return null;
}
if (HEX_ENCODING.getValue().equals(encoding)) {
return DatatypeConverter.printHexBinary(key);
} else if (UTF8_ENCODING.getValue().equals(encoding)) {
return new String(key, StandardCharsets.UTF_8);
} else {
return null; // won't happen because it is guaranteed by the Allowable Values
}
}
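A quick worked example of the two encodings above, using the same "key1" bytes the tests earlier in this diff expect; the wrapper method is hypothetical scaffolding, while the static imports of HEX_ENCODING and UTF8_ENCODING are already present in this file.
// Worked example: "key1" comes out as "key1" under UTF-8 encoding and as the
// uppercase hex string "6B657931" under hex encoding, matching the test expectations.
private static void encodeKafkaKeyExample() {
    final byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
    final String utf8Key = encodeKafkaKey(key, UTF8_ENCODING.getValue()); // "key1"
    final String hexKey = encodeKafkaKey(key, HEX_ENCODING.getValue());   // "6B657931"
    System.out.println(utf8Key + " / " + hexKey);
}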
private Collection<FlowFile> getBundles() {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
populateAttributes(tracker);
flowFiles.add(tracker.flowFile);
}
return flowFiles;
}
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
tracker.incrementRecordCount(1);
flowFile = session.write(flowFile, out -> {
out.write(record.value());
});
tracker.updateFlowFile(flowFile);
populateAttributes(tracker);
session.transfer(tracker.flowFile, REL_SUCCESS);
}
private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final boolean demarcateFirstRecord;
BundleTracker tracker = bundleMap.get(topicPartition);
FlowFile flowFile;
if (tracker == null) {
tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
flowFile = session.create();
tracker.updateFlowFile(flowFile);
demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
} else {
demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
}
flowFile = tracker.flowFile;
tracker.incrementRecordCount(records.size());
flowFile = session.append(flowFile, out -> {
boolean useDemarcator = demarcateFirstRecord;
for (final ConsumerRecord<byte[], byte[]> record : records) {
if (useDemarcator) {
out.write(demarcatorBytes);
}
out.write(record.value());
useDemarcator = true;
}
});
tracker.updateFlowFile(flowFile);
bundleMap.put(topicPartition, tracker);
}
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
}
final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
tracker.updateFlowFile(newFlowFile);
}
private static class BundleTracker {
final long initialOffset;
final int partition;
final String topic;
final String key;
FlowFile flowFile;
long totalRecords = 0;
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
this.initialOffset = initialRecord.offset();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
}
private void incrementRecordCount(final long count) {
totalRecords += count;
}
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}
}
}

View File

@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.processor.ProcessSession;
/**
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
*/
public class ConsumerPool implements Closeable {
private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
private final int maxLeases;
private final Queue<ConsumerLease> consumerLeases;
private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics;
private final Map<String, Object> kafkaProperties;
private final long maxWaitMillis;
private final ComponentLog logger;
private final byte[] demarcatorBytes;
private final String keyEncoding;
private final String securityProtocol;
private final String bootstrapServers;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
private final AtomicLong productivePollCountRef = new AtomicLong();
private final AtomicLong unproductivePollCountRef = new AtomicLong();
/**
* Creates a pool of KafkaConsumer objects that will grow up to the maximum
* indicated leases. Consumers are lazily initialized.
* indicated threads from the given context. Consumers are lazily
* initialized. We may elect to not create up to the maximum number of
* configured consumers if the broker reported lag time for all topics is
* below a certain threshold.
*
* @param maxLeases maximum number of active leases in the pool
* @param topics the topics to consume from
* @param kafkaProperties the properties for each consumer
* @param maxConcurrentLeases max allowable consumers at once
* @param demarcator bytes to use as demarcator between messages; null or
* empty means no demarcator
* @param kafkaProperties properties to use to initialize kafka consumers
* @param topics the topics to subscribe to
* @param maxWaitMillis maximum time to wait for a given lease to acquire
* data before committing
* @param keyEncoding the encoding to use for the key of a kafka message if
* found
* @param securityProtocol the security protocol used
* @param bootstrapServers the bootstrap servers
* @param logger the logger to report any errors/warnings
*/
public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
this.maxLeases = maxLeases;
if (maxLeases <= 0) {
throw new IllegalArgumentException("Max leases value must be greather than zero.");
}
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
final Map<String, String> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
if (topics == null || topics.isEmpty()) {
throw new IllegalArgumentException("Must have a list of one or more topics");
}
this.topics = topics;
this.kafkaProperties = new HashMap<>(kafkaProperties);
this.consumerLeases = new ArrayDeque<>();
this.demarcatorBytes = demarcator;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = Collections.unmodifiableList(topics);
}
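    // Illustrative sketch, not part of this commit: constructing a pool with the new
    // signature documented above. All literal values are placeholders and the usual
    // java.util imports are assumed.
    static ConsumerPool exampleConsumerPool(final ComponentLog logger) {
        final Map<String, String> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        return new ConsumerPool(
                4,                                  // maxConcurrentLeases
                null,                               // demarcator: null/empty means one FlowFile per record
                props,
                Collections.singletonList("nifi"),  // topics
                100L,                               // maxWaitMillis
                "utf-8",                            // keyEncoding
                "PLAINTEXT",                        // securityProtocol
                "localhost:9092",                   // bootstrapServers
                logger);
    }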
/**
* Obtains a consumer from the pool if one is available
* Obtains a consumer from the pool if one is available or lazily
* initializes a new one if deemed necessary.
*
* @return consumer from the pool
* @throws IllegalArgumentException if pool already contains
* @param session the session for which the consumer lease will be
* associated
* @return consumer to use or null if not available or necessary
*/
public ConsumerLease obtainConsumer() {
final ConsumerLease lease;
final int activeLeases;
synchronized (this) {
lease = consumerLeases.poll();
activeLeases = activeLeaseCount.get();
}
if (lease == null && activeLeases >= maxLeases) {
logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
return null;
public ConsumerLease obtainConsumer(final ProcessSession session) {
SimpleConsumerLease lease = pooledLeases.poll();
if (lease == null) {
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
/**
* For now return a new consumer lease. But we could later elect to
* have this return null if we determine the broker indicates that
* the lag time on all topics being monitored is sufficiently low.
             * For now we should encourage conservative use of threads because
             * having too many means, at best, useless threads sitting around
             * making frequent network calls and, at worst, idle consumers,
             * which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
/**
* This subscription tightly couples the lease to the given
* consumer. They cannot be separated from then on.
*/
consumer.subscribe(topics, lease);
}
lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet();
return (lease == null) ? createConsumer() : lease;
}
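    // Illustrative sketch, not part of this commit: the calling pattern suggested by the
    // tests below (ConsumeKafka's onTrigger is the real caller). Closing the lease either
    // returns it to the pool or, if it was poisoned, destroys it.
    void examplePollAndCommit(final ProcessSession session) {
        try (final ConsumerLease lease = obtainConsumer(session)) {
            if (lease == null) {
                return;               // nothing available or necessary right now
            }
            while (lease.continuePolling()) {
                lease.poll();         // gathers records into the session bound to this lease
            }
            lease.commit();           // commits Kafka offsets and, presumably, the bound session
        }
    }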
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
private ConsumerLease createConsumer() {
final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
try {
kafkaConsumer.subscribe(topics);
} catch (final KafkaException kex) {
try {
kafkaConsumer.close();
consumerClosedCountRef.incrementAndGet();
} catch (final Exception ex) {
consumerClosedCountRef.incrementAndGet();
//ignore
}
throw kex;
}
final ConsumerLease lease = new ConsumerLease() {
private volatile boolean poisoned = false;
private volatile boolean closed = false;
@Override
public ConsumerRecords<byte[], byte[]> poll() {
if (poisoned) {
throw new KafkaException("The consumer is poisoned and should no longer be used");
}
try {
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
if (records.isEmpty()) {
unproductivePollCountRef.incrementAndGet();
} else {
productivePollCountRef.incrementAndGet();
}
return records;
} catch (final KafkaException kex) {
logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
poison();
close();
throw kex;
}
}
@Override
public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (poisoned) {
throw new KafkaException("The consumer is poisoned and should no longer be used");
}
try {
kafkaConsumer.commitSync(offsets);
} catch (final KafkaException kex) {
logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
poison();
close();
throw kex;
}
}
@Override
public void close() {
if (closed) {
return;
}
if (poisoned || activeLeaseCount.get() > maxLeases) {
closeConsumer(kafkaConsumer);
activeLeaseCount.decrementAndGet();
closed = true;
} else {
final boolean added;
synchronized (ConsumerPool.this) {
added = consumerLeases.offer(this);
}
if (!added) {
closeConsumer(kafkaConsumer);
activeLeaseCount.decrementAndGet();
}
}
}
@Override
public void poison() {
poisoned = true;
}
};
activeLeaseCount.incrementAndGet();
return lease;
}
/**
* Closes all consumers in the pool. Can be safely recalled.
* Exposed as protected method for easier unit testing
*
* @return consumer
* @throws KafkaException if unable to subscribe to the given topics
*/
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
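    // Illustrative sketch, not part of this commit: because the factory above is protected,
    // a unit test can substitute a mocked Consumer, exactly as ConsumerPoolTest does below:
    //   final ConsumerPool pool = new ConsumerPool(1, null, Collections.emptyMap(),
    //           Collections.singletonList("nifi"), 100L, "utf-8", "ssl", "localhost", logger) {
    //       @Override
    //       protected Consumer<byte[], byte[]> createKafkaConsumer() {
    //           return mockedConsumer;   // e.g. a Mockito mock
    //       }
    //   };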
/**
* Closes all consumers in the pool. Can be safely called repeatedly.
*/
@Override
public void close() {
final List<ConsumerLease> leases = new ArrayList<>();
synchronized (this) {
ConsumerLease lease = null;
while ((lease = consumerLeases.poll()) != null) {
leases.add(lease);
}
}
for (final ConsumerLease lease : leases) {
lease.poison();
lease.close();
}
final List<SimpleConsumerLease> leases = new ArrayList<>();
pooledLeases.drainTo(leases);
leases.stream().forEach((lease) -> {
lease.close(true);
});
}
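    // Illustrative sketch, not part of this commit: a processor would typically invoke this
    // from an @OnStopped method; the field and method names below are assumptions, not the
    // actual ConsumeKafka code.
    //   @OnStopped
    //   public void stopConsumerPool() {
    //       final ConsumerPool pool = this.consumerPool;
    //       this.consumerPool = null;
    //       if (pool != null) {
    //           pool.close();   // drains pooledLeases and force-closes each lease; safe to repeat
    //       }
    //   }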
private void closeConsumer(final Consumer consumer) {
consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
} catch (Exception e) {
@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
try {
consumer.close();
consumerClosedCountRef.incrementAndGet();
} catch (Exception e) {
consumerClosedCountRef.incrementAndGet();
logger.warn("Failed while closing " + consumer, e);
}
}
PoolStats getPoolStats() {
return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
}
private class SimpleConsumerLease extends ConsumerLease {
private final Consumer<byte[], byte[]> consumer;
private volatile ProcessSession session;
private volatile boolean closedConsumer;
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
this.consumer = consumer;
}
void setProcessSession(final ProcessSession session) {
this.session = session;
}
@Override
public ProcessSession getProcessSession() {
return session;
}
@Override
public void close() {
super.close();
close(false);
}
public void close(final boolean forceClose) {
if (closedConsumer) {
return;
}
super.close();
if (session != null) {
session.rollback();
setProcessSession(null);
}
if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
closedConsumer = true;
closeConsumer(consumer);
}
}
}
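    // Descriptive note, not part of this commit: SimpleConsumerLease.close() first rolls back
    // any uncommitted work on the bound ProcessSession, then tries to return itself to
    // pooledLeases for reuse; only when the lease is poisoned, force-closed, or the pool is
    // already full does it close the underlying Kafka consumer.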
static final class PoolStats {
@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
final long consumerCreatedCount;
final long consumerClosedCount;
final long leasesObtainedCount;
final long productivePollCount;
final long unproductivePollCount;
PoolStats(
final long consumerCreatedCount,
final long consumerClosedCount,
final long leasesObtainedCount,
final long productivePollCount,
final long unproductivePollCount
final long leasesObtainedCount
) {
this.consumerCreatedCount = consumerCreatedCount;
this.consumerClosedCount = consumerClosedCount;
this.leasesObtainedCount = leasesObtainedCount;
this.productivePollCount = productivePollCount;
this.unproductivePollCount = unproductivePollCount;
}
@Override
public String toString() {
return "Created Consumers [" + consumerCreatedCount + "]\n"
+ "Closed Consumers [" + consumerClosedCount + "]\n"
+ "Leases Obtained [" + leasesObtainedCount + "]\n"
+ "Productive Polls [" + productivePollCount + "]\n"
+ "Unproductive Polls [" + unproductivePollCount + "]\n";
+ "Leases Obtained [" + leasesObtainedCount + "]\n";
}
}
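    // Illustrative sketch, not part of this commit: with the trimmed counters, the report
    // produced by toString() now reads, for example:
    //   Created Consumers [1]
    //   Closed Consumers [1]
    //   Leases Obtained [4]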

View File

@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
static final String KAFKA_KEY = "kafka.key";
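    // Illustrative sketch, not part of this commit: how the same raw Kafka key surfaces in the
    // kafka.key attribute under each encoding. DatatypeConverter is just one way to produce the
    // uppercase hex form described above; the processor's actual encoding path is not shown here.
    //   final byte[] rawKey = "key1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    //   final String asUtf8 = new String(rawKey, java.nio.charset.StandardCharsets.UTF_8);  // "key1"
    //   final String asHex = javax.xml.bind.DatatypeConverter.printHexBinary(rawKey);       // "6B657931"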
@ -96,7 +100,6 @@ final class KafkaProcessorUtils {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false)
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
@ -227,7 +230,6 @@ final class KafkaProcessorUtils {
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
}
}
String pName = propertyDescriptor.getName();
String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()

View File

@ -16,105 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
static class MockConsumerPool extends ConsumerPool {
final int actualMaxLeases;
final List<String> actualTopics;
final Map<String, String> actualKafkaProperties;
boolean throwKafkaExceptionOnPoll = false;
boolean throwKafkaExceptionOnCommit = false;
Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
boolean wasConsumerLeasePoisoned = false;
boolean wasConsumerLeaseClosed = false;
boolean wasPoolClosed = false;
public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
super(maxLeases, topics, kafkaProperties, null);
actualMaxLeases = maxLeases;
actualTopics = topics;
actualKafkaProperties = kafkaProperties;
}
@Override
public ConsumerLease obtainConsumer() {
return new ConsumerLease() {
@Override
public ConsumerRecords<byte[], byte[]> poll() {
if (throwKafkaExceptionOnPoll) {
throw new KafkaException("i planned to fail");
}
final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
return (records == null) ? ConsumerRecords.empty() : records;
}
@Override
public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (throwKafkaExceptionOnCommit) {
throw new KafkaException("i planned to fail");
}
actualCommitOffsets = offsets;
}
@Override
public void poison() {
wasConsumerLeasePoisoned = true;
}
@Override
public void close() {
wasConsumerLeaseClosed = true;
}
};
}
@Override
public void close() {
wasPoolClosed = true;
}
void resetState() {
throwKafkaExceptionOnPoll = false;
throwKafkaExceptionOnCommit = false;
nextPlannedRecordsQueue = null;
nextExpectedCommitOffsets = null;
wasConsumerLeasePoisoned = false;
wasConsumerLeaseClosed = false;
wasPoolClosed = false;
}
Consumer<byte[], byte[]> mockConsumer = null;
ConsumerLease mockLease = null;
ConsumerPool mockConsumerPool = null;
@Before
public void setup() {
mockConsumer = mock(Consumer.class);
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@ -175,31 +106,14 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
mockPool.nextPlannedRecordsQueue.add(secondRecs);
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@ -208,69 +122,29 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
} else {
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
}
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test
public void validateGetLotsOfMessages() throws Exception {
String groupName = "validateGetLotsOfMessages";
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
final byte[][] firstPassValues = new byte[10010][1];
for (final byte[] value : firstPassValues) {
value[0] = 0x12;
}
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
mockPool.nextPlannedRecordsQueue.add(secondRecs);
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@ -279,352 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
assertEquals(1, mockPool.actualCommitOffsets.size());
assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockLease, times(2)).continuePolling();
verify(mockLease, times(1)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final byte[] rawRecord : rawRecords) {
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
final byte[] key = entry.getKey();
final byte[] rawRecord = entry.getValue();
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
for (final ConsumerRecords<byte[], byte[]> rec : records) {
rec.partitions().stream().forEach((part) -> {
final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
if (map.get(part) != null) {
throw new IllegalStateException("already have that topic/partition in the record map");
}
map.put(part, conRecs);
});
}
return new ConsumerRecords<>(map);
}
@Test
public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
String groupName = "validateGetAllMessagesWithProvidedDemarcator";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final byte[][] secondPassValues = new byte[][]{
"Hello-4".getBytes(StandardCharsets.UTF_8),
"Hello-5".getBytes(StandardCharsets.UTF_8),
"Hello-6".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues),
createConsumerRecords("bar", 1, 1L, secondPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(2, flowFiles.size());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
assertEquals(2, mockPool.actualCommitOffsets.size());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
}
@Test
public void validatePollException() throws Exception {
String groupName = "validatePollException";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
mockPool.throwKafkaExceptionOnPoll = true;
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
runner.run(1, true);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(0, flowFiles.size());
        assertNull(mockPool.actualCommitOffsets);
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertTrue(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
@Test
public void validateCommitOffsetException() throws Exception {
String groupName = "validateCommitOffsetException";
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
createConsumerRecords("foo", 1, 1L, firstPassValues)
);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
mockPool.nextPlannedRecordsQueue.add(consumerRecs);
mockPool.throwKafkaExceptionOnCommit = true;
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
runner.run(1, true);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(1, flowFiles.size());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertTrue(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
        assertNull(mockPool.actualCommitOffsets);
}
@Test
public void validateUtf8Key() {
String groupName = "validateGetAllMessages";
final Map<byte[], byte[]> rawRecords = new HashMap<>();
rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
rawRecords.put(new byte[0], "Hello-2".getBytes());
rawRecords.put(null, "Hello-3".getBytes());
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
@Test
public void validateHexKey() {
String groupName = "validateGetAllMessages";
final Map<byte[], byte[]> rawRecords = new HashMap<>();
rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
rawRecords.put(new byte[0], "Hello-2".getBytes());
rawRecords.put(null, "Hello-3".getBytes());
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
final List<String> expectedTopics = new ArrayList<>();
expectedTopics.add("foo");
expectedTopics.add("bar");
final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
mockPool.nextPlannedRecordsQueue.add(firstRecs);
ConsumeKafka proc = new ConsumeKafka() {
@Override
protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
return mockPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
runner.run(1, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
assertEquals(expectedTopics, mockPool.actualTopics);
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
        //assert that all consumers were closed as expected
//assert that the consumer pool was properly closed
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertFalse(mockPool.wasPoolClosed);
runner.run(1, true);
assertFalse(mockPool.wasConsumerLeasePoisoned);
assertTrue(mockPool.wasConsumerLeaseClosed);
assertTrue(mockPool.wasPoolClosed);
}
}

View File

@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
ProcessSession mockSession = null;
ProvenanceReporter mockReporter = null;
ConsumerPool testPool = null;
ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
mockSession = mock(ProcessSession.class);
mockReporter = mock(ProvenanceReporter.class);
when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
testPool = new ConsumerPool(
1,
null,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
"utf-8",
"ssl",
"localhost",
logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
testDemarcatedPool = new ConsumerPool(
1,
"--demarcator--".getBytes(StandardCharsets.UTF_8),
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
"utf-8",
"ssl",
"localhost",
logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
}
@Test
public void validatePoolSimpleCreateClose() throws Exception {
final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
try (final ConsumerLease lease = testPool.obtainConsumer()) {
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
}
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(4, stats.leasesObtainedCount);
}
@Test
public void validatePoolSimpleCreatePollClose() throws Exception {
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commit();
}
testPool.close();
verify(mockSession, times(3)).create();
verify(mockSession, times(1)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
assertEquals(1, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@Test
public void validatePoolSimpleBatchCreateClose() throws Exception {
final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
try (final ConsumerLease lease = testPool.obtainConsumer()) {
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
assertEquals(10000, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@Test
public void validatePoolBatchCreatePollClose() throws Exception {
final byte[][] firstPassValues = new byte[][]{
"Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8),
"Hello-3".getBytes(StandardCharsets.UTF_8)
};
final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
lease.commit();
}
testDemarcatedPool.close();
verify(mockSession, times(1)).create();
verify(mockSession, times(1)).commit();
final PoolStats stats = testDemarcatedPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
}
@Test
public void validatePoolConsumerFails() throws Exception {
final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
try {
lease.poll();
fail();
} catch (final KafkaException ke) {
}
};
when(consumer.poll(anyInt())).thenThrow(new KafkaException());
try (final ConsumerLease lease = testPool.obtainConsumer()) {
lease.poll();
fail();
} catch (final KafkaException ke) {
}
testPool.close();
verify(mockSession, times(0)).create();
verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
assertEquals(0, stats.unproductivePollCount);
assertEquals(0, stats.productivePollCount);
}
@SuppressWarnings({"rawtypes", "unchecked"})
static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
final TopicPartition tPart = new TopicPartition(topic, partition);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
long offset = startingOffset;
for (final byte[] rawRecord : rawRecords) {
final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
records.add(rec);
}
map.put(tPart, records);
return new ConsumerRecords(map);
}
}