Avoid interrupting ZookeeperConsumerConnector.shutdown() #3346 (#3403)

This commit is contained in:
Gleb Smirnov 2016-09-15 03:44:27 +03:00 committed by Charles Allen
parent 7a2a4bc6de
commit d981a2aa02
2 changed files with 21 additions and 5 deletions

View File

@ -192,7 +192,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
@Override
public void run()
{
while (!executorService.isShutdown() && !Thread.currentThread().isInterrupted()) {
while (!executorService.isShutdown()) {
final ConsumerConnector consumerConnector = buildConnector(kafkaProperties);
try {
final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
@ -268,7 +268,8 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
executorService.shutdown();
if (!future.isDone() && !future.cancel(false)) {
LOG.warn("Could not cancel kafka listening thread");
}
LOG.error(e, "Failed to start kafka extraction factory");
@ -298,10 +299,10 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
return !started.get();
}
started.set(false);
executorService.shutdownNow();
executorService.shutdown();
final ListenableFuture<?> future = this.future;
if (future != null) {
if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
if (!future.isDone() && !future.cancel(false)) {
LOG.error("Error cancelling future for topic [%s]", getKafkaTopic());
return false;
}

View File

@ -34,6 +34,7 @@ import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.javaapi.consumer.ConsumerConnector;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -46,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static io.druid.query.lookup.KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER;
@ -271,8 +273,18 @@ public class KafkaLookupExtractorFactoryTest
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false);
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
{
@Override
public Object answer() throws Throwable {
threadWasInterrupted.set(Thread.currentThread().isInterrupted());
return null;
}
}).once();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@ -288,9 +300,12 @@ public class KafkaLookupExtractorFactoryTest
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.getFuture().isDone());
Assert.assertFalse(threadWasInterrupted.get());
EasyMock.verify(cacheManager);
}