From 8d6e12fdc4002f6eba52edcf0670df40217eb43a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 6 Sep 2016 20:27:10 -0400 Subject: [PATCH] NIFI-2739: Call KafkaConsumer.wakeup() if consumer is blocking for at least 30 seconds when OnUnscheduled is called --- .../kafka/pubsub/ConsumeKafka_0_10.java | 42 +++++++++++++++++ .../kafka/pubsub/ConsumerLease.java | 8 ++++ .../processors/kafka/pubsub/ConsumeKafka.java | 46 ++++++++++++++++++- .../kafka/pubsub/ConsumerLease.java | 8 ++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java index e799876489..41c8cc6521 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -157,6 +160,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { static final Set RELATIONSHIPS; private volatile ConsumerPool consumerPool = null; + private final Set activeLeases = Collections.synchronizedSet(new HashSet<>()); static { List descriptors = new ArrayList<>(); @@ -239,6 +243,37 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log); } + @OnUnscheduled + public void interruptActiveThreads() { + // There are known issues with the Kafka client library that result in the client code hanging + // indefinitely when unable to communicate with the broker. In order to address this, we will wait + // up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the + // thread to wakeup when it is blocked, waiting on a response. + final long nanosToWait = TimeUnit.SECONDS.toNanos(5L); + final long start = System.nanoTime(); + while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!activeLeases.isEmpty()) { + int count = 0; + for (final ConsumerLease lease : activeLeases) { + getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease}); + lease.wakeup(); + count++; + } + + getLogger().info("Woke up {} consumers", new Object[] {count}); + } + + activeLeases.clear(); + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final ConsumerPool pool = getConsumerPool(context); @@ -252,6 +287,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { context.yield(); return; } + + activeLeases.add(lease); try { while (this.isScheduled() && lease.continuePolling()) { lease.poll(); @@ -259,12 +296,17 @@ public class ConsumeKafka_0_10 extends AbstractProcessor { if (this.isScheduled() && !lease.commit()) { context.yield(); } + } catch (final WakeupException we) { + getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. " + + "Will roll back session and discard any partially received data.", new Object[] {lease}); } catch (final KafkaException kex) { getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", new Object[]{lease, kex}, kex); } catch (final Throwable t) { getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", new Object[]{lease, t}, t); + } finally { + activeLeases.remove(lease); } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index f7a2e57e7c..c611fa2997 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -248,6 +249,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe return poisoned; } + /** + * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method. + */ + public void wakeup() { + kafkaConsumer.wakeup(); + } + /** * Abstract method that is intended to be extended by the pool that created * this ConsumerLease object. It should ensure that the session given to diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index 0b8a7520f7..c311e2a7bb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -157,6 +160,7 @@ public class ConsumeKafka extends AbstractProcessor { static final Set RELATIONSHIPS; private volatile ConsumerPool consumerPool = null; + private final Set activeLeases = Collections.synchronizedSet(new HashSet<>()); static { List descriptors = new ArrayList<>(); @@ -239,6 +243,37 @@ public class ConsumeKafka extends AbstractProcessor { return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log); } + @OnUnscheduled + public void interruptActiveThreads() { + // There are known issues with the Kafka client library that result in the client code hanging + // indefinitely when unable to communicate with the broker. In order to address this, we will wait + // up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the + // thread to wakeup when it is blocked, waiting on a response. + final long nanosToWait = TimeUnit.SECONDS.toNanos(5L); + final long start = System.nanoTime(); + while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!activeLeases.isEmpty()) { + int count = 0; + for (final ConsumerLease lease : activeLeases) { + getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease}); + lease.wakeup(); + count++; + } + + getLogger().info("Woke up {} consumers", new Object[] {count}); + } + + activeLeases.clear(); + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final ConsumerPool pool = getConsumerPool(context); @@ -252,6 +287,8 @@ public class ConsumeKafka extends AbstractProcessor { context.yield(); return; } + + activeLeases.add(lease); try { while (this.isScheduled() && lease.continuePolling()) { lease.poll(); @@ -259,12 +296,17 @@ public class ConsumeKafka extends AbstractProcessor { if (this.isScheduled() && !lease.commit()) { context.yield(); } + } catch (final WakeupException we) { + getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. " + + "Will roll back session and discard any partially received data.", new Object[] {lease}); } catch (final KafkaException kex) { getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", - new Object[]{lease, kex}, kex); + new Object[] {lease, kex}, kex); } catch (final Throwable t) { getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", - new Object[]{lease, t}, t); + new Object[] {lease, t}, t); + } finally { + activeLeases.remove(lease); } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 5b8ba1c331..ad66bb2c9d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -248,6 +249,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe return poisoned; } + /** + * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method. + */ + public void wakeup() { + kafkaConsumer.wakeup(); + } + /** * Abstract method that is intended to be extended by the pool that created * this ConsumerLease object. It should ensure that the session given to