diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java index b97a275cb71..42b442ddd40 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -82,6 +82,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory private final AtomicReference> mapRef = new AtomicReference<>(null); private final AtomicBoolean started = new AtomicBoolean(false); + private volatile ConsumerConnector consumerConnector; private volatile ListenableFuture future = null; @JsonProperty @@ -194,8 +195,12 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory public void run() { while (!executorService.isShutdown()) { - final ConsumerConnector consumerConnector = buildConnector(kafkaProperties); + consumerConnector = buildConnector(kafkaProperties); try { + if (executorService.isShutdown()) { + break; + } + final List> streams = consumerConnector.createMessageStreamsByFilter( new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER ); @@ -283,7 +288,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory } } - // Overriden in tests + // Overridden in tests ConsumerConnector buildConnector(Properties properties) { return new kafka.javaapi.consumer.ZookeeperConsumerConnector( @@ -301,6 +306,11 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory } started.set(false); executorService.shutdown(); + + if (consumerConnector != null) { + consumerConnector.shutdown(); + } + final ListenableFuture future = this.future; if (future != null) { if (!future.isDone() && !future.cancel(false)) { @@ -363,7 +373,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory .putLong(startCount) .array(); } else { - // If the number of things added HAS changed during the coruse of this extractor's life, we CANNOT cache + // If the number of things added HAS changed during the course of this extractor's life, we CANNOT cache final byte[] scrambler = StringUtils.toUtf8(UUID.randomUUID().toString()); return ByteBuffer .allocate(idutf8.length + 1 + scrambler.length + 1) diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java index ba3a79cda04..c1e4ff56377 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java @@ -283,7 +283,7 @@ public class KafkaLookupExtractorFactoryTest threadWasInterrupted.set(Thread.currentThread().isInterrupted()); return null; } - }).once(); + }).times(2); EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator); final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory( @@ -364,6 +364,8 @@ public class KafkaLookupExtractorFactoryTest .andReturn(new ConcurrentHashMap()) .once(); EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(false).once(); + consumerConnector.shutdown(); + EasyMock.expectLastCall().anyTimes(); EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator); final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory( @@ -404,7 +406,7 @@ public class KafkaLookupExtractorFactoryTest .once(); EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once(); consumerConnector.shutdown(); - EasyMock.expectLastCall().once(); + EasyMock.expectLastCall().times(2); EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator); final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory( cacheManager, @@ -444,7 +446,7 @@ public class KafkaLookupExtractorFactoryTest .once(); EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once(); consumerConnector.shutdown(); - EasyMock.expectLastCall().once(); + EasyMock.expectLastCall().times(3); EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator); final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory( cacheManager,