diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java new file mode 100644 index 0000000000..e2cdea29dd --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.nifi.logging.ComponentLog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A pool of Kafka Consumers for a given topic. Clients must create the ConsumerPool and call initialize() before + * acquiring consumers. Consumers should be returned by calling returnConsumerResource. + */ +public class ConsumerPool implements Closeable { + + private final int size; + private final BlockingQueue consumers; + private final String topic; + private final Properties kafkaProperties; + private final ComponentLog logger; + private boolean initialized = false; + + /** + * Initializes the pool with the given size, topic, properties, and logger, but does not create any consumers until initialize() is called. + * + * @param size the number of consumers to pool + * @param topic the topic to consume from + * @param kafkaProperties the properties for each consumer + * @param logger the logger to report any errors/warnings + */ + public ConsumerPool(final int size, final String topic, final Properties kafkaProperties, final ComponentLog logger) { + this.size = size; + this.logger = logger; + this.topic = topic; + this.kafkaProperties = kafkaProperties; + this.consumers = new LinkedBlockingQueue<>(size); + } + + /** + * Creates the consumers and subscribes them to the given topic. This method must be called before + * acquiring any consumers. + */ + public synchronized void initialize() { + if (initialized) { + return; + } + + for (int i=0; i < size; i++) { + ConsumerResource resource = createConsumerResource(); + consumers.offer(resource); + } + + initialized = true; + } + + /** + * @return a ConsumerResource from the pool, or a newly created ConsumerResource if none were available in the pool + * @throws IllegalStateException if attempting to get a consumer before calling initialize() + */ + public synchronized ConsumerResource getConsumerResource() { + if (!initialized) { + throw new IllegalStateException("ConsumerPool must be initialized before acquiring consumers"); + } + + ConsumerResource consumerResource = consumers.poll(); + if (consumerResource == null) { + consumerResource = createConsumerResource(); + } + return consumerResource; + } + + /** + * If the given ConsumerResource has been poisoned then it is closed and not returned to the pool, + * otherwise it is attempted to be returned to the pool. If the pool is already full then it is closed + * and not returned. + * + * @param consumerResource + */ + public synchronized void returnConsumerResource(final ConsumerResource consumerResource) { + if (consumerResource == null) { + return; + } + + if (consumerResource.isPoisoned()) { + closeConsumer(consumerResource.getConsumer()); + } else { + boolean added = consumers.offer(consumerResource); + if (!added) { + closeConsumer(consumerResource.getConsumer()); + } + } + } + + /** + * Closes all ConsumerResources in the pool and resets the initialization state of this pool. + * + * @throws IOException should never throw + */ + @Override + public synchronized void close() throws IOException { + ConsumerResource consumerResource; + while ((consumerResource = consumers.poll()) != null) { + closeConsumer(consumerResource.getConsumer()); + } + initialized = false; + } + + private ConsumerResource createConsumerResource() { + final Consumer kafkaConsumer = new KafkaConsumer<>(kafkaProperties); + kafkaConsumer.subscribe(Collections.singletonList(this.topic)); + return new ConsumerResource(kafkaConsumer, this, logger); + } + + private void closeConsumer(Consumer consumer) { + try { + consumer.unsubscribe(); + } catch (Exception e) { + logger.warn("Failed while unsubscribing " + consumer, e); + } + + try { + consumer.close(); + } catch (Exception e) { + logger.warn("Failed while closing " + consumer, e); + } + } + +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java new file mode 100644 index 0000000000..baaf39fd0c --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.nifi.logging.ComponentLog; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A wrapper for a Kafka Consumer obtained from a ConsumerPool. Client's should call poison() to indicate that this + * consumer should no longer be used by other clients, and should always call close(). Calling close() will pass + * this consumer back to the pool and the pool will determine the appropriate handling based on whether the consumer + * has been poisoned and whether the pool is already full. + */ +public class ConsumerResource implements Closeable { + + private final ComponentLog logger; + private final Consumer consumer; + private final ConsumerPool consumerPool; + private boolean poisoned = false; + + /** + * @param consumer the Kafka Consumer + * @param consumerPool the ConsumerPool this ConsumerResource was obtained from + * @param logger the logger to report any errors/warnings + */ + public ConsumerResource(Consumer consumer, ConsumerPool consumerPool, ComponentLog logger) { + this.logger = logger; + this.consumer = consumer; + this.consumerPool = consumerPool; + } + + /** + * @return the Kafka Consumer for this + */ + public Consumer getConsumer() { + return consumer; + } + + /** + * Sets the poison flag for this consumer to true. + */ + public void poison() { + poisoned = true; + } + + /** + * @return true if this consumer has been poisoned, false otherwise + */ + public boolean isPoisoned() { + return poisoned; + } + + @Override + public void close() throws IOException { + consumerPool.returnConsumerResource(this); + } + +}