diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index f6dda5b545..b51c64471e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -344,8 +344,10 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor { final ConsumerPool consumerPool = createConsumerPool(context, getLogger()); - final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); - if (numAssignedPartitions > 0) { + final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()); + if (explicitAssignment) { + final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); + // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have // all of the partitions assigned. final int partitionCount = consumerPool.getPartitionCount(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index 5e432b3329..1f54c7015e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -318,8 +318,10 @@ public class ConsumeKafka_2_6 extends AbstractProcessor { final ConsumerPool consumerPool = createConsumerPool(context, getLogger()); - final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); - if (numAssignedPartitions > 0) { + final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()); + if (explicitAssignment) { + final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); + // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have // all of the partitions assigned. final int partitionCount = consumerPool.getPartitionCount(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java index 46bf97a9b0..591480a94d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -34,8 +34,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -102,7 +102,7 @@ public class ConsumerPool implements Closeable { final Charset headerCharacterSet, final Pattern headerNamePattern, final int[] partitionsToConsume) { - this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.pooledLeases = new LinkedBlockingQueue<>(); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = demarcator; @@ -119,6 +119,7 @@ public class ConsumerPool implements Closeable { this.headerNamePattern = headerNamePattern; this.separateByKey = separateByKey; this.partitionsToConsume = partitionsToConsume; + enqueueLeases(partitionsToConsume); } public ConsumerPool( @@ -136,7 +137,7 @@ public class ConsumerPool implements Closeable { final Charset headerCharacterSet, final Pattern headerNamePattern, final int[] partitionsToConsume) { - this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.pooledLeases = new LinkedBlockingQueue<>(); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = demarcator; @@ -153,6 +154,7 @@ public class ConsumerPool implements Closeable { this.headerNamePattern = headerNamePattern; this.separateByKey = separateByKey; this.partitionsToConsume = partitionsToConsume; + enqueueLeases(partitionsToConsume); } public ConsumerPool( @@ -171,7 +173,7 @@ public class ConsumerPool implements Closeable { final boolean separateByKey, final String keyEncoding, final int[] partitionsToConsume) { - this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.pooledLeases = new LinkedBlockingQueue<>(); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; @@ -188,6 +190,7 @@ public class ConsumerPool implements Closeable { this.separateByKey = separateByKey; this.keyEncoding = keyEncoding; this.partitionsToConsume = partitionsToConsume; + enqueueLeases(partitionsToConsume); } public ConsumerPool( @@ -206,7 +209,7 @@ public class ConsumerPool implements Closeable { final boolean separateByKey, final String keyEncoding, final int[] partitionsToConsume) { - this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.pooledLeases = new LinkedBlockingQueue<>(); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; @@ -223,6 +226,7 @@ public class ConsumerPool implements Closeable { this.separateByKey = separateByKey; this.keyEncoding = keyEncoding; this.partitionsToConsume = partitionsToConsume; + enqueueLeases(partitionsToConsume); } public int getPartitionCount() { @@ -282,16 +286,8 @@ public class ConsumerPool implements Closeable { consumer.subscribe(topicPattern, lease); } } else { - final List topicPartitions = new ArrayList<>(); - - for (final String topic : topics) { - for (final int partition : partitionsToConsume) { - final TopicPartition topicPartition = new TopicPartition(topic, partition); - topicPartitions.add(topicPartition); - } - } - - consumer.assign(topicPartitions); + logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease."); + return null; } } lease.setProcessSession(session, processContext); @@ -300,6 +296,32 @@ public class ConsumerPool implements Closeable { return lease; } + private SimpleConsumerLease createConsumerLease(final int partition) { + final List topicPartitions = new ArrayList<>(); + for (final String topic : topics) { + final TopicPartition topicPartition = new TopicPartition(topic, partition); + topicPartitions.add(topicPartition); + } + + final Consumer consumer = createKafkaConsumer(); + consumerCreatedCountRef.incrementAndGet(); + consumer.assign(topicPartitions); + + final SimpleConsumerLease lease = new SimpleConsumerLease(consumer); + return lease; + } + + private void enqueueLeases(final int[] partitionsToConsume) { + if (partitionsToConsume == null) { + return; + } + + for (final int partition : partitionsToConsume) { + final SimpleConsumerLease lease = createConsumerLease(partition); + pooledLeases.add(lease); + } + } + /** * Exposed as protected method for easier unit testing *