diff --git a/docs/content/development/extensions-core/kafka-extraction-namespace.md b/docs/content/development/extensions-core/kafka-extraction-namespace.md
index d9c63baf3fd..a9b4fe0547f 100644
--- a/docs/content/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/content/development/extensions-core/kafka-extraction-namespace.md
@@ -2,7 +2,7 @@
layout: doc_page
---
-# Kafka Namespaced Lookup
+# Kafka Lookups
Lookups are an
experimental feature.
@@ -10,44 +10,34 @@ Lookups are an
experimental feature.
Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` and `druid-kafka-extraction-namespace` as extensions.
-Note that this lookup does not employ a `pollPeriod`.
-
-If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and message is the desired new value (both in UTF-8).
+If you need updates to populate as promptly as possible, you can define a LookupExtractorFactory that plugs into a kafka topic whose key is the old value and whose message is the desired new value (both in UTF-8):
```json
{
"type":"kafka",
- "namespace":"testTopic",
- "kafkaTopic":"testTopic"
+ "kafkaTopic":"testTopic",
+ "kafkaProperties":{"zookeeper.connect","somehost:2181/kafka"}
}
```
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
-|`namespace`|The namespace to define|Yes||
|`kafkaTopic`|The kafka topic to read the data from|Yes||
-
-## Kafka renames
+|`kafkaProperties`|Kafka consumer properties. At least `"zookeeper.connect"` must be specified. Only the zookeeper connector is supported.|Yes||
+|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not wait)|
+|`isOneToOne`|Whether the map is one-to-one (see [Lookup DimensionSpecs](../../querying/dimensionspecs.html))|No|`false`|
The extension `kafka-extraction-namespace` enables reading from a kafka feed of key/value pairs, allowing dimension values to be renamed. An example use case would be renaming an ID to a human-readable format.
-Currently the historical node caches the key/value pairs from the kafka feed in an ephemeral memory mapped DB via MapDB.
+The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties`; the extension sets them to `UUID.randomUUID().toString()` and `smallest`, respectively.
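+
+For example, a spec that sets every supported parameter looks like the following (the connection string and timeout are illustrative values; note that `kafkaProperties` deliberately omits `group.id` and `auto.offset.reset`):
+
+```json
+{
+  "type":"kafka",
+  "kafkaTopic":"testTopic",
+  "kafkaProperties":{"zookeeper.connect":"somehost:2181/kafka"},
+  "connectTimeout":60000,
+  "isOneToOne":false
+}
+```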
-## Configuration
+See [lookups](../../querying/lookups.html) for how to configure and use lookups.
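+
+As a sketch (the endpoint, tier, and lookup name below are assumptions drawn from that page rather than part of this extension), a kafka lookup might be registered by POSTing a body such as the following to the coordinator's lookup endpoint:
+
+```json
+{
+  "__default": {
+    "myKafkaLookup": {
+      "type": "kafka",
+      "kafkaTopic": "testTopic",
+      "kafkaProperties": {"zookeeper.connect": "somehost:2181/kafka"}
+    }
+  }
+}
+```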
-The following options are used to define the behavior and should be included wherever the extension is included (all query servicing nodes):
+## Limitations
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.query.rename.kafka.properties`|A json map of kafka consumer properties. See below for special properties.|See below|
-
-The following are the handling for kafka consumer properties in `druid.query.rename.kafka.properties`
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`zookeeper.connect`|Zookeeper connection string|`localhost:2181/kafka`|
-|`group.id`|Group ID, auto-assigned for publish-subscribe model and cannot be overridden|`UUID.randomUUID().toString()`|
-|`auto.offset.reset`|Setting to get the entire kafka rename stream. Cannot be overridden|`smallest`|
+Currently the Kafka lookup extractor feeds the entire kafka stream into a local cache. If you are using on-heap caching, this can easily exhaust your java heap if the kafka stream produces a large number of unique keys.
+Off-heap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
+There is currently no eviction policy.
## Testing the Kafka rename functionality
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml b/extensions-core/kafka-extraction-namespace/pom.xml
index 9f1a2a8b8ae..58bda817cc3 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -98,5 +98,10 @@
       <version>3.0.1</version>
       <scope>test</scope>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java
deleted file mode 100644
index 2733f4a558d..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.query.extraction.namespace;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-import javax.validation.constraints.NotNull;
-
-/**
- *
- */
-@JsonTypeName("kafka")
-public class KafkaExtractionNamespace implements ExtractionNamespace
-{
- @JsonProperty
- private final String kafkaTopic;
- @JsonProperty
- private final String namespace;
-
- @JsonCreator
- public KafkaExtractionNamespace(
- @NotNull @JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic,
- @NotNull @JsonProperty(value = "namespace", required = true) final String namespace
- )
- {
- Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
- Preconditions.checkNotNull(namespace, "namespace required");
- this.kafkaTopic = kafkaTopic;
- this.namespace = namespace;
- }
-
- public String getKafkaTopic()
- {
- return kafkaTopic;
- }
-
- @Override
- public String getNamespace()
- {
- return namespace;
- }
-
- @Override
- public long getPollMs()
- {
- return 0L;
- }
-
- @Override
- public String toString()
- {
- return String.format("KafkaExtractionNamespace = { kafkaTopic = '%s', namespace = '%s'", kafkaTopic, namespace);
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaExtractionNamespaceModule.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaExtractionNamespaceModule.java
new file mode 100644
index 00000000000..17159dbb32a
--- /dev/null
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaExtractionNamespaceModule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class KafkaExtractionNamespaceModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.<Module>of(
+ new SimpleModule("kafka-lookups").registerSubtypes(
+ KafkaLookupExtractorFactory.class
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ // NOOP
+ }
+}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java
new file mode 100644
index 00000000000..286a8d0d538
--- /dev/null
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
+import com.metamx.common.StringUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.concurrent.Execs;
+import io.druid.query.extraction.MapLookupExtractor;
+import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.ws.rs.GET;
+import javax.ws.rs.core.Response;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.Decoder;
+
+@JsonTypeName("kafka")
+public class KafkaLookupExtractorFactory implements LookupExtractorFactory
+{
+ private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
+ static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
+ {
+ @Override
+ public String fromBytes(byte[] bytes)
+ {
+ return StringUtils.fromUtf8(bytes);
+ }
+ };
+
+ private final ListeningExecutorService executorService;
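+ // Incremented twice per consumed message (once before and once after the cache write).
+ // getCacheKey() compares snapshots of this count to detect concurrent updates, and
+ // getCompletedEventCount() halves it to report whole events.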
+ private final AtomicLong doubleEventCount = new AtomicLong(0L);
+ private final NamespaceExtractionCacheManager cacheManager;
+ private final String factoryId = UUID.randomUUID().toString();
+ private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private volatile ListenableFuture<?> future = null;
+
+ @JsonProperty
+ private final String kafkaTopic;
+
+ @JsonProperty
+ private final Map<String, String> kafkaProperties;
+
+ @JsonProperty
+ private final long connectTimeout;
+
+ @JsonProperty
+ private final boolean isOneToOne;
+
+ @JsonCreator
+ public KafkaLookupExtractorFactory(
+ @JacksonInject NamespaceExtractionCacheManager cacheManager,
+ @JsonProperty("kafkaTopic") final String kafkaTopic,
+ @JsonProperty("kafkaProperties") final Map kafkaProperties,
+ @JsonProperty("connectTimeout") @Min(0) long connectTimeout,
+ @JsonProperty("isOneToOne") boolean isOneToOne
+ )
+ {
+ this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
+ this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties, "kafkaProperties required");
+ executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded(
+ "kafka-factory-" + kafkaTopic + "-%s",
+ Thread.MIN_PRIORITY
+ ));
+ this.cacheManager = cacheManager;
+ this.connectTimeout = connectTimeout;
+ this.isOneToOne = isOneToOne;
+ }
+
+ public KafkaLookupExtractorFactory(
+ NamespaceExtractionCacheManager cacheManager,
+ String kafkaTopic,
+ Map<String, String> kafkaProperties
+ )
+ {
+ this(cacheManager, kafkaTopic, kafkaProperties, 0, false);
+ }
+
+ public String getKafkaTopic()
+ {
+ return kafkaTopic;
+ }
+
+ public Map<String, String> getKafkaProperties()
+ {
+ return kafkaProperties;
+ }
+
+ public long getConnectTimeout()
+ {
+ return connectTimeout;
+ }
+
+ public boolean isOneToOne()
+ {
+ return isOneToOne;
+ }
+
+ @Override
+ public boolean start()
+ {
+ synchronized (started) {
+ if (started.get()) {
+ LOG.warn("Already started, not starting again");
+ return started.get();
+ }
+ if (executorService.isShutdown()) {
+ LOG.warn("Already shut down, not starting again");
+ return false;
+ }
+ final Properties kafkaProperties = new Properties();
+ kafkaProperties.putAll(getKafkaProperties());
+ if (kafkaProperties.containsKey("group.id")) {
+ throw new IAE(
+ "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
+ kafkaProperties.getProperty("group.id")
+ );
+ }
+ if (kafkaProperties.containsKey("auto.offset.reset")) {
+ throw new IAE(
+ "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
+ kafkaProperties.getProperty("auto.offset.reset")
+ );
+ }
+ Preconditions.checkNotNull(
+ kafkaProperties.getProperty("zookeeper.connect"),
+ "zookeeper.connect required property"
+ );
+
+ kafkaProperties.setProperty("group.id", factoryId);
+ final String topic = getKafkaTopic();
+ LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
+ final Map<String, String> map = cacheManager.getCacheMap(factoryId);
+ mapRef.set(map);
+ // Enable publish-subscribe
+ kafkaProperties.setProperty("auto.offset.reset", "smallest");
+
+ final CountDownLatch startingReads = new CountDownLatch(1);
+
+ final ListenableFuture<?> future = executorService.submit(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ while (!executorService.isShutdown() && !Thread.currentThread().isInterrupted()) {
+ final ConsumerConnector consumerConnector = buildConnector(kafkaProperties);
+ try {
+ final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
+ new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
+ );
+
+ if (streams == null || streams.isEmpty()) {
+ throw new IAE("Topic [%s] had no streams", topic);
+ }
+ if (streams.size() > 1) {
+ throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
+ }
+ final KafkaStream<String, String> kafkaStream = streams.get(0);
+
+ startingReads.countDown();
+
+ for (final MessageAndMetadata<String, String> messageAndMetadata : kafkaStream) {
+ final String key = messageAndMetadata.key();
+ final String message = messageAndMetadata.message();
+ if (key == null || message == null) {
+ LOG.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
+ continue;
+ }
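+ // Bracket the cache write with two increments so a cache key computed while a
+ // write is in flight can never observe an unchanged count.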
+ doubleEventCount.incrementAndGet();
+ map.put(key, message);
+ doubleEventCount.incrementAndGet();
+ LOG.trace("Placed key[%s] val[%s]", key, message);
+ }
+ }
+ catch (Exception e) {
+ LOG.error(e, "Error reading stream for topic [%s]", topic);
+ }
+ finally {
+ consumerConnector.shutdown();
+ }
+ }
+ }
+ }
+ );
+ Futures.addCallback(
+ future, new FutureCallback<Object>()
+ {
+ @Override
+ public void onSuccess(Object result)
+ {
+ LOG.debug("Success listening to [%s]", topic);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ if (t instanceof CancellationException) {
+ LOG.debug("Topic [%s] cancelled", topic);
+ } else {
+ LOG.error(t, "Error in listening to [%s]", topic);
+ }
+ }
+ },
+ MoreExecutors.sameThreadExecutor()
+ );
+ this.future = future;
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
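+ // With the default connectTimeout of 0 this loop waits at most one 100ms round,
+ // so start() returns without requiring a successful connection.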
+ while (!startingReads.await(100, TimeUnit.MILLISECONDS) && connectTimeout > 0L) {
+ // Don't return until we have actually connected
+ if (future.isDone()) {
+ future.get();
+ } else {
+ if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > connectTimeout) {
+ throw new TimeoutException("Failed to connect to kafka in sufficient time");
+ }
+ }
+ }
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
+ LOG.warn("Could not cancel kafka listening thread");
+ }
+ LOG.error(e, "Failed to start kafka extraction factory");
+ cacheManager.delete(factoryId);
+ return false;
+ }
+
+ started.set(true);
+ return true;
+ }
+ }
+
+ // Overridden in tests
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ return new kafka.javaapi.consumer.ZookeeperConsumerConnector(
+ new ConsumerConfig(properties)
+ );
+ }
+
+ @Override
+ public boolean close()
+ {
+ synchronized (started) {
+ if (!started.get() || executorService.isShutdown()) {
+ LOG.info("Already shutdown, ignoring");
+ return !started.get();
+ }
+ started.set(false);
+ executorService.shutdownNow();
+ final ListenableFuture<?> future = this.future;
+ if (future != null) {
+ if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
+ LOG.error("Error cancelling future for topic [%s]", getKafkaTopic());
+ return false;
+ }
+ }
+ if (!cacheManager.delete(factoryId)) {
+ LOG.error("Error removing [%s] for topic [%s] from cache", factoryId, getKafkaTopic());
+ return false;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean replaces(@Nullable LookupExtractorFactory other)
+ {
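+ // Inverted equality check: replace the other factory only when this one's
+ // configuration differs from it; an identical spec keeps the running factory.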
+ if (this == other) {
+ return false;
+ }
+
+ if (other == null) {
+ return false;
+ }
+
+ if (getClass() != other.getClass()) {
+ return true;
+ }
+
+ final KafkaLookupExtractorFactory that = (KafkaLookupExtractorFactory) other;
+
+ return !(getKafkaTopic().equals(that.getKafkaTopic())
+ && getKafkaProperties().equals(that.getKafkaProperties())
+ && getConnectTimeout() == that.getConnectTimeout()
+ && isOneToOne() == that.isOneToOne()
+ );
+ }
+
+ @Nullable
+ @Override
+ public LookupIntrospectHandler getIntrospectHandler()
+ {
+ return new KafkaLookupExtractorIntrospectionHandler();
+ }
+
+ @Override
+ public LookupExtractor get()
+ {
+ final Map<String, String> map = Preconditions.checkNotNull(mapRef.get(), "Not started");
+ final long startCount = doubleEventCount.get();
+ return new MapLookupExtractor(map, isOneToOne())
+ {
+ @Override
+ public byte[] getCacheKey()
+ {
+ final byte[] idutf8 = StringUtils.toUtf8(factoryId);
+ // If the number of things added has not changed during the course of this extractor's life, we can cache it
+ if (startCount == doubleEventCount.get()) {
+ return ByteBuffer
+ .allocate(idutf8.length + 1 + Longs.BYTES)
+ .put(idutf8)
+ .put((byte) 0xFF)
+ .putLong(startCount)
+ .array();
+ } else {
+ // If the number of things added HAS changed during the course of this extractor's life, we CANNOT cache
+ final byte[] scrambler = StringUtils.toUtf8(UUID.randomUUID().toString());
+ return ByteBuffer
+ .allocate(idutf8.length + 1 + scrambler.length + 1)
+ .put(idutf8)
+ .put((byte) 0xFF)
+ .put(scrambler)
+ .put((byte) 0xFF)
+ .array();
+ }
+ }
+ };
+ }
+
+ public long getCompletedEventCount()
+ {
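+ // Each fully processed message bumps doubleEventCount twice, so halve it here.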
+ return doubleEventCount.get() >> 1;
+ }
+
+ // Used in tests
+ NamespaceExtractionCacheManager getCacheManager()
+ {
+ return cacheManager;
+ }
+
+ AtomicReference<Map<String, String>> getMapRef()
+ {
+ return mapRef;
+ }
+
+ AtomicLong getDoubleEventCount()
+ {
+ return doubleEventCount;
+ }
+
+ ListenableFuture<?> getFuture()
+ {
+ return future;
+ }
+
+
+ class KafkaLookupExtractorIntrospectionHandler implements LookupIntrospectHandler
+ {
+ @GET
+ public Response getActive()
+ {
+ final ListenableFuture<?> future = getFuture();
+ if (future != null && !future.isDone()) {
+ return Response.ok().build();
+ } else {
+ return Response.status(Response.Status.GONE).build();
+ }
+ }
+ }
+}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionManager.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionManager.java
deleted file mode 100644
index caf93162e4d..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionManager.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.server.namespace;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import com.metamx.common.IAE;
-import com.metamx.common.ISE;
-import com.metamx.common.StringUtils;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.common.logger.Logger;
-import io.druid.guice.ManageLifecycle;
-import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
-/**
- *
- */
-@ManageLifecycle
-public class KafkaExtractionManager
-{
- private static final Logger log = new Logger(KafkaExtractionManager.class);
-
- private final Properties kafkaProperties = new Properties();
- private final ConcurrentMap<String, String> namespaceVersionMap;
- private final ConcurrentMap<String, AtomicLong> topicEvents = new ConcurrentHashMap<>();
- private final Collection<ListenableFuture<?>> futures = new ConcurrentLinkedQueue<>();
- private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("kafka-rename-consumer-%d")
- .setDaemon(true)
- .setPriority(Thread.MIN_PRIORITY)
- .build()
- )
- );
- private final AtomicInteger backgroundTaskCount = new AtomicInteger(0);
-
- // Bindings in KafkaExtractionNamespaceModule
- @Inject
- public KafkaExtractionManager(
- @Named("namespaceVersionMap") final ConcurrentMap namespaceVersionMap,
- @Named("renameKafkaProperties") final Properties kafkaProperties
- )
- {
- if (kafkaProperties.containsKey("group.id")) {
- throw new IAE(
- "Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
- kafkaProperties.getProperty("group.id")
- );
- }
- if (kafkaProperties.containsKey("auto.offset.reset")) {
- throw new IAE(
- "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
- kafkaProperties.getProperty("auto.offset.reset")
- );
- }
- this.kafkaProperties.putAll(kafkaProperties);
- if (!this.kafkaProperties.containsKey("zookeeper.connect")) {
- this.kafkaProperties.put("zookeeper.connect", "localhost:2181/kafka");
- }
- // Enable publish-subscribe
- this.kafkaProperties.setProperty("auto.offset.reset", "smallest");
-
- this.namespaceVersionMap = namespaceVersionMap;
- }
-
- public long getBackgroundTaskCount()
- {
- return backgroundTaskCount.get();
- }
-
- private static final Decoder<String> defaultStringDecoder = new Decoder<String>()
- {
- @Override
- public String fromBytes(byte[] bytes)
- {
- return StringUtils.fromUtf8(bytes);
- }
- };
-
- public long getNumEvents(String namespace)
- {
- if(namespace == null){
- return 0L;
- } else {
- final AtomicLong eventCounter = topicEvents.get(namespace);
- if(eventCounter != null) {
- return eventCounter.get();
- } else {
- return 0L;
- }
- }
- }
-
- public void addListener(final KafkaExtractionNamespace kafkaNamespace, final Map<String, String> map)
- {
- final String topic = kafkaNamespace.getKafkaTopic();
- final String namespace = kafkaNamespace.getNamespace();
- final ListenableFuture<?> future = executorService.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- final Properties privateProperties = new Properties();
- privateProperties.putAll(kafkaProperties);
- privateProperties.setProperty("group.id", UUID.randomUUID().toString());
- ConsumerConnector consumerConnector = new kafka.javaapi.consumer.ZookeeperConsumerConnector(
- new ConsumerConfig(
- privateProperties
- )
- );
- List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
- new Whitelist(Pattern.quote(topic)), 1, defaultStringDecoder, defaultStringDecoder
- );
-
- if (streams == null || streams.isEmpty()) {
- throw new IAE("Topic [%s] had no streams", topic);
- }
- if (streams.size() > 1) {
- throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
- }
- backgroundTaskCount.incrementAndGet();
- final KafkaStream<String, String> kafkaStream = streams.get(0);
- final ConsumerIterator<String, String> it = kafkaStream.iterator();
- log.info("Listening to topic [%s] for namespace [%s]", topic, namespace);
- AtomicLong eventCounter = topicEvents.get(namespace);
- if(eventCounter == null){
- topicEvents.putIfAbsent(namespace, new AtomicLong(0L));
- eventCounter = topicEvents.get(namespace);
- }
- while (it.hasNext()) {
- final MessageAndMetadata<String, String> messageAndMetadata = it.next();
- final String key = messageAndMetadata.key();
- final String message = messageAndMetadata.message();
- if (key == null || message == null) {
- log.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
- continue;
- }
- map.put(key, message);
- namespaceVersionMap.put(namespace, Long.toString(eventCounter.incrementAndGet()));
- log.debug("Placed key[%s] val[%s]", key, message);
- }
- }
- }
- );
- Futures.addCallback(
- future, new FutureCallback<Object>()
- {
- @Override
- public void onSuccess(Object result)
- {
- topicEvents.remove(namespace);
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- topicEvents.remove(namespace);
- if (t instanceof java.util.concurrent.CancellationException) {
- log.warn("Cancelled rename task for topic [%s]", topic);
- } else {
- Throwables.propagate(t);
- }
- }
- },
- MoreExecutors.sameThreadExecutor()
- );
- }
-
- @LifecycleStart
- public void start()
- {
- // NO-OP
- // all consumers are started through KafkaExtractionNamespaceFactory.getCachePopulator
- }
-
- @LifecycleStop
- public void stop()
- {
- executorService.shutdown();
- Futures.allAsList(futures).cancel(true);
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java
deleted file mode 100644
index 182afc90874..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.server.namespace;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
-import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- *
- */
- public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunctionFactory<KafkaExtractionNamespace>
-{
- private final KafkaExtractionManager kafkaExtractionManager;
- private static final String KAFKA_VERSION = "kafka versions are updated every time a new event comes in";
-
- @Inject
- public KafkaExtractionNamespaceFactory(
- final KafkaExtractionManager kafkaExtractionManager
- )
- {
- this.kafkaExtractionManager = kafkaExtractionManager;
- }
-
-
- @Override
- public Function<String, String> buildFn(KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache)
- {
- return new Function<String, String>()
- {
- @Nullable
- @Override
- public String apply(String input)
- {
- if (Strings.isNullOrEmpty(input)) {
- return null;
- }
- return Strings.emptyToNull(cache.get(input));
- }
- };
- }
-
- @Override
- public Function<String, List<String>> buildReverseFn(
- KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache
- )
- {
- return new Function<String, List<String>>()
- {
- @Nullable
- @Override
- public List<String> apply(@Nullable final String value)
- {
- return Lists.newArrayList(Maps.filterKeys(cache, new Predicate<String>()
- {
- @Override public boolean apply(@Nullable String key)
- {
- return cache.get(key).equals(Strings.nullToEmpty(value));
- }
- }).keySet());
- }
- };
- }
-
- // This only fires ONCE when the namespace is first added. The version is updated externally as events come in
- @Override
- public Callable<String> getCachePopulator(
- final KafkaExtractionNamespace extractionNamespace,
- final String unused,
- final Map<String, String> cache
- )
- {
- return new Callable<String>()
- {
- @Override
- public String call()
- {
- kafkaExtractionManager.addListener(extractionNamespace, cache);
- return KAFKA_VERSION;
- }
- };
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceModule.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceModule.java
deleted file mode 100644
index 4a756baf591..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceModule.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.server.namespace;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import com.google.inject.Provides;
-import com.google.inject.name.Named;
-import io.druid.guice.LazySingleton;
-import io.druid.guice.LifecycleModule;
-import io.druid.guice.annotations.Json;
-import io.druid.initialization.DruidModule;
-import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- *
- */
-public class KafkaExtractionNamespaceModule implements DruidModule
-{
- private static final String PROPERTIES_KEY = "druid.query.rename.kafka.properties";
-
- @Override
- public List<? extends Module> getJacksonModules()
- {
- return ImmutableList.of(
- new SimpleModule("kafka-lookups"){
- @Override
- public void setupModule(SetupContext setupContext)
- {
- setupContext.registerSubtypes(KafkaExtractionNamespace.class);
- super.setupModule(setupContext);
- }
- }
- );
- }
-
- @Provides
- @Named("renameKafkaProperties")
- @LazySingleton
- public Properties getProperties(
- @Json ObjectMapper mapper,
- Properties systemProperties
- )
- {
- String val = systemProperties.getProperty(PROPERTIES_KEY);
- if (val == null) {
- return new Properties();
- }
- try {
- final Properties properties = new Properties();
- properties.putAll(
- mapper.<Map<String, String>>readValue(
- val, new TypeReference<Map<String, String>>()
- {
- }
- )
- );
- return properties;
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Provides
- @LazySingleton
- public KafkaExtractionNamespaceFactory factoryFactory(
- KafkaExtractionManager kafkaManager
- )
- {
- return new KafkaExtractionNamespaceFactory(kafkaManager);
- }
-
- @Override
- public void configure(Binder binder)
- {
- LifecycleModule.register(binder, KafkaExtractionManager.class);
- NamespacedExtractionModule
- .getNamespaceFactoryMapBinder(binder)
- .addBinding(KafkaExtractionNamespace.class)
- .to(KafkaExtractionNamespaceFactory.class)
- .in(LazySingleton.class);
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/kafka-extraction-namespace/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
index 6a1d6a2e05c..48fadfc15c4 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+++ b/extensions-core/kafka-extraction-namespace/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
-io.druid.server.namespace.KafkaExtractionNamespaceModule
+io.druid.query.lookup.KafkaExtractionNamespaceModule
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/KafkaExtractionNamespaceTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/KafkaExtractionNamespaceTest.java
deleted file mode 100644
index d95103c6871..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/KafkaExtractionNamespaceTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.query.extraction.namespace;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.druid.jackson.DefaultObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class KafkaExtractionNamespaceTest
-{
- @Test
- public void testReflectiveSerde() throws IOException
- {
- ObjectMapper mapper = new DefaultObjectMapper();
- mapper.registerSubtypes(KafkaExtractionNamespace.class);
- final String val = "{\"type\":\"kafka\",\"kafkaTopic\":\"testTopic\",\"namespace\":\"testNamespace\"}";
- final ExtractionNamespace fn =
- mapper.reader(ExtractionNamespace.class)
- .readValue(
- val
- );
- Assert.assertEquals(val, mapper.writeValueAsString(fn));
- }
-
- @Test(expected = com.fasterxml.jackson.databind.JsonMappingException.class)
- public void testMissingTopic() throws IOException
- {
- ObjectMapper mapper = new DefaultObjectMapper();
- mapper.registerSubtypes(KafkaExtractionNamespace.class);
- final String val = "{\"type\":\"kafka\",\"namespace\":\"testNamespace\"}";
- final ExtractionNamespace fn =
- mapper.reader(ExtractionNamespace.class)
- .readValue(
- val
- );
- Assert.assertEquals(val, mapper.writeValueAsString(fn));
- }
-
- @Test(expected = com.fasterxml.jackson.databind.JsonMappingException.class)
- public void testMissingNamespace() throws IOException
- {
- ObjectMapper mapper = new DefaultObjectMapper();
- mapper.registerSubtypes(KafkaExtractionNamespace.class);
- final String val = "{\"type\":\"kafka\",\"kafkaTopic\":\"testTopic\"}";
- final ExtractionNamespace fn =
- mapper.reader(ExtractionNamespace.class)
- .readValue(
- val
- );
- Assert.assertEquals(val, mapper.writeValueAsString(fn));
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java
deleted file mode 100644
index 38b9df8c2b8..00000000000
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.query.extraction.namespace;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-import com.google.inject.Binder;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.Provider;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Names;
-import com.metamx.common.ISE;
-import com.metamx.common.StringUtils;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.common.logger.Logger;
-import io.druid.guice.GuiceInjectors;
-import io.druid.guice.annotations.Json;
-import io.druid.initialization.Initialization;
-import io.druid.server.namespace.KafkaExtractionManager;
-import io.druid.server.namespace.KafkaExtractionNamespaceFactory;
-import io.druid.server.namespace.KafkaExtractionNamespaceModule;
-import io.druid.server.namespace.NamespacedExtractionModule;
-import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
-import kafka.admin.AdminUtils;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.zookeeper.CreateMode;
-import org.joda.time.DateTime;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- *
- */
-public class TestKafkaExtractionCluster
-{
- private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
-
- private static final Lifecycle lifecycle = new Lifecycle();
- private static final File tmpDir = Files.createTempDir();
- private static final String topicName = "testTopic";
- private static final String namespace = "testNamespace";
- private static final Properties kafkaProperties = new Properties();
-
- private KafkaServer kafkaServer;
- private KafkaConfig kafkaConfig;
- private TestingServer zkTestServer;
- private ZkClient zkClient;
- private KafkaExtractionManager renameManager;
- private NamespaceExtractionCacheManager extractionCacheManager;
- private Injector injector;
-
- public static class KafkaFactoryProvider implements Provider<ExtractionNamespaceFunctionFactory<?>>
- {
- private final KafkaExtractionManager kafkaExtractionManager;
-
- @Inject
- public KafkaFactoryProvider(
- KafkaExtractionManager kafkaExtractionManager
- )
- {
- this.kafkaExtractionManager = kafkaExtractionManager;
- }
-
- @Override
- public ExtractionNamespaceFunctionFactory<?> get()
- {
- return new KafkaExtractionNamespaceFactory(kafkaExtractionManager);
- }
- }
-
- @Before
- public void setUp() throws Exception
- {
- zkTestServer = new TestingServer(-1, new File(tmpDir.getAbsolutePath() + "/zk"), true);
- zkTestServer.start();
-
- zkClient = new ZkClient(
- zkTestServer.getConnectString(),
- 10000,
- 10000,
- ZKStringSerializer$.MODULE$
- );
- if (!zkClient.exists("/kafka")) {
- zkClient.create("/kafka", null, CreateMode.PERSISTENT);
- }
-
-
- log.info("---------------------------Started ZK---------------------------");
-
-
- final Properties serverProperties = new Properties();
- serverProperties.putAll(kafkaProperties);
- serverProperties.put("broker.id", "0");
- serverProperties.put("log.dir", tmpDir.getAbsolutePath() + "/log");
- serverProperties.put("log.cleaner.enable", "true");
- serverProperties.put("host.name", "127.0.0.1");
- serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + "/kafka");
- serverProperties.put("zookeeper.session.timeout.ms", "10000");
- serverProperties.put("zookeeper.sync.time.ms", "200");
-
- kafkaConfig = new KafkaConfig(serverProperties);
-
- final long time = DateTime.parse("2015-01-01").getMillis();
- kafkaServer = new KafkaServer(
- kafkaConfig,
- new Time()
- {
-
- @Override
- public long milliseconds()
- {
- return time;
- }
-
- @Override
- public long nanoseconds()
- {
- return milliseconds() * 1_000_000;
- }
-
- @Override
- public void sleep(long ms)
- {
- try {
- Thread.sleep(ms);
- }
- catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- }
- }
- );
- kafkaServer.startup();
-
- int sleepCount = 0;
-
- while (!kafkaServer.kafkaController().isActive()) {
- Thread.sleep(100);
- if (++sleepCount > 10) {
- throw new InterruptedException("Controller took to long to awaken");
- }
- }
-
- log.info("---------------------------Started Kafka Server---------------------------");
-
- ZkClient zkClient = new ZkClient(
- zkTestServer.getConnectString() + "/kafka", 10000, 10000,
- ZKStringSerializer$.MODULE$
- );
-
- try {
- final Properties topicProperties = new Properties();
- topicProperties.put("cleanup.policy", "compact");
- if (!AdminUtils.topicExists(zkClient, topicName)) {
- AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
- }
-
- log.info("---------------------------Created topic---------------------------");
-
- Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
- }
- finally {
- zkClient.close();
- }
-
- final Properties kafkaProducerProperties = makeProducerProperties();
- Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
-
- try {
- producer.send(
- new KeyedMessage<>(
- topicName,
- StringUtils.toUtf8("abcdefg"),
- StringUtils.toUtf8("abcdefg")
- )
- );
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- finally {
- producer.close();
- }
-
- System.setProperty("druid.extensions.searchCurrentClassloader", "false");
-
- injector = Initialization.makeInjectorWithModules(
- GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.of()
- ),
- ImmutableList.of(
- new Module()
- {
- @Override
- public void configure(Binder binder)
- {
- binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
- binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
- }
- },
- new NamespacedExtractionModule(),
- new KafkaExtractionNamespaceModule()
- {
- @Override
- public Properties getProperties(
- @Json ObjectMapper mapper,
- Properties systemProperties
- )
- {
- final Properties consumerProperties = new Properties(kafkaProperties);
- consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + "/kafka");
- consumerProperties.put("zookeeper.session.timeout.ms", "10000");
- consumerProperties.put("zookeeper.sync.time.ms", "200");
- return consumerProperties;
- }
- }
- )
- );
- renameManager = injector.getInstance(KafkaExtractionManager.class);
-
- log.info("--------------------------- placed default item via producer ---------------------------");
- extractionCacheManager = injector.getInstance(NamespaceExtractionCacheManager.class);
- extractionCacheManager.schedule(
- new KafkaExtractionNamespace(topicName, namespace)
- );
-
- long start = System.currentTimeMillis();
- while (renameManager.getBackgroundTaskCount() < 1) {
- Thread.sleep(100); // wait for map populator to start up
- if (System.currentTimeMillis() > start + 60_000) {
- throw new ISE("renameManager took too long to start");
- }
- }
- log.info("--------------------------- started rename manager ---------------------------");
- }
-
- @After
- public void tearDown() throws Exception
- {
-
- lifecycle.stop();
- if (null != renameManager) {
- renameManager.stop();
- }
-
- if (null != kafkaServer) {
- kafkaServer.shutdown();
- kafkaServer.awaitShutdown();
- }
-
- if (null != zkClient) {
- if (zkClient.exists("/kafka")) {
- try {
- zkClient.deleteRecursive("/kafka");
- }
- catch (org.I0Itec.zkclient.exception.ZkException ex) {
- log.warn(ex, "error deleting /kafka zk node");
- }
- }
- zkClient.close();
- }
- if (null != zkTestServer) {
- zkTestServer.stop();
- }
- if (tmpDir.exists()) {
- FileUtils.deleteDirectory(tmpDir);
- }
- }
-
- private final Properties makeProducerProperties()
- {
- final Properties kafkaProducerProperties = new Properties();
- kafkaProducerProperties.putAll(kafkaProperties);
- kafkaProducerProperties.put(
- "metadata.broker.list",
- String.format("127.0.0.1:%d", kafkaServer.socketServer().port())
- );
- kafkaProperties.put("request.required.acks", "1");
- return kafkaProducerProperties;
- }
-
- private void checkServer()
- {
- if (!kafkaServer.apis().controller().isActive()) {
- throw new ISE("server is not active!");
- }
- }
-
- @Test(timeout = 60_000L)
- public void testSimpleRename() throws InterruptedException
- {
- final Properties kafkaProducerProperties = makeProducerProperties();
- final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
-
- try {
- checkServer();
-
- final ConcurrentMap<String, Function<String, String>> fnFn =
- injector.getInstance(
- Key.get(
- new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
- {
- },
- Names.named("namespaceExtractionFunctionCache")
- )
- );
-
- final ConcurrentMap<String, Function<String, List<String>>> reverseFn =
- injector.getInstance(
- Key.get(
- new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
- {
- },
- Names.named("namespaceReverseExtractionFunctionCache")
- )
- );
-
- KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace);
-
- assertUpdated(null, extractionNamespace.getNamespace(), "foo", fnFn);
- assertReverseUpdated(Collections.EMPTY_LIST, extractionNamespace.getNamespace(), "foo", reverseFn);
-
- long events = renameManager.getNumEvents(namespace);
-
- log.info("------------------------- Sending foo bar -------------------------------");
- producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
-
- long start = System.currentTimeMillis();
- while (events == renameManager.getNumEvents(namespace)) {
- Thread.sleep(100);
- if (System.currentTimeMillis() > start + 60_000) {
- throw new ISE("Took too long to update event");
- }
- }
-
- log.info("------------------------- Checking foo bar -------------------------------");
- assertUpdated("bar", extractionNamespace.getNamespace(), "foo", fnFn);
- assertReverseUpdated(Arrays.asList("foo"), extractionNamespace.getNamespace(), "bar", reverseFn);
- assertUpdated(null, extractionNamespace.getNamespace(), "baz", fnFn);
-
- checkServer();
- events = renameManager.getNumEvents(namespace);
-
- log.info("------------------------- Sending baz bat -------------------------------");
- producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
- while (events == renameManager.getNumEvents(namespace)) {
- Thread.sleep(10);
- if (System.currentTimeMillis() > start + 60_000) {
- throw new ISE("Took too long to update event");
- }
- }
-
- log.info("------------------------- Checking baz bat -------------------------------");
- Assert.assertEquals("bat", fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
- Assert.assertEquals(Arrays.asList("baz"), reverseFn.get(extractionNamespace.getNamespace()).apply("bat"));
- }
- finally {
- producer.close();
- }
- }
-
- private void assertUpdated(
- String expected,
- String namespace,
- String key,
- ConcurrentMap<String, Function<String, String>> lookup
- )
- throws InterruptedException
- {
- Function<String, String> extractionFn = lookup.get(namespace);
-
- if (expected == null) {
- while (extractionFn.apply(key) != null) {
- Thread.sleep(100);
- extractionFn = lookup.get(namespace);
- }
- } else {
- while (!expected.equals(extractionFn.apply(key))) {
- Thread.sleep(100);
- extractionFn = lookup.get(namespace);
- }
- }
-
- Assert.assertEquals("update check", expected, extractionFn.apply(key));
- }
-
- private void assertReverseUpdated(
- List expected,
- String namespace,
- String key,
- ConcurrentMap<String, Function<String, List<String>>> lookup
- )
- throws InterruptedException
- {
- Function<String, List<String>> extractionFn = lookup.get(namespace);
-
- while (!extractionFn.apply(key).equals(expected)) {
- Thread.sleep(100);
- extractionFn = lookup.get(namespace);
- }
-
- Assert.assertEquals("update check", expected, extractionFn.apply(key));
- }
-}
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
new file mode 100644
index 00000000000..a8a40d0deaf
--- /dev/null
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.StringUtils;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static io.druid.query.lookup.KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER;
+
+public class KafkaLookupExtractorFactoryTest
+{
+ private static final String TOPIC = "some_topic";
+ private static final Map<String, String> DEFAULT_PROPERTIES = ImmutableMap.of(
+ "some.property", "some.value"
+ );
+ private final ObjectMapper mapper = new DefaultObjectMapper();
+ final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setUp()
+ {
+ mapper.setInjectableValues(new InjectableValues()
+ {
+ @Override
+ public Object findInjectableValue(
+ Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
+ )
+ {
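+ // Jackson resolves the @JacksonInject cache manager by the fully qualified
+ // class name used as the value id.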
+ if ("io.druid.server.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
+ return cacheManager;
+ } else {
+ return null;
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testSimpleSerDe() throws Exception
+ {
+ final KafkaLookupExtractorFactory expected = new KafkaLookupExtractorFactory(null, TOPIC, DEFAULT_PROPERTIES);
+ final KafkaLookupExtractorFactory result = mapper.readValue(
+ mapper.writeValueAsString(expected),
+ KafkaLookupExtractorFactory.class
+ );
+ Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
+ Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties());
+ Assert.assertEquals(cacheManager, result.getCacheManager());
+ Assert.assertEquals(0, expected.getCompletedEventCount());
+ Assert.assertEquals(0, result.getCompletedEventCount());
+ }
+
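+ // The cache key should change every time the event counter ticks; otherwise downstream
+ // query caches could keep serving results computed from a stale snapshot of the lookup.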
+ @Test
+ public void testCacheKeyScramblesOnNewData()
+ {
+ final int n = 1000;
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ factory.getMapRef().set(ImmutableMap.of());
+ final AtomicLong events = factory.getDoubleEventCount();
+
+ final LookupExtractor extractor = factory.get();
+
+ final List<byte[]> byteArrays = new ArrayList<>(n);
+ for (int i = 0; i < n; ++i) {
+ final byte[] myKey = extractor.getCacheKey();
+ // Not terribly efficient, but fine at test scale.
+ for (byte[] byteArray : byteArrays) {
+ Assert.assertFalse(Arrays.equals(byteArray, myKey));
+ }
+ byteArrays.add(myKey);
+ events.incrementAndGet();
+ }
+ Assert.assertEquals(n, byteArrays.size());
+ }
+
+ @Test
+ public void testCacheKeyScramblesDifferentStarts()
+ {
+ final int n = 1000;
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ factory.getMapRef().set(ImmutableMap.of());
+ final AtomicLong events = factory.getDoubleEventCount();
+
+ final List<byte[]> byteArrays = new ArrayList<>(n);
+ for (int i = 0; i < n; ++i) {
+ final LookupExtractor extractor = factory.get();
+ final byte[] myKey = extractor.getCacheKey();
+ // Not terribly efficient, but fine at test scale.
+ for (byte[] byteArray : byteArrays) {
+ Assert.assertFalse(Arrays.equals(byteArray, myKey));
+ }
+ byteArrays.add(myKey);
+ events.incrementAndGet();
+ }
+ Assert.assertEquals(n, byteArrays.size());
+ }
+
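+ // Conversely, while no new events arrive the cache key must stay stable so results
+ // remain cacheable.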
+ @Test
+ public void testCacheKeySameOnNoChange()
+ {
+ final int n = 1000;
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ factory.getMapRef().set(ImmutableMap.of());
+
+ final LookupExtractor extractor = factory.get();
+
+ final byte[] baseKey = extractor.getCacheKey();
+ for (int i = 0; i < n; ++i) {
+ Assert.assertArrayEquals(baseKey, factory.get().getCacheKey());
+ }
+ }
+
+ @Test
+ public void testCacheKeyDifferentForTopics()
+ {
+ final KafkaLookupExtractorFactory factory1 = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ factory1.getMapRef().set(ImmutableMap.of());
+ final KafkaLookupExtractorFactory factory2 = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC + "b",
+ DEFAULT_PROPERTIES
+ );
+ factory2.getMapRef().set(ImmutableMap.of());
+
+ Assert.assertFalse(Arrays.equals(factory1.get().getCacheKey(), factory2.get().getCacheKey()));
+ }
+
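+ // replaces() should be false for the factory itself and for an identical configuration,
+ // and true whenever the topic, properties, connect timeout, or one-to-one flag differ.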
+ @Test
+ public void testReplaces()
+ {
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ Assert.assertTrue(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.of(), false)));
+ Assert.assertFalse(factory.replaces(factory));
+ Assert.assertFalse(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ )));
+
+ Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC + "b",
+ DEFAULT_PROPERTIES
+ )));
+
+ Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("some.property", "some.other.value")
+ )));
+
+ Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("some.other.property", "some.value")
+ )));
+
+ Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES,
+ 1,
+ false
+ )));
+
+ Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES,
+ 0,
+ true
+ )));
+ }
+
+ @Test
+ public void testStopWithoutStart()
+ {
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ );
+ Assert.assertTrue(factory.close());
+ }
+
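+ // The overridden buildConnector() swaps in a mocked ConsumerConnector so no real Kafka
+ // connection is made; a clean start/close must delete the cache and shut the connector down.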
+ @Test
+ public void testStartStop()
+ {
+ final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
+ final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
+ final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
+ EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
+ EasyMock.anyObject(TopicFilter.class),
+ EasyMock.anyInt(),
+ EasyMock.eq(
+ DEFAULT_STRING_DECODER),
+ EasyMock.eq(DEFAULT_STRING_DECODER)
+ )).andReturn(ImmutableList.of(kafkaStream)).once();
+ EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
+ EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap<String, String>())
+ .once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
+ consumerConnector.shutdown();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("zookeeper.connect", "localhost"),
+ 10_000L,
+ false
+ )
+ {
+ @Override
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ return consumerConnector;
+ }
+ };
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ Assert.assertTrue(factory.getFuture().isDone());
+ EasyMock.verify(cacheManager);
+ }
+
+
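+ // With a 1 ms connect timeout and a connector that blocks forever, start() should give up
+ // and cancel the background future.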
+ @Test
+ public void testStartFailsFromTimeout() throws Exception
+ {
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap<String, String>())
+ .once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("zookeeper.connect", "localhost"),
+ 1,
+ false
+ )
+ {
+ @Override
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ // Lock up
+ try {
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ throw new RuntimeException("shouldn't make it here");
+ }
+ };
+ Assert.assertFalse(factory.start());
+ Assert.assertTrue(factory.getFuture().isDone());
+ Assert.assertTrue(factory.getFuture().isCancelled());
+ EasyMock.verify(cacheManager);
+ }
+
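+ // If the cache manager refuses to delete the namespace, close() must report the failure.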
+ @Test
+ public void testStopDeleteError()
+ {
+ final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
+ final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
+ final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
+ EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
+ EasyMock.anyObject(TopicFilter.class),
+ EasyMock.anyInt(),
+ EasyMock.eq(
+ DEFAULT_STRING_DECODER),
+ EasyMock.eq(DEFAULT_STRING_DECODER)
+ )).andReturn(ImmutableList.of(kafkaStream)).once();
+ EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
+ EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap<String, String>())
+ .once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(false).once();
+
+ EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("zookeeper.connect", "localhost")
+ )
+ {
+ @Override
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ return consumerConnector;
+ }
+ };
+ Assert.assertTrue(factory.start());
+ Assert.assertFalse(factory.close());
+ EasyMock.verify(cacheManager, kafkaStream, consumerConnector, consumerIterator);
+ }
+
+
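+ // A factory cannot be restarted once closed: the second start() must fail.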
+ @Test
+ public void testStartStopStart()
+ {
+ final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
+ final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
+ final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
+ EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
+ EasyMock.anyObject(TopicFilter.class),
+ EasyMock.anyInt(),
+ EasyMock.eq(
+ DEFAULT_STRING_DECODER),
+ EasyMock.eq(DEFAULT_STRING_DECODER)
+ )).andReturn(ImmutableList.of(kafkaStream)).once();
+ EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
+ EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap<String, String>())
+ .once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
+ consumerConnector.shutdown();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("zookeeper.connect", "localhost")
+ )
+ {
+ @Override
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ return consumerConnector;
+ }
+ };
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ Assert.assertFalse(factory.start());
+ EasyMock.verify(cacheManager);
+ }
+
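+ // start() and close() are idempotent while the factory is in the corresponding state.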
+ @Test
+ public void testStartStartStop()
+ {
+ final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
+ final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
+ final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
+ EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
+ EasyMock.anyObject(TopicFilter.class),
+ EasyMock.anyInt(),
+ EasyMock.eq(
+ DEFAULT_STRING_DECODER),
+ EasyMock.eq(DEFAULT_STRING_DECODER)
+ )).andReturn(ImmutableList.of(kafkaStream)).once();
+ EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
+ EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap<String, String>())
+ .once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
+ consumerConnector.shutdown();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("zookeeper.connect", "localhost"),
+ 10_000L,
+ false
+ )
+ {
+ @Override
+ ConsumerConnector buildConnector(Properties properties)
+ {
+ return consumerConnector;
+ }
+ };
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ Assert.assertTrue(factory.close());
+ EasyMock.verify(cacheManager);
+ }
+
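+ // The next three tests pin down property validation: "zookeeper.connect" is mandatory,
+ // while "group.id" and "auto.offset.reset" are reserved and set by the extension itself.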
+ @Test
+ public void testStartFailsOnMissingConnect()
+ {
+ expectedException.expectMessage("zookeeper.connect required property");
+ EasyMock.replay(cacheManager);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of()
+ );
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testStartFailsOnGroupID()
+ {
+ expectedException.expectMessage(
+ "Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
+ EasyMock.replay(cacheManager);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("group.id", "make me fail")
+ );
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testStartFailsOnAutoOffset()
+ {
+ expectedException.expectMessage(
+ "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
+ EasyMock.replay(cacheManager);
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ ImmutableMap.of("auto.offset.reset", "make me fail")
+ );
+ Assert.assertTrue(factory.start());
+ Assert.assertTrue(factory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testFailsGetNotStarted()
+ {
+ expectedException.expectMessage("Not started");
+ new KafkaLookupExtractorFactory(
+ cacheManager,
+ TOPIC,
+ DEFAULT_PROPERTIES
+ ).get();
+ }
+
+ @Test
+ public void testDefaultDecoder()
+ {
+ final String str = "some string";
+ Assert.assertEquals(str, DEFAULT_STRING_DECODER.fromBytes(StringUtils.toUtf8(str)));
+ }
+}
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
new file mode 100644
index 00000000000..bcd1aebe9bb
--- /dev/null
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closer;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.metamx.common.ISE;
+import com.metamx.common.StringUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.guice.GuiceInjectors;
+import io.druid.initialization.Initialization;
+import io.druid.server.namespace.NamespacedExtractionModule;
+import kafka.admin.AdminUtils;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.CreateMode;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Integration test that boots an embedded ZooKeeper and a single Kafka broker, publishes
+ * rename messages to a compacted topic, and verifies the Kafka lookup extractor observes them.
+ */
+public class TestKafkaExtractionCluster
+{
+ private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
+ private static final String topicName = "testTopic";
+ private static final Map<String, String> kafkaProperties = new HashMap<>();
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private final Closer closer = Closer.create();
+
+ private KafkaServer kafkaServer;
+ private KafkaConfig kafkaConfig;
+ private TestingServer zkTestServer;
+ private ZkClient zkClient;
+ private Injector injector;
+ private ObjectMapper mapper;
+ private KafkaLookupExtractorFactory factory;
+
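+ // setUp() boots a throwaway ZooKeeper, a single Kafka broker backed by a temp folder, and a
+ // compacted topic, registering everything with the Closer so tearDown() unwinds it in order.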
+ @Before
+ public void setUp() throws Exception
+ {
+ zkTestServer = new TestingServer(-1, temporaryFolder.newFolder(), true);
+ zkTestServer.start();
+
+ closer.register(new Closeable()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ zkTestServer.stop();
+ }
+ });
+
+ zkClient = new ZkClient(
+ zkTestServer.getConnectString(),
+ 10000,
+ 10000,
+ ZKStringSerializer$.MODULE$
+ );
+ closer.register(new Closeable()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ zkClient.close();
+ }
+ });
+ if (!zkClient.exists("/kafka")) {
+ zkClient.create("/kafka", null, CreateMode.PERSISTENT);
+ }
+
+ log.info("---------------------------Started ZK---------------------------");
+
+ final String zkKafkaPath = "/kafka";
+
+ final Properties serverProperties = new Properties();
+ serverProperties.putAll(kafkaProperties);
+ serverProperties.put("broker.id", "0");
+ serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
+ serverProperties.put("log.cleaner.enable", "true");
+ serverProperties.put("host.name", "127.0.0.1");
+ serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
+ serverProperties.put("zookeeper.session.timeout.ms", "10000");
+ serverProperties.put("zookeeper.sync.time.ms", "200");
+
+ kafkaConfig = new KafkaConfig(serverProperties);
+
+ final long time = DateTime.parse("2015-01-01").getMillis();
+ kafkaServer = new KafkaServer(
+ kafkaConfig,
+ new Time()
+ {
+
+ @Override
+ public long milliseconds()
+ {
+ return time;
+ }
+
+ @Override
+ public long nanoseconds()
+ {
+ return milliseconds() * 1_000_000;
+ }
+
+ @Override
+ public void sleep(long ms)
+ {
+ try {
+ Thread.sleep(ms);
+ }
+ catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+ );
+ kafkaServer.startup();
+ closer.register(new Closeable()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+ }
+ });
+
+ int sleepCount = 0;
+
+ while (!kafkaServer.kafkaController().isActive()) {
+ Thread.sleep(100);
+ if (++sleepCount > 10) {
+ throw new InterruptedException("Controller took too long to awaken");
+ }
+ }
+
+ log.info("---------------------------Started Kafka Server---------------------------");
+
+ final ZkClient zkClient = new ZkClient(
+ zkTestServer.getConnectString() + zkKafkaPath, 10000, 10000,
+ ZKStringSerializer$.MODULE$
+ );
+
+ try (final AutoCloseable autoCloseable = new AutoCloseable()
+ {
+ @Override
+ public void close() throws Exception
+ {
+ if (zkClient.exists(zkKafkaPath)) {
+ try {
+ zkClient.deleteRecursive(zkKafkaPath);
+ }
+ catch (org.I0Itec.zkclient.exception.ZkException ex) {
+ log.warn(ex, "error deleting %s zk node", zkKafkaPath);
+ }
+ }
+ zkClient.close();
+ }
+ }) {
+ final Properties topicProperties = new Properties();
+ topicProperties.put("cleanup.policy", "compact");
+ if (!AdminUtils.topicExists(zkClient, topicName)) {
+ AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
+ }
+
+ log.info("---------------------------Created topic---------------------------");
+
+ Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
+ }
+
+ final Properties kafkaProducerProperties = makeProducerProperties();
+ final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
+ try (final AutoCloseable autoCloseable = new AutoCloseable()
+ {
+ @Override
+ public void close() throws Exception
+ {
+ producer.close();
+ }
+ }) {
+ producer.send(
+ new KeyedMessage<>(
+ topicName,
+ StringUtils.toUtf8("abcdefg"),
+ StringUtils.toUtf8("abcdefg")
+ )
+ );
+ }
+
+ System.setProperty("druid.extensions.searchCurrentClassloader", "false");
+
+ injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
+ ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+ }
+ },
+ // These injections fail under IntelliJ but are required for Maven
+ new NamespacedExtractionModule(),
+ new KafkaExtractionNamespaceModule()
+ )
+ );
+ mapper = injector.getInstance(ObjectMapper.class);
+
+ log.info("--------------------------- placed default item via producer ---------------------------");
+ final Map<String, String> consumerProperties = new HashMap<>(kafkaProperties);
+ consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
+ consumerProperties.put("zookeeper.session.timeout.ms", "10000");
+ consumerProperties.put("zookeeper.sync.time.ms", "200");
+
+ final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
+ null,
+ topicName,
+ consumerProperties
+ );
+
+ factory = (KafkaLookupExtractorFactory) mapper.readValue(
+ mapper.writeValueAsString(kafkaLookupExtractorFactory),
+ LookupExtractorFactory.class
+ );
+ Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
+ Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
+ factory.start();
+ closer.register(new Closeable()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ factory.close();
+ }
+ });
+ log.info("--------------------------- started rename manager ---------------------------");
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ closer.close();
+ }
+
+ private Properties makeProducerProperties()
+ {
+ final Properties kafkaProducerProperties = new Properties();
+ kafkaProducerProperties.putAll(kafkaProperties);
+ kafkaProducerProperties.put(
+ "metadata.broker.list",
+ String.format("127.0.0.1:%d", kafkaServer.socketServer().port())
+ );
+ kafkaProperties.put("request.required.acks", "1");
+ return kafkaProducerProperties;
+ }
+
+ private void checkServer()
+ {
+ if (!kafkaServer.apis().controller().isActive()) {
+ throw new ISE("server is not active!");
+ }
+ }
+
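+ // End-to-end check: publish key/value renames to the topic, wait for the factory's completed
+ // event counter to advance, then verify both forward (apply) and reverse (unapply) lookups.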
+ @Test(timeout = 60_000L)
+ public void testSimpleRename() throws InterruptedException
+ {
+ final Properties kafkaProducerProperties = makeProducerProperties();
+ final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
+ closer.register(new Closeable()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ producer.close();
+ }
+ });
+ checkServer();
+
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.<String>of(), "foo");
+
+ long events = factory.getCompletedEventCount();
+
+ log.info("------------------------- Sending foo bar -------------------------------");
+ producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
+
+ long start = System.currentTimeMillis();
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(100);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
+ }
+
+ log.info("------------------------- Checking foo bar -------------------------------");
+ assertUpdated("bar", "foo");
+ assertReverseUpdated(Collections.singletonList("foo"), "bar");
+ assertUpdated(null, "baz");
+
+ checkServer();
+ events = factory.getCompletedEventCount();
+
+ log.info("------------------------- Sending baz bat -------------------------------");
+ producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
+ start = System.currentTimeMillis();
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(10);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
+ }
+
+ log.info("------------------------- Checking baz bat -------------------------------");
+ Assert.assertEquals("bat", factory.get().apply("baz"));
+ Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
+ }
+
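+ // The assert helpers poll until the lookup converges, since consumption is asynchronous.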
+ private void assertUpdated(
+ String expected,
+ String key
+ )
+ throws InterruptedException
+ {
+ final LookupExtractor extractor = factory.get();
+ if (expected == null) {
+ while (extractor.apply(key) != null) {
+ Thread.sleep(100);
+ }
+ } else {
+ while (!expected.equals(extractor.apply(key))) {
+ Thread.sleep(100);
+ }
+ }
+
+ Assert.assertEquals("update check", expected, extractor.apply(key));
+ }
+
+ private void assertReverseUpdated(
+ List<String> expected,
+ String key
+ )
+ throws InterruptedException
+ {
+ final LookupExtractor extractor = factory.get();
+
+ while (!expected.equals(extractor.unapply(key))) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertEquals("update check", expected, extractor.unapply(key));
+ }
+}
diff --git a/extensions-core/kafka-extraction-namespace/src/test/resources/log4j2.xml b/extensions-core/kafka-extraction-namespace/src/test/resources/log4j2.xml
index 5e1ebff74c3..1601a94c87a 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/resources/log4j2.xml
+++ b/extensions-core/kafka-extraction-namespace/src/test/resources/log4j2.xml
@@ -28,7 +28,7 @@
- <Logger name="io.druid.server.namespace" level="debug" additivity="false">
+ <Logger name="io.druid.query.lookup" level="debug" additivity="false">