KafkaLookupExtractorFactory: shutdown kafka consumer on close() (#3539)

* shutdown kafka consumer on close

* handle close() race condition
This commit is contained in:
David Lim 2016-10-15 10:55:51 -06:00 committed by Charles Allen
parent 3b6261c690
commit 472c409b99
2 changed files with 18 additions and 6 deletions

View File

@ -82,6 +82,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);
private final AtomicBoolean started = new AtomicBoolean(false);
private volatile ConsumerConnector consumerConnector;
private volatile ListenableFuture<?> future = null;
@JsonProperty
@ -194,8 +195,12 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
public void run()
{
while (!executorService.isShutdown()) {
final ConsumerConnector consumerConnector = buildConnector(kafkaProperties);
consumerConnector = buildConnector(kafkaProperties);
try {
if (executorService.isShutdown()) {
break;
}
final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
);
@ -283,7 +288,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
}
// Overriden in tests
// Overridden in tests
ConsumerConnector buildConnector(Properties properties)
{
return new kafka.javaapi.consumer.ZookeeperConsumerConnector(
@ -301,6 +306,11 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
started.set(false);
executorService.shutdown();
if (consumerConnector != null) {
consumerConnector.shutdown();
}
final ListenableFuture<?> future = this.future;
if (future != null) {
if (!future.isDone() && !future.cancel(false)) {
@ -363,7 +373,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
.putLong(startCount)
.array();
} else {
// If the number of things added HAS changed during the coruse of this extractor's life, we CANNOT cache
// If the number of things added HAS changed during the course of this extractor's life, we CANNOT cache
final byte[] scrambler = StringUtils.toUtf8(UUID.randomUUID().toString());
return ByteBuffer
.allocate(idutf8.length + 1 + scrambler.length + 1)

View File

@ -283,7 +283,7 @@ public class KafkaLookupExtractorFactoryTest
threadWasInterrupted.set(Thread.currentThread().isInterrupted());
return null;
}
}).once();
}).times(2);
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
@ -364,6 +364,8 @@ public class KafkaLookupExtractorFactoryTest
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(false).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
@ -404,7 +406,7 @@ public class KafkaLookupExtractorFactoryTest
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.expectLastCall().times(2);
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@ -444,7 +446,7 @@ public class KafkaLookupExtractorFactoryTest
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.expectLastCall().times(3);
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,