NIFI-2739: Call KafkaConsumer.wakeup() if consumer is blocking for at least 30 seconds when OnUnscheduled is called

This commit is contained in:
Mark Payne 2016-09-06 20:27:10 -04:00
parent 2afc739ab7
commit 8d6e12fdc4
4 changed files with 102 additions and 2 deletions

View File

@ -21,12 +21,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -157,6 +160,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
static final Set<Relationship> RELATIONSHIPS;
private volatile ConsumerPool consumerPool = null;
private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
static {
List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -239,6 +243,37 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@OnUnscheduled
public void interruptActiveThreads() {
// There are known issues with the Kafka client library that result in the client code hanging
// indefinitely when unable to communicate with the broker. In order to address this, we will wait
// up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the
// thread to wakeup when it is blocked, waiting on a response.
final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
final long start = System.nanoTime();
while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
if (!activeLeases.isEmpty()) {
int count = 0;
for (final ConsumerLease lease : activeLeases) {
getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
lease.wakeup();
count++;
}
getLogger().info("Woke up {} consumers", new Object[] {count});
}
activeLeases.clear();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
@ -252,6 +287,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
@ -259,12 +296,17 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@ -248,6 +249,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
return poisoned;
}
/**
* Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
*/
public void wakeup() {
kafkaConsumer.wakeup();
}
/**
* Abstract method that is intended to be extended by the pool that created
* this ConsumerLease object. It should ensure that the session given to

View File

@ -21,12 +21,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -157,6 +160,7 @@ public class ConsumeKafka extends AbstractProcessor {
static final Set<Relationship> RELATIONSHIPS;
private volatile ConsumerPool consumerPool = null;
private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
static {
List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -239,6 +243,37 @@ public class ConsumeKafka extends AbstractProcessor {
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@OnUnscheduled
public void interruptActiveThreads() {
// There are known issues with the Kafka client library that result in the client code hanging
// indefinitely when unable to communicate with the broker. In order to address this, we will wait
// up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the
// thread to wakeup when it is blocked, waiting on a response.
final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
final long start = System.nanoTime();
while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
if (!activeLeases.isEmpty()) {
int count = 0;
for (final ConsumerLease lease : activeLeases) {
getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
lease.wakeup();
count++;
}
getLogger().info("Woke up {} consumers", new Object[] {count});
}
activeLeases.clear();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
@ -252,6 +287,8 @@ public class ConsumeKafka extends AbstractProcessor {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
@ -259,12 +296,17 @@ public class ConsumeKafka extends AbstractProcessor {
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
new Object[] {lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
new Object[] {lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@ -248,6 +249,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
return poisoned;
}
/**
* Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
*/
public void wakeup() {
kafkaConsumer.wakeup();
}
/**
* Abstract method that is intended to be extended by the pool that created
* this ConsumerLease object. It should ensure that the session given to