NIFI-8021: Fixed bug in ConsumeKafka_2_6 and ConsumeKafkaRecord_2_6 where explicit partition assignment causes issues with more than 1 concurrent task. Also fixed bug that prevented more nifi nodes than partitions because it didn't properly handle empty string for the list of partitions

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4744.
This commit is contained in:
Mark Payne 2021-01-06 09:39:17 -05:00 committed by Pierre Villard
parent ea80dad16a
commit 783633cac5
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 45 additions and 19 deletions

View File

@ -344,8 +344,10 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
if (numAssignedPartitions > 0) {
final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
if (explicitAssignment) {
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
// Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
// all of the partitions assigned.
final int partitionCount = consumerPool.getPartitionCount();

View File

@ -318,8 +318,10 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
if (numAssignedPartitions > 0) {
final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
if (explicitAssignment) {
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
// Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
// all of the partitions assigned.
final int partitionCount = consumerPool.getPartitionCount();

View File

@ -34,8 +34,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@ -102,7 +102,7 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = demarcator;
@ -119,6 +119,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}
public ConsumerPool(
@ -136,7 +137,7 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = demarcator;
@ -153,6 +154,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}
public ConsumerPool(
@ -171,7 +173,7 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
@ -188,6 +190,7 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}
public ConsumerPool(
@ -206,7 +209,7 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
@ -223,6 +226,7 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}
public int getPartitionCount() {
@ -282,16 +286,8 @@ public class ConsumerPool implements Closeable {
consumer.subscribe(topicPattern, lease);
}
} else {
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (final String topic : topics) {
for (final int partition : partitionsToConsume) {
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitions.add(topicPartition);
}
}
consumer.assign(topicPartitions);
logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
return null;
}
}
lease.setProcessSession(session, processContext);
@ -300,6 +296,32 @@ public class ConsumerPool implements Closeable {
return lease;
}
private SimpleConsumerLease createConsumerLease(final int partition) {
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (final String topic : topics) {
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitions.add(topicPartition);
}
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
consumer.assign(topicPartitions);
final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
return lease;
}
private void enqueueLeases(final int[] partitionsToConsume) {
if (partitionsToConsume == null) {
return;
}
for (final int partition : partitionsToConsume) {
final SimpleConsumerLease lease = createConsumerLease(partition);
pooledLeases.add(lease);
}
}
/**
* Exposed as protected method for easier unit testing
*