From d981a2aa02aedfdf18e0bed5848e704bd9597563 Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Thu, 15 Sep 2016 03:44:27 +0300 Subject: [PATCH] Avoid interrupting ZookeeperConsumerConnector.shutdown() #3346 (#3403) --- .../lookup/KafkaLookupExtractorFactory.java | 9 +++++---- .../lookup/KafkaLookupExtractorFactoryTest.java | 17 ++++++++++++++++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java index ffca5bc3c4d..2649794f3fc 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -192,7 +192,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory @Override public void run() { - while (!executorService.isShutdown() && !Thread.currentThread().isInterrupted()) { + while (!executorService.isShutdown()) { final ConsumerConnector consumerConnector = buildConnector(kafkaProperties); try { final List> streams = consumerConnector.createMessageStreamsByFilter( @@ -268,7 +268,8 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory } } catch (InterruptedException | ExecutionException | TimeoutException e) { - if (!future.isDone() && !future.cancel(true) && !future.isDone()) { + executorService.shutdown(); + if (!future.isDone() && !future.cancel(false)) { LOG.warn("Could not cancel kafka listening thread"); } LOG.error(e, "Failed to start kafka extraction factory"); @@ -298,10 +299,10 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory return !started.get(); } started.set(false); - executorService.shutdownNow(); + executorService.shutdown(); final ListenableFuture future = this.future; if (future != null) { - if (!future.isDone() && !future.cancel(true) && !future.isDone()) { + if (!future.isDone() && !future.cancel(false)) { LOG.error("Error cancelling future for topic [%s]", getKafkaTopic()); return false; } diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java index c2a02c88817..ba3a79cda04 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java @@ -34,6 +34,7 @@ import kafka.consumer.KafkaStream; import kafka.consumer.TopicFilter; import kafka.javaapi.consumer.ConsumerConnector; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -46,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static io.druid.query.lookup.KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER; @@ -271,8 +273,18 @@ public class KafkaLookupExtractorFactoryTest .andReturn(new ConcurrentHashMap()) .once(); EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once(); + + final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false); consumerConnector.shutdown(); - EasyMock.expectLastCall().once(); + EasyMock.expectLastCall().andAnswer(new IAnswer() + { + @Override + public Object answer() throws Throwable { + threadWasInterrupted.set(Thread.currentThread().isInterrupted()); + return null; + } + }).once(); + EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator); final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory( cacheManager, @@ -288,9 +300,12 @@ public class KafkaLookupExtractorFactoryTest return consumerConnector; } }; + Assert.assertTrue(factory.start()); Assert.assertTrue(factory.close()); Assert.assertTrue(factory.getFuture().isDone()); + Assert.assertFalse(threadWasInterrupted.get()); + EasyMock.verify(cacheManager); }