NIFI-1629 This closes #282. downgraded Kafka back to 0.8 - added context.yield to PutKafka - added lifecycle hooks to defend from Kafka deadlocks

NIFI-1629 changed thread pool implementation in Get/PutKafka

Signed-off-by: joewitt <joewitt@apache.org>
Oleg Zhurakousky 2016-03-16 09:55:08 -04:00 committed by joewitt
parent 68cfc8c612
commit 148b4497b4
6 changed files with 143 additions and 25 deletions
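
Note: the common defense added in both processors below is to route every potentially blocking Kafka call through an ExecutorService and bound the wait with Future.get(timeout), cancelling (interrupting) the task if it does not return. A minimal, self-contained sketch of that pattern follows; the names (TimeoutGuard, callWithTimeout) are illustrative, not NiFi's:

import java.util.concurrent.*;

public class TimeoutGuard {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Run 'task' on the pool, waiting at most 'timeoutMillis'. On timeout the
    // Future is cancelled with interruption, so a deadlocked client call cannot
    // pin the calling thread indefinitely.
    public <T> T callWithTimeout(Callable<T> task, long timeoutMillis) throws Exception {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            throw e;
        }
    }
}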

pom.xml

@@ -37,12 +37,12 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.9.0.0</version>
+            <version>0.8.2.2</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
-            <version>0.9.0.0</version>
+            <version>0.8.2.2</version>
             <exclusions>
                 <!-- Transitive dependencies excluded because they are located
                      in a legacy Maven repository, which Maven 3 doesn't support. -->
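
Note: two artifacts are pinned because they play different roles here: kafka-clients supplies the Java producer used by PutKafka, while kafka_2.10 (the Scala core) supplies the 0.8 high-level consumer and AdminUtils used by GetKafka and KafkaUtils. A hedged sketch of the 0.8.2.2 high-level consumer API that GetKafka builds on (topic, group, and ZooKeeper address are placeholders); hasNext() can block indefinitely unless consumer.timeout.ms is set, which is exactly the hang this commit guards against:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "example-group");           // placeholder

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        // The iterator blocks on hasNext() until a message arrives.
        ConsumerIterator<byte[], byte[]> iterator = streams.get("my-topic").get(0).iterator();
        while (iterator.hasNext()) {
            byte[] message = iterator.next().message();
            // process message ...
        }
        connector.shutdown();
    }
}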

GetKafka.java

@@ -28,8 +28,14 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -40,6 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -174,6 +181,10 @@ public class GetKafka extends AbstractProcessor {
     private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
 
+    private volatile long deadlockTimeout;
+
+    private volatile ExecutorService executor;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
@@ -287,6 +298,18 @@
                 consumer.shutdown();
             }
         }
+        if (this.executor != null) {
+            this.executor.shutdown();
+            try {
+                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
+                    this.executor.shutdownNow();
+                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
+                }
+                this.executor = null;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     @Override
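
Note: the @OnStopped path above is the standard two-phase executor shutdown. The same idiom in isolation (helper name is mine):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolShutdown {
    // Two-phase shutdown: stop accepting new work, wait briefly for in-flight
    // tasks, then force-interrupt whatever is still running.
    static void shutdownQuietly(ExecutorService executor, long waitMillis) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}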
@@ -297,6 +320,14 @@
                 .build();
     }
 
+    @OnScheduled
+    public void schedule(ProcessContext context) {
+        this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
+        if (this.executor == null || this.executor.isShutdown()) {
+            this.executor = Executors.newCachedThreadPool();
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         /*
@@ -305,12 +336,55 @@
          */
         synchronized (this.consumerStreamsReady) {
             if (!this.consumerStreamsReady.get()) {
-                this.createConsumers(context);
+                Future<Void> f = this.executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        createConsumers(context);
+                        return null;
+                    }
+                });
+                try {
+                    f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    this.consumerStreamsReady.set(false);
+                    f.cancel(true);
+                    Thread.currentThread().interrupt();
+                    getLogger().warn("Interrupted while waiting to get connection", e);
+                } catch (ExecutionException e) {
+                    throw new IllegalStateException(e);
+                } catch (TimeoutException e) {
+                    this.consumerStreamsReady.set(false);
+                    f.cancel(true);
+                    getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
+                }
             }
         }
-        ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator();
-        if (iterator != null) {
-            this.consumeFromKafka(context, session, iterator);
+        //===
+        if (this.consumerStreamsReady.get()) {
+            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+                    if (iterator != null) {
+                        consumeFromKafka(context, session, iterator);
+                    }
+                    return null;
+                }
+            });
+            try {
+                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                this.consumerStreamsReady.set(false);
+                consumptionFuture.cancel(true);
+                Thread.currentThread().interrupt();
+                getLogger().warn("Interrupted while consuming messages", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            } catch (TimeoutException e) {
+                this.consumerStreamsReady.set(false);
+                consumptionFuture.cancel(true);
+                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
+            }
         }
     }
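
Note: on interrupt or timeout the code resets consumerStreamsReady to false before cancelling, so the next onTrigger retries initialization; only a successful createConsumers() leaves the flag set. The retryable one-time-init shape, reduced to a sketch (names and connect() are hypothetical):

import java.util.concurrent.atomic.AtomicBoolean;

final class RetryableInit {
    private final AtomicBoolean ready = new AtomicBoolean();

    // Called repeatedly by a scheduler: initializes at most once, but a failed
    // attempt clears the flag so a later call retries instead of giving up.
    void ensureInitialized() {
        synchronized (ready) {
            if (!ready.get()) {
                try {
                    connect();        // may block; bound it with a timeout in real use
                    ready.set(true);
                } catch (RuntimeException e) {
                    ready.set(false); // leave the door open for the next trigger
                    throw e;
                }
            }
        }
    }

    private void connect() { /* hypothetical blocking setup */ }
}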

KafkaUtils.java

@@ -25,7 +25,6 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 import kafka.admin.AdminUtils;
 import kafka.api.TopicMetadata;
 import kafka.utils.ZKStringSerializer;
-import kafka.utils.ZkUtils;
 import scala.collection.JavaConversions;
 
 /**
@@ -52,7 +51,7 @@ class KafkaUtils {
             }
         });
         scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false));
+                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
         return topicMetadatas.size();
     }
 }
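
Note: the 0.9 AdminUtils.fetchTopicMetadataFromZk wanted a ZkUtils wrapper, while the 0.8.2.2 signature takes the ZkClient directly, hence the dropped import and simplified call. A sketch of how the surrounding ZkClient is typically wired up for this call (connection string and timeouts are placeholders; the serializer bridging mirrors the anonymous class whose tail is visible above):

import java.util.Collections;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer;
import scala.collection.JavaConversions;

public class TopicMetadataSketch {
    public static void main(String[] args) {
        // Kafka writes its ZooKeeper nodes with its own Scala string serializer,
        // so the ZkClient must round-trip through ZKStringSerializer.
        ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000, new ZkSerializer() {
            @Override
            public byte[] serialize(Object data) throws ZkMarshallingError {
                return ZKStringSerializer.serialize(data);
            }

            @Override
            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                return ZKStringSerializer.deserialize(bytes);
            }
        });
        scala.collection.Set<TopicMetadata> metadata = AdminUtils.fetchTopicMetadataFromZk(
                JavaConversions.asScalaSet(Collections.singleton("my-topic")), zkClient);
        System.out.println("metadata entries: " + metadata.size());
    }
}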

PutKafka.java

@@ -30,10 +30,16 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
@@ -255,6 +261,9 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
     private volatile Producer<byte[], byte[]> producer;
 
+    private volatile ExecutorService executor;
+
+    private volatile long deadlockTimeout;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
@@ -316,10 +325,26 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
         for (final FlowFileMessageBatch batch : activeBatches) {
             batch.cancelOrComplete();
         }
+        if (this.executor != null) {
+            this.executor.shutdown();
+            try {
+                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
+                    this.executor.shutdownNow();
+                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
+                }
+                this.executor = null;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     @OnScheduled
     public void createProducer(final ProcessContext context) {
+        this.deadlockTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
+        if (this.executor == null || this.executor.isShutdown()) {
+            this.executor = Executors.newCachedThreadPool();
+        }
         producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer());
     }
@@ -421,6 +446,7 @@
                 .build();
     }
 
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         FlowFileMessageBatch batch;
@@ -430,10 +456,32 @@
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
+        if (flowFile != null){
+            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    doOnTrigger(context, session, flowFile);
+                    return null;
+                }
+            });
+            try {
+                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                consumptionFuture.cancel(true);
+                Thread.currentThread().interrupt();
+                getLogger().warn("Interrupted while sending messages", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            } catch (TimeoutException e) {
+                consumptionFuture.cancel(true);
+                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e);
+            }
+        } else {
+            context.yield();
         }
+    }
 
+    private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException {
         final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
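
Note: the rewritten onTrigger only does work when a FlowFile is actually queued; otherwise it calls context.yield() so the framework backs off instead of spinning on an empty connection. The no-work path, reduced to a skeleton (class name is mine; send() stands in for the guarded doOnTrigger call):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;

abstract class YieldWhenIdleSketch {
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing to send: yielding tells the framework to schedule this
            // processor less aggressively for a while.
            context.yield();
            return;
        }
        send(context, session, flowFile); // guarded by the executor/timeout as in the diff
    }

    abstract void send(ProcessContext context, ProcessSession session, FlowFile flowFile);
}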

TestGetKafka.java

@@ -16,12 +16,11 @@
  */
 package org.apache.nifi.processors.kafka;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.message.MessageAndMetadata;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.BasicConfigurator;
 import org.apache.nifi.processor.ProcessContext;
@@ -35,6 +34,9 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
 public class TestGetKafka {
 
     @BeforeClass
@@ -119,6 +121,13 @@ public class TestGetKafka {
         @Override
         public void createConsumers(ProcessContext context) {
+            try {
+                Field f = GetKafka.class.getDeclaredField("consumerStreamsReady");
+                f.setAccessible(true);
+                ((AtomicBoolean) f.get(this)).set(true);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
         }
 
         @Override
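
Note: the test subclass stubs out createConsumers() and instead uses reflection to flip the processor's private consumerStreamsReady flag, so onTrigger proceeds without a live broker. The reflection idiom in isolation (class and field names are illustrative):

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReflectionFlagSketch {
    private final AtomicBoolean ready = new AtomicBoolean();

    public static void main(String[] args) throws Exception {
        ReflectionFlagSketch target = new ReflectionFlagSketch();
        // Same idiom as the test: reach the private field and mutate it in place.
        Field f = ReflectionFlagSketch.class.getDeclaredField("ready");
        f.setAccessible(true);
        ((AtomicBoolean) f.get(target)).set(true);
        System.out.println(target.ready.get()); // prints: true
    }
}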

TestPutKafka.java

@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.clients.producer.Callback;
@@ -475,16 +474,5 @@ public class TestPutKafka {
         @Override
         public void close() {
         }
-
-        @Override
-        public void close(long arg0, TimeUnit arg1) {
-            // ignore, not used in test
-        }
-
-        @Override
-        public void flush() {
-            // ignore, not used in test
-        }
     }
 }