Upgrade Kafka library for kafka-lookup module (#8078)

* Upgrade Kafka library for kafka-lookup module

* Update licenses.yaml

* Adopt class workaround from KafkaRecordSupplier#getKafkaConsumer

* Update licenses for kafka clients
Sayat 2019-08-14 22:46:25 +02:00 committed by Clint Wylie
parent 7fa0ff5e11
commit 1f3a99616d
5 changed files with 251 additions and 422 deletions

pom.xml (extensions/kafka-extraction-namespace)

@ -33,6 +33,10 @@
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<apache.kafka.version>2.1.0</apache.kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
@ -59,17 +63,9 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<artifactId>kafka-clients</artifactId>
<version>${apache.kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -98,6 +94,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${apache.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mx4j</groupId>
<artifactId>mx4j-tools</artifactId>
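The dependency change above swaps the Scala-versioned broker artifact (kafka_2.10 at 0.8.2.1) for the pure-Java kafka-clients at ${apache.kafka.version}; the ZooKeeper exclusion disappears because the new client talks to brokers directly, while kafka_2.12 is retained only at test scope for the embedded broker. A minimal sketch of what the runtime artifact alone supports, assuming a reachable broker (topic name, group id, and address are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClientOnlySketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    // kafka-clients needs only a broker address; no zookeeper.connect as in 0.8.
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "client-only-sketch");      // placeholder
    try (KafkaConsumer<String, String> consumer =
             new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
      consumer.subscribe(Collections.singletonList("some-topic")); // placeholder
      for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
        System.out.printf("%s=%s%n", record.key(), record.value());
      }
    }
  }
}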

KafkaLookupExtractorFactory.java

@ -30,26 +30,25 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@ -62,21 +61,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
{
@Override
public String fromBytes(byte[] bytes)
{
return StringUtils.fromUtf8(bytes);
}
};
private final ListeningExecutorService executorService;
private final AtomicLong doubleEventCount = new AtomicLong(0L);
private final NamespaceExtractionCacheManager cacheManager;
@ -85,7 +74,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
private final AtomicBoolean started = new AtomicBoolean(false);
private CacheHandler cacheHandler;
private volatile ConsumerConnector consumerConnector;
private volatile ListenableFuture<?> future = null;
@JsonProperty
@ -156,92 +144,57 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
synchronized (started) {
if (started.get()) {
LOG.warn("Already started, not starting again");
return started.get();
return true;
}
if (executorService.isShutdown()) {
LOG.warn("Already shut down, not starting again");
return false;
}
final Properties kafkaProperties = new Properties();
kafkaProperties.putAll(getKafkaProperties());
if (kafkaProperties.containsKey("group.id")) {
throw new IAE(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
kafkaProperties.getProperty("group.id")
);
}
if (kafkaProperties.containsKey("auto.offset.reset")) {
throw new IAE(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
kafkaProperties.getProperty("auto.offset.reset")
);
}
Preconditions.checkNotNull(
kafkaProperties.getProperty("zookeeper.connect"),
"zookeeper.connect required property"
);
verifyKafkaProperties();
kafkaProperties.setProperty("group.id", factoryId);
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
cacheHandler = cacheManager.createCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
final CountDownLatch startingReads = new CountDownLatch(1);
final ListenableFuture<?> future = executorService.submit(
    new Runnable()
    {
      @Override
      public void run()
      {
        while (!executorService.isShutdown()) {
          consumerConnector = buildConnector(kafkaProperties);
          try {
            if (executorService.isShutdown()) {
              break;
            }
            final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
                new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
            );
            if (streams == null || streams.isEmpty()) {
              throw new IAE("Topic [%s] had no streams", topic);
            }
            if (streams.size() > 1) {
              throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
            }
            final KafkaStream<String, String> kafkaStream = streams.get(0);
            startingReads.countDown();
            for (final MessageAndMetadata<String, String> messageAndMetadata : kafkaStream) {
              final String key = messageAndMetadata.key();
              final String message = messageAndMetadata.message();
              if (key == null || message == null) {
                LOG.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
                continue;
              }
              doubleEventCount.incrementAndGet();
              map.put(key, message);
              doubleEventCount.incrementAndGet();
              LOG.trace("Placed key[%s] val[%s]", key, message);
            }
          }
          catch (Exception e) {
            LOG.error(e, "Error reading stream for topic [%s]", topic);
          }
          finally {
            consumerConnector.shutdown();
          }
        }
      }
    }
);
final ListenableFuture<?> future = executorService.submit(() -> {
  final Consumer<String, String> consumer = getConsumer();
  consumer.subscribe(Collections.singletonList(topic));
  try {
    while (!executorService.isShutdown()) {
      try {
        if (executorService.isShutdown()) {
          break;
        }
        final ConsumerRecords<String, String> records = consumer.poll(1000);
        startingReads.countDown();
        for (final ConsumerRecord<String, String> record : records) {
          final String key = record.key();
          final String message = record.value();
          if (key == null || message == null) {
            LOG.error("Bad key/message from topic [%s]: [%s]", topic, record);
            continue;
          }
          doubleEventCount.incrementAndGet();
          map.put(key, message);
          doubleEventCount.incrementAndGet();
          LOG.trace("Placed key[%s] val[%s]", key, message);
        }
      }
      catch (Exception e) {
        LOG.error(e, "Error reading stream for topic [%s]", topic);
      }
    }
  }
  finally {
    consumer.close();
  }
});
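One nuance in the new loop: consumer.poll(1000) uses the poll(long) overload, which kafka-clients deprecated in 2.0 in favor of poll(Duration); both return after at most the given wait when no records arrive. A drop-in sketch of the equivalent non-deprecated call (requires java.time.Duration):

// Equivalent polling step with the Duration overload (deprecation-free on kafka-clients 2.x):
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));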
Futures.addCallback(
future,
new FutureCallback<Object>()
@ -293,14 +246,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
}
// Overridden in tests
ConsumerConnector buildConnector(Properties properties)
{
return new ZookeeperConsumerConnector(
new ConsumerConfig(properties)
);
}
@Override
public boolean close()
{
@ -312,10 +257,6 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
started.set(false);
executorService.shutdown();
if (consumerConnector != null) {
consumerConnector.shutdown();
}
final ListenableFuture<?> future = this.future;
if (future != null) {
if (!future.isDone() && !future.cancel(false)) {
@ -413,4 +354,51 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
return future;
}
private void verifyKafkaProperties()
{
if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
throw new IAE(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
kafkaProperties.get(ConsumerConfig.GROUP_ID_CONFIG)
);
}
if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
throw new IAE(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
);
}
Preconditions.checkNotNull(
kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
"bootstrap.servers required property"
);
}
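For illustration, property maps that pass and fail the checks above; the keys are real consumer configs, the values are placeholders:

// Passes: only bootstrap.servers is required, and no reserved keys are present.
final Map<String, String> ok = ImmutableMap.of("bootstrap.servers", "localhost:9092");

// Throws IAE: group.id is derived from the factoryId and may not be set by the caller.
final Map<String, String> badGroup = ImmutableMap.of(
    "bootstrap.servers", "localhost:9092",
    "group.id", "my-group"
);

// Throws NullPointerException from Preconditions.checkNotNull: no bootstrap.servers at all.
final Map<String, String> missingBrokers = ImmutableMap.of("client.id", "lookup-client");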
// Overridden in tests
Consumer<String, String> getConsumer()
{
// Workaround for the "Kafka String Serializer could not be found" error
// Adopted from org.apache.druid.indexing.kafka.KafkaRecordSupplier#getKafkaConsumer
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
final Properties properties = getConsumerProperties();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
return new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
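Why the classloader dance: Druid loads extensions in isolated classloaders, while kafka-clients resolves pluggable classes through the thread context classloader, which may not see the extension's copy of Kafka. Pinning the context classloader to the extension's own loader for the duration of the constructor sidesteps that lookup failure. The general shape of the pattern, as a hedged sketch (the owner class name is hypothetical):

// Sketch of the TCCL-pinning pattern used above; applies to any config-driven instantiation.
final ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
  Thread.currentThread().setContextClassLoader(SomeExtensionClass.class.getClassLoader()); // hypothetical owner class
  // ... construct the Kafka client (or other reflectively configured object) here ...
}
finally {
  Thread.currentThread().setContextClassLoader(original); // always restore
}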
@Nonnull
private Properties getConsumerProperties()
{
final Properties properties = new Properties();
properties.putAll(kafkaProperties);
// Enable publish-subscribe
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
return properties;
}
}
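End to end, the upgraded factory needs nothing beyond a broker address. A hedged construction sketch mirroring the tests below (the cacheManager wiring and all names are placeholders):

final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
    cacheManager,      // NamespaceExtractionCacheManager obtained from the injector
    "lookup-topic",    // placeholder topic
    ImmutableMap.of("bootstrap.servers", "localhost:9092")
);
factory.start();       // kicks off the poll loop shown above
// factory.get() now serves the map-backed lookup as records stream in.
factory.close();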

KafkaLookupExtractorFactoryTest.java

@ -23,17 +23,14 @@ import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Bytes;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
@ -51,10 +48,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@RunWith(PowerMockRunner.class)
@ -278,17 +273,7 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartStop()
{
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
@ -296,34 +281,26 @@ public class KafkaLookupExtractorFactoryTest
cacheHandler.close();
EasyMock.expectLastCall();
final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false);
consumerConnector.shutdown();
EasyMock.expectLastCall().andAnswer(() -> {
threadWasInterrupted.set(Thread.currentThread().isInterrupted());
return null;
}).times(2);
PowerMock.replay(cacheManager, cacheHandler);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
ImmutableMap.of("bootstrap.servers", "localhost"),
10_000L,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
Consumer<String, String> getConsumer()
{
return consumerConnector;
return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.getFuture().isDone());
Assert.assertFalse(threadWasInterrupted.get());
PowerMock.verify(cacheManager, cacheHandler);
}
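The MockConsumer used above replaces the PowerMock strict mocks of the old kafka.consumer classes; it is an in-memory Consumer implementation shipped with kafka-clients. When a test needs it to actually deliver records rather than just start and stop, it can be scripted; a hedged sketch (topic, partition, and offsets are placeholders, and org.apache.kafka.common.TopicPartition is assumed imported):

final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.subscribe(Collections.singletonList("topic"));
// Simulate the assignment a broker would hand out, then seed one record.
final TopicPartition tp = new TopicPartition("topic", 0);
consumer.rebalance(Collections.singletonList(tp));
consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L));
consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "key", "value"));
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));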
@ -334,20 +311,20 @@ public class KafkaLookupExtractorFactoryTest
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall();
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
ImmutableMap.of("bootstrap.servers", "localhost"),
1,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
Consumer getConsumer()
{
// Lock up
try {
@ -368,36 +345,24 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartStopStart()
{
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
consumerConnector.shutdown();
EasyMock.expectLastCall().times(2);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost")
ImmutableMap.of("bootstrap.servers", "localhost")
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
Consumer<String, String> getConsumer()
{
return consumerConnector;
return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
@ -407,40 +372,28 @@ public class KafkaLookupExtractorFactoryTest
}
@Test
public void testStartStartStop()
public void testStartStartStopStop()
{
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER),
EasyMock.eq(KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
Consumer<String, String> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
consumerConnector.shutdown();
EasyMock.expectLastCall().times(3);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
ImmutableMap.of("bootstrap.servers", "localhost"),
10_000L,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
Consumer<String, String> getConsumer()
{
return consumerConnector;
return kafkaConsumer;
}
};
Assert.assertTrue(factory.start());
@ -453,7 +406,12 @@ public class KafkaLookupExtractorFactoryTest
@Test
public void testStartFailsOnMissingConnect()
{
expectedException.expectMessage("zookeeper.connect required property");
expectedException.expectMessage("bootstrap.servers required property");
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@ -470,7 +428,12 @@ public class KafkaLookupExtractorFactoryTest
{
expectedException.expectMessage(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
PowerMock.replay(cacheManager);
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>());
cacheHandler.close();
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
@ -486,6 +449,12 @@ public class KafkaLookupExtractorFactoryTest
{
expectedException.expectMessage(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
cacheHandler.close();
EasyMock.expectLastCall();
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
@ -533,13 +502,6 @@ public class KafkaLookupExtractorFactoryTest
Assert.assertEquals(injective, otherFactory.isInjective());
}
@Test
public void testDefaultDecoder()
{
final String str = "some string";
Assert.assertEquals(str, KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER.fromBytes(StringUtils.toUtf8(str)));
}
private IAnswer<Boolean> getBlockingAnswer()
{
return () -> {

TestKafkaExtractionCluster.java

@ -25,34 +25,31 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
import org.apache.zookeeper.CreateMode;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Some;
import scala.collection.immutable.List$;
import java.io.Closeable;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@ -60,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
*
@ -76,166 +72,37 @@ public class TestKafkaExtractionCluster
private final Closer closer = Closer.create();
private TestingCluster zkServer;
private KafkaServer kafkaServer;
private KafkaConfig kafkaConfig;
private TestingServer zkTestServer;
private ZkClient zkClient;
private Injector injector;
private ObjectMapper mapper;
private KafkaLookupExtractorFactory factory;
private static List<ProducerRecord<byte[], byte[]>> generateRecords()
{
return ImmutableList.of(
new ProducerRecord<>(topicName, 0,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")));
}
@Before
public void setUp() throws Exception
{
zkTestServer = new TestingServer(-1, temporaryFolder.newFolder(), true);
zkTestServer.start();
zkServer = new TestingCluster(1);
zkServer.start();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
zkTestServer.stop();
}
});
zkClient = new ZkClient(
zkTestServer.getConnectString(),
10000,
10000,
ZKStringSerializer$.MODULE$
);
closer.register(new Closeable()
{
@Override
public void close()
{
zkClient.close();
}
});
if (!zkClient.exists("/kafka")) {
zkClient.create("/kafka", null, CreateMode.PERSISTENT);
}
log.info("---------------------------Started ZK---------------------------");
final String zkKafkaPath = "/kafka";
final Properties serverProperties = new Properties();
serverProperties.putAll(kafkaProperties);
serverProperties.put("broker.id", "0");
serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
serverProperties.put("log.cleaner.enable", "true");
serverProperties.put("host.name", "127.0.0.1");
serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
serverProperties.put("zookeeper.session.timeout.ms", "10000");
serverProperties.put("zookeeper.sync.time.ms", "200");
serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
kafkaConfig = new KafkaConfig(serverProperties);
final long time = DateTimes.of("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
    kafkaConfig,
    new Time()
    {
      @Override
      public long milliseconds()
      {
        return time;
      }
      @Override
      public long nanoseconds()
      {
        return TimeUnit.MILLISECONDS.toNanos(milliseconds());
      }
      @Override
      public void sleep(long ms)
      {
        try {
          Thread.sleep(ms);
        }
        catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }
);
kafkaServer = new KafkaServer(
    getBrokerProperties(),
    Time.SYSTEM,
    Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
    List$.MODULE$.empty());
kafkaServer.startup();
closer.register(new Closeable()
{
@Override
public void close()
{
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
});
log.info("---------------------------Started Kafka Broker ---------------------------");
int sleepCount = 0;
while (!kafkaServer.kafkaController().isActive()) {
Thread.sleep(100);
if (++sleepCount > 10) {
throw new InterruptedException("Controller took too long to awaken");
}
}
log.info("---------------------------Started Kafka Server---------------------------");
final ZkClient zkClient = new ZkClient(
zkTestServer.getConnectString() + zkKafkaPath, 10000, 10000,
ZKStringSerializer$.MODULE$
);
try (final AutoCloseable autoCloseable = new AutoCloseable()
{
@Override
public void close()
{
if (zkClient.exists(zkKafkaPath)) {
try {
zkClient.deleteRecursive(zkKafkaPath);
}
catch (ZkException ex) {
log.warn(ex, "error deleting %s zk node", zkKafkaPath);
}
}
zkClient.close();
}
}) {
final Properties topicProperties = new Properties();
topicProperties.put("cleanup.policy", "compact");
if (!AdminUtils.topicExists(zkClient, topicName)) {
AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
}
log.info("---------------------------Created topic---------------------------");
Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
}
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try (final AutoCloseable autoCloseable = new AutoCloseable()
{
@Override
public void close()
{
producer.close();
}
}) {
producer.send(
new KeyedMessage<>(
topicName,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")
)
);
}
log.info("---------------------------Publish Messages to topic-----------------------");
publishRecordsToKafka();
System.setProperty("druid.extensions.searchCurrentClassloader", "false");
@ -260,10 +127,7 @@ public class TestKafkaExtractionCluster
mapper = injector.getInstance(ObjectMapper.class);
log.info("--------------------------- placed default item via producer ---------------------------");
final Map<String, String> consumerProperties = new HashMap<>(kafkaProperties);
consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
consumerProperties.put("zookeeper.session.timeout.ms", "10000");
consumerProperties.put("zookeeper.sync.time.ms", "200");
final Map<String, String> consumerProperties = getConsumerProperties();
final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
null,
@ -278,17 +142,48 @@ public class TestKafkaExtractionCluster
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
factory.start();
closer.register(new Closeable()
{
@Override
public void close()
{
factory.close();
}
});
closer.register(() -> factory.close());
log.info("--------------------------- started rename manager ---------------------------");
}
@Nonnull
private Map<String, String> getConsumerProperties()
{
final Map<String, String> props = new HashMap<>(kafkaProperties);
int port = kafkaServer.socketServer().config().port();
props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
return props;
}
private void publishRecordsToKafka()
{
final Properties kafkaProducerProperties = makeProducerProperties();
try (final Producer<byte[], byte[]> producer = new KafkaProducer(kafkaProducerProperties)) {
generateRecords().forEach(producer::send);
}
}
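A note on the try-with-resources in publishRecordsToKafka: KafkaProducer.send is asynchronous, and it is close() that drains the batched records before the test proceeds. The same guarantee can be made explicit, as a sketch:

try (Producer<byte[], byte[]> producer = new KafkaProducer<>(makeProducerProperties())) {
  generateRecords().forEach(producer::send);
  producer.flush(); // block until every queued record has been acknowledged
}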
@Nonnull
private KafkaConfig getBrokerProperties() throws IOException
{
final Properties serverProperties = new Properties();
serverProperties.putAll(kafkaProperties);
serverProperties.put("broker.id", "0");
serverProperties.put("zookeeper.connect", zkServer.getConnectString());
serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
serverProperties.put("auto.create.topics.enable", "true");
serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
serverProperties.put("num.partitions", "1");
serverProperties.put("offsets.topic.replication.factor", "1");
serverProperties.put("default.replication.factor", "1");
serverProperties.put("log.cleaner.enable", "true");
serverProperties.put("advertised.host.name", "localhost");
serverProperties.put("zookeeper.session.timeout.ms", "30000");
serverProperties.put("zookeeper.sync.time.ms", "200");
return new KafkaConfig(serverProperties);
}
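The broker config above leans on auto.create.topics.enable instead of the removed AdminUtils calls, and forces the offsets topic and default replication to 1 so a single test broker can host consumer groups. If a test wanted the old explicit, compacted topic back, the kafka-clients AdminClient is the 2.x replacement for AdminUtils; a hedged sketch (port and topic name are placeholders, org.apache.kafka.clients.admin.AdminClient and NewTopic assumed imported, checked exceptions left to the test method):

final Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "127.0.0.1:" + port); // placeholder port
try (AdminClient admin = AdminClient.create(adminProps)) {
  final NewTopic compacted = new NewTopic("testTopic", 1, (short) 1)
      .configs(ImmutableMap.of("cleanup.policy", "compact"));
  admin.createTopics(Collections.singletonList(compacted)).all().get(); // waits for creation
}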
@After
public void tearDown() throws Exception
{
@ -299,10 +194,11 @@ public class TestKafkaExtractionCluster
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
kafkaProducerProperties.put(
"metadata.broker.list",
StringUtils.format("127.0.0.1:%d", kafkaServer.socketServer().port())
);
int port = kafkaServer.socketServer().config().port();
kafkaProducerProperties.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
kafkaProducerProperties.put("key.serializer", ByteArraySerializer.class.getName());
kafkaProducerProperties.put("value.serializer", ByteArraySerializer.class.getName());
kafkaProducerProperties.put("acks", "all");
kafkaProperties.put("request.required.acks", "1");
return kafkaProducerProperties;
}
@ -315,56 +211,48 @@ public class TestKafkaExtractionCluster
}
@Test(timeout = 60_000L)
public void testSimpleRename() throws InterruptedException
public void testSimpleLookup() throws InterruptedException
{
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
closer.register(new Closeable()
{
  @Override
  public void close()
  {
    producer.close();
  }
});
checkServer();
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.of(), "foo");
long events = factory.getCompletedEventCount();
log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
long start = System.currentTimeMillis();
while (events == factory.getCompletedEventCount()) {
  Thread.sleep(100);
  if (System.currentTimeMillis() > start + 60_000) {
    throw new ISE("Took too long to update event");
  }
}
log.info("------------------------- Checking foo bar -------------------------------");
assertUpdated("bar", "foo");
assertReverseUpdated(Collections.singletonList("foo"), "bar");
assertUpdated(null, "baz");
checkServer();
events = factory.getCompletedEventCount();
log.info("------------------------- Sending baz bat -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
while (events == factory.getCompletedEventCount()) {
  Thread.sleep(10);
  if (System.currentTimeMillis() > start + 60_000) {
    throw new ISE("Took too long to update event");
  }
}
log.info("------------------------- Checking baz bat -------------------------------");
Assert.assertEquals("bat", factory.get().apply("baz"));
Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
  checkServer();
  assertUpdated(null, "foo");
  assertReverseUpdated(ImmutableList.of(), "foo");
  long events = factory.getCompletedEventCount();
  log.info("------------------------- Sending foo bar -------------------------------");
  producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
  long start = System.currentTimeMillis();
  while (events == factory.getCompletedEventCount()) {
    Thread.sleep(100);
    if (System.currentTimeMillis() > start + 60_000) {
      throw new ISE("Took too long to update event");
    }
  }
  log.info("------------------------- Checking foo bar -------------------------------");
  assertUpdated("bar", "foo");
  assertReverseUpdated(Collections.singletonList("foo"), "bar");
  assertUpdated(null, "baz");
  checkServer();
  events = factory.getCompletedEventCount();
  log.info("------------------------- Sending baz bat -------------------------------");
  producer.send(new ProducerRecord<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
  while (events == factory.getCompletedEventCount()) {
    Thread.sleep(10);
    if (System.currentTimeMillis() > start + 60_000) {
      throw new ISE("Took too long to update event");
    }
  }
  log.info("------------------------- Checking baz bat -------------------------------");
  Assert.assertEquals("bat", factory.get().apply("baz"));
  Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
}
private void assertUpdated(

licenses.yaml

@ -2238,23 +2238,13 @@ name: Apache Kafka
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
version: 0.8.2.1
version: 2.1.0
libraries:
- org.apache.kafka: kafka_2.10
- org.apache.kafka: kafka_2.12
- org.apache.kafka: kafka-clients
---
name: ZooKeeper Client
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
version: 0.3
libraries:
- com.101tec: zkclient
---
name: Metrics Core Library
license_category: binary
module: extensions/kafka-extraction-namespace
@ -2280,10 +2270,9 @@ libraries:
name: Scala Library
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: BSD-3-Clause License
copyright: EPFL, Lightbend Inc.
version: 2.10.4
license_file_path: licenses/bin/scala-lang.BSD3
license_name: Apache License version 2.0
copyright: LAMP/EPFL and Lightbend, Inc.
version: 2.12.7
libraries:
- org.scala-lang: scala-library