[QTL] Move kafka-extraction-namespace to the Lookup framework. (#2800)

* Move kafka-extraction-namespace to the Lookup framework.

* Address comments

* Fix missing kafka introspection

* Fix tests to be less racy

* Make testing a bit more lenient

* Make tests even more forgiving

* Add comments to kafka lookup cache method

* Move startStopLock to just use started

* Make start() and stop() idempotent

* Forgot to update test after last change, test now accounts for idempotency

* Add extra idempotency on stop check

* Add more descriptive docs of behavior
Charles Allen 2016-05-02 09:45:13 -07:00
parent e1eb3b1d95
commit 54b717bdc3
14 changed files with 1409 additions and 1069 deletions

View File

@@ -2,7 +2,7 @@
layout: doc_page
---
# Kafka Namespaced Lookup
# Kafka Lookups
<div class="note caution">
Lookups are an <a href="../experimental.html">experimental</a> feature.
@@ -10,44 +10,34 @@ Lookups are an <a href="../experimental.html">experimental</a> feature.
Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` and `druid-kafka-extraction-namespace` as extensions.
Note that this lookup does not employ a `pollPeriod`.
If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and whose message is the desired new value (both in UTF-8).
If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and whose message is the desired new value (both in UTF-8) as a `LookupExtractorFactory`.
```json
{
"type":"kafka",
"namespace":"testTopic",
"kafkaTopic":"testTopic"
"kafkaTopic":"testTopic",
"kafkaProperties":{"zookeeper.connect","somehost:2181/kafka"}
}
```
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`namespace`|The namespace to define|Yes||
|`kafkaTopic`|The kafka topic to read the data from|Yes||
## Kafka renames
|`kafkaProperties`|Kafka consumer properties. At least "zookeeper.connect" must be specified. Only the zookeeper connector is supported.|Yes||
|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not wait)|
|`isOneToOne`|The map is one-to-one (see [Lookup DimensionSpecs](../querying/dimensionspecs.html))|No|`false`|
The extension `kafka-extraction-namespace` enables reading from a kafka feed of key/value pairs to allow renaming of dimension values. An example use case would be renaming an ID to a human-readable format.
Currently the historical node caches the key/value pairs from the kafka feed in an ephemeral memory mapped DB via MapDB.
The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()` and `smallest` respectively.
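For illustration, a minimal sketch of publishing one rename to the topic, using the same Kafka 0.8 producer API that the tests in this commit use; the broker address, topic name, and values here are assumptions:
```java
import java.util.Properties;

import com.metamx.common.StringUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RenamePublisherSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put("metadata.broker.list", "127.0.0.1:9092"); // assumed broker address
    final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(props));
    try {
      // The key is the value to rename FROM, the message is the value to rename TO (both UTF-8).
      producer.send(new KeyedMessage<>("testTopic", StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
    }
    finally {
      producer.close();
    }
  }
}
```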
## Configuration
See [lookups](../../querying/lookups.html) for how to configure and use lookups.
The following options are used to define the behavior and should be included wherever the extension is included (all query servicing nodes):
# Limitations
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.rename.kafka.properties`|A json map of kafka consumer properties. See below for special properties.|See below|
The following describes how the kafka consumer properties in `druid.query.rename.kafka.properties` are handled:
|Property|Description|Default|
|--------|-----------|-------|
|`zookeeper.connect`|Zookeeper connection string|`localhost:2181/kafka`|
|`group.id`|Group ID, auto-assigned for publish-subscribe model and cannot be overridden|`UUID.randomUUID().toString()`|
|`auto.offset.reset`|Setting to get the entire kafka rename stream. Cannot be overridden|`smallest`|
Currently the Kafka lookup extractor feeds the entire kafka stream into a local cache. If you are using OnHeap caching, this can easily exhaust your java heap if the kafka stream produces many unique keys.
OffHeap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
There is currently no eviction policy.
## Testing the Kafka rename functionality

View File

@@ -98,5 +98,10 @@
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,74 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction.namespace;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import javax.validation.constraints.NotNull;
/**
*
*/
@JsonTypeName("kafka")
public class KafkaExtractionNamespace implements ExtractionNamespace
{
@JsonProperty
private final String kafkaTopic;
@JsonProperty
private final String namespace;
@JsonCreator
public KafkaExtractionNamespace(
@NotNull @JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic,
@NotNull @JsonProperty(value = "namespace", required = true) final String namespace
)
{
Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
Preconditions.checkNotNull(namespace, "namespace required");
this.kafkaTopic = kafkaTopic;
this.namespace = namespace;
}
public String getKafkaTopic()
{
return kafkaTopic;
}
@Override
public String getNamespace()
{
return namespace;
}
@Override
public long getPollMs()
{
return 0L;
}
@Override
public String toString()
{
return String.format("KafkaExtractionNamespace = { kafkaTopic = '%s', namespace = '%s'", kafkaTopic, namespace);
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
/**
*
*/
public class KafkaExtractionNamespaceModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("kafka-lookups").registerSubtypes(
KafkaLookupExtractorFactory.class
)
);
}
@Override
public void configure(Binder binder)
{
// NOOP
}
}
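As a hedged sketch of what this registration enables, a `kafka`-typed lookup spec can be deserialized into the factory once the module is wired into an `ObjectMapper` (mirroring the test setup later in this commit; the injected `cacheManager` instance is assumed):
```java
// Sketch: register the module's Jackson modules, supply the @JacksonInject
// dependency, and deserialize a lookup spec. `cacheManager` is assumed to be
// an existing NamespaceExtractionCacheManager.
ObjectMapper mapper = new DefaultObjectMapper();
for (Module jacksonModule : new KafkaExtractionNamespaceModule().getJacksonModules()) {
  mapper.registerModule(jacksonModule);
}
mapper.setInjectableValues(
    new InjectableValues.Std().addValue(
        "io.druid.server.namespace.cache.NamespaceExtractionCacheManager",
        cacheManager
    )
);
LookupExtractorFactory factory = mapper.readValue(
    "{\"type\":\"kafka\",\"kafkaTopic\":\"testTopic\","
    + "\"kafkaProperties\":{\"zookeeper.connect\":\"somehost:2181/kafka\"}}",
    LookupExtractorFactory.class
);
```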

View File

@@ -0,0 +1,423 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.ws.rs.GET;
import javax.ws.rs.core.Response;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
{
@Override
public String fromBytes(byte[] bytes)
{
return StringUtils.fromUtf8(bytes);
}
};
private final ListeningExecutorService executorService;
private final AtomicLong doubleEventCount = new AtomicLong(0L);
private final NamespaceExtractionCacheManager cacheManager;
private final String factoryId = UUID.randomUUID().toString();
private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);
private final AtomicBoolean started = new AtomicBoolean(false);
private volatile ListenableFuture<?> future = null;
@JsonProperty
private final String kafkaTopic;
@JsonProperty
private final Map<String, String> kafkaProperties;
@JsonProperty
private final long connectTimeout;
@JsonProperty
private final boolean isOneToOne;
@JsonCreator
public KafkaLookupExtractorFactory(
@JacksonInject NamespaceExtractionCacheManager cacheManager,
@JsonProperty("kafkaTopic") final String kafkaTopic,
@JsonProperty("kafkaProperties") final Map<String, String> kafkaProperties,
@JsonProperty("connectTimeout") @Min(0) long connectTimeout,
@JsonProperty("isOneToOne") boolean isOneToOne
)
{
this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties, "kafkaProperties required");
executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded(
"kafka-factory-" + kafkaTopic + "-%s",
Thread.MIN_PRIORITY
));
this.cacheManager = cacheManager;
this.connectTimeout = connectTimeout;
this.isOneToOne = isOneToOne;
}
public KafkaLookupExtractorFactory(
NamespaceExtractionCacheManager cacheManager,
String kafkaTopic,
Map<String, String> kafkaProperties
)
{
this(cacheManager, kafkaTopic, kafkaProperties, 0, false);
}
public String getKafkaTopic()
{
return kafkaTopic;
}
public Map<String, String> getKafkaProperties()
{
return kafkaProperties;
}
public long getConnectTimeout()
{
return connectTimeout;
}
public boolean isOneToOne()
{
return isOneToOne;
}
@Override
public boolean start()
{
synchronized (started) {
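// Per the commit notes, `started` doubles as the start/stop lock, which is
// what makes start() and close() idempotent rather than racing each other.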
if (started.get()) {
LOG.warn("Already started, not starting again");
return started.get();
}
if (executorService.isShutdown()) {
LOG.warn("Already shut down, not starting again");
return false;
}
final Properties kafkaProperties = new Properties();
kafkaProperties.putAll(getKafkaProperties());
if (kafkaProperties.containsKey("group.id")) {
throw new IAE(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
kafkaProperties.getProperty("group.id")
);
}
if (kafkaProperties.containsKey("auto.offset.reset")) {
throw new IAE(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
kafkaProperties.getProperty("auto.offset.reset")
);
}
Preconditions.checkNotNull(
kafkaProperties.getProperty("zookeeper.connect"),
"zookeeper.connect required property"
);
kafkaProperties.setProperty("group.id", factoryId);
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
final Map<String, String> map = cacheManager.getCacheMap(factoryId);
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
final CountDownLatch startingReads = new CountDownLatch(1);
final ListenableFuture<?> future = executorService.submit(
new Runnable()
{
@Override
public void run()
{
while (!executorService.isShutdown() && !Thread.currentThread().isInterrupted()) {
final ConsumerConnector consumerConnector = buildConnector(kafkaProperties);
try {
final List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
new Whitelist(Pattern.quote(topic)), 1, DEFAULT_STRING_DECODER, DEFAULT_STRING_DECODER
);
if (streams == null || streams.isEmpty()) {
throw new IAE("Topic [%s] had no streams", topic);
}
if (streams.size() > 1) {
throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
}
final KafkaStream<String, String> kafkaStream = streams.get(0);
startingReads.countDown();
for (final MessageAndMetadata<String, String> messageAndMetadata : kafkaStream) {
final String key = messageAndMetadata.key();
final String message = messageAndMetadata.message();
if (key == null || message == null) {
LOG.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
continue;
}
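// The counter is bumped before and after each write, so an odd value means a
// write is in flight; getCacheKey() below treats any change as fresh data.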
doubleEventCount.incrementAndGet();
map.put(key, message);
doubleEventCount.incrementAndGet();
LOG.trace("Placed key[%s] val[%s]", key, message);
}
}
catch (Exception e) {
LOG.error(e, "Error reading stream for topic [%s]", topic);
}
finally {
consumerConnector.shutdown();
}
}
}
}
);
Futures.addCallback(
future, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
LOG.debug("Success listening to [%s]", topic);
}
@Override
public void onFailure(Throwable t)
{
if (t instanceof CancellationException) {
LOG.debug("Topic [%s] cancelled", topic);
} else {
LOG.error(t, "Error in listening to [%s]", topic);
}
}
},
MoreExecutors.sameThreadExecutor()
);
this.future = future;
final Stopwatch stopwatch = Stopwatch.createStarted();
try {
while (!startingReads.await(100, TimeUnit.MILLISECONDS) && connectTimeout > 0L) {
// Don't return until we have actually connected
if (future.isDone()) {
future.get();
} else {
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > connectTimeout) {
throw new TimeoutException("Failed to connect to kafka in sufficient time");
}
}
}
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
LOG.warn("Could not cancel kafka listening thread");
}
LOG.error(e, "Failed to start kafka extraction factory");
cacheManager.delete(factoryId);
return false;
}
started.set(true);
return true;
}
}
// Overridden in tests
ConsumerConnector buildConnector(Properties properties)
{
return new kafka.javaapi.consumer.ZookeeperConsumerConnector(
new ConsumerConfig(properties)
);
}
@Override
public boolean close()
{
synchronized (started) {
if (!started.get() || executorService.isShutdown()) {
LOG.info("Already shutdown, ignoring");
return !started.get();
}
started.set(false);
executorService.shutdownNow();
final ListenableFuture<?> future = this.future;
if (future != null) {
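// cancel(true) can legitimately fail because the task finished on its own;
// re-checking isDone() separates that benign race from a real cancel failure.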
if (!future.isDone() && !future.cancel(true) && !future.isDone()) {
LOG.error("Error cancelling future for topic [%s]", getKafkaTopic());
return false;
}
}
if (!cacheManager.delete(factoryId)) {
LOG.error("Error removing [%s] for topic [%s] from cache", factoryId, getKafkaTopic());
return false;
}
return true;
}
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
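// True when this factory's configuration differs from the currently running
// `other`, signalling the lookup framework to swap this one in.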
if (this == other) {
return false;
}
if (other == null) {
return false;
}
if (getClass() != other.getClass()) {
return true;
}
final KafkaLookupExtractorFactory that = (KafkaLookupExtractorFactory) other;
return !(getKafkaTopic().equals(that.getKafkaTopic())
&& getKafkaProperties().equals(that.getKafkaProperties())
&& getConnectTimeout() == that.getConnectTimeout()
&& isOneToOne() == that.isOneToOne()
);
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return new KafkaLookupExtractorIntrospectionHandler();
}
@Override
public LookupExtractor get()
{
final Map<String, String> map = Preconditions.checkNotNull(mapRef.get(), "Not started");
final long startCount = doubleEventCount.get();
return new MapLookupExtractor(map, isOneToOne())
{
@Override
public byte[] getCacheKey()
{
final byte[] idutf8 = StringUtils.toUtf8(factoryId);
// If the number of things added has not changed during the course of this extractor's life, we can cache it
if (startCount == doubleEventCount.get()) {
return ByteBuffer
.allocate(idutf8.length + 1 + Longs.BYTES)
.put(idutf8)
.put((byte) 0xFF)
.putLong(startCount)
.array();
} else {
// If the number of things added HAS changed during the course of this extractor's life, we CANNOT cache
final byte[] scrambler = StringUtils.toUtf8(UUID.randomUUID().toString());
return ByteBuffer
.allocate(idutf8.length + 1 + scrambler.length + 1)
.put(idutf8)
.put((byte) 0xFF)
.put(scrambler)
.put((byte) 0xFF)
.array();
}
}
};
}
public long getCompletedEventCount()
{
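// Each completed event bumps doubleEventCount twice (before and after the
// map write), so halving yields the number of fully completed events.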
return doubleEventCount.get() >> 1;
}
// Used in tests
NamespaceExtractionCacheManager getCacheManager()
{
return cacheManager;
}
AtomicReference<Map<String, String>> getMapRef()
{
return mapRef;
}
AtomicLong getDoubleEventCount()
{
return doubleEventCount;
}
ListenableFuture<?> getFuture()
{
return future;
}
class KafkaLookupExtractorIntrospectionHandler implements LookupIntrospectHandler
{
@GET
public Response getActive()
{
final ListenableFuture<?> future = getFuture();
if (future != null && !future.isDone()) {
return Response.ok().build();
} else {
return Response.status(Response.Status.GONE).build();
}
}
}
}
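For orientation, a minimal lifecycle sketch consistent with the tests below; the `cacheManager` instance and the zookeeper address are assumptions, not part of this commit:
```java
// Sketch: construct, start, query, and close the factory. `cacheManager` is
// an assumed NamespaceExtractionCacheManager; the zookeeper address is illustrative.
KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
    cacheManager,
    "testTopic",
    ImmutableMap.of("zookeeper.connect", "localhost:2181/kafka"),
    10_000L, // connectTimeout in ms; 0 means do not wait
    false    // isOneToOne
);
if (factory.start()) {                         // idempotent; returns false on failure
  LookupExtractor extractor = factory.get();   // snapshot over the cache map
  String renamed = extractor.apply("foo");     // null until the topic supplies a value
  factory.close();                             // idempotent; shuts down consumer and cache
}
```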

View File

@@ -1,229 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.namespace;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.guice.ManageLifecycle;
import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
/**
*
*/
@ManageLifecycle
public class KafkaExtractionManager
{
private static final Logger log = new Logger(KafkaExtractionManager.class);
private final Properties kafkaProperties = new Properties();
private final ConcurrentMap<String, String> namespaceVersionMap;
private final ConcurrentMap<String, AtomicLong> topicEvents = new ConcurrentHashMap<>();
private final Collection<ListenableFuture<?>> futures = new ConcurrentLinkedQueue<>();
private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("kafka-rename-consumer-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.build()
)
);
private final AtomicInteger backgroundTaskCount = new AtomicInteger(0);
// Bindings in KafkaExtractionNamespaceModule
@Inject
public KafkaExtractionManager(
@Named("namespaceVersionMap") final ConcurrentMap<String, String> namespaceVersionMap,
@Named("renameKafkaProperties") final Properties kafkaProperties
)
{
if (kafkaProperties.containsKey("group.id")) {
throw new IAE(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]",
kafkaProperties.getProperty("group.id")
);
}
if (kafkaProperties.containsKey("auto.offset.reset")) {
throw new IAE(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
kafkaProperties.getProperty("auto.offset.reset")
);
}
this.kafkaProperties.putAll(kafkaProperties);
if (!this.kafkaProperties.containsKey("zookeeper.connect")) {
this.kafkaProperties.put("zookeeper.connect", "localhost:2181/kafka");
}
// Enable publish-subscribe
this.kafkaProperties.setProperty("auto.offset.reset", "smallest");
this.namespaceVersionMap = namespaceVersionMap;
}
public long getBackgroundTaskCount()
{
return backgroundTaskCount.get();
}
private static final Decoder<String> defaultStringDecoder = new Decoder<String>()
{
@Override
public String fromBytes(byte[] bytes)
{
return StringUtils.fromUtf8(bytes);
}
};
public long getNumEvents(String namespace)
{
if (namespace == null) {
return 0L;
} else {
final AtomicLong eventCounter = topicEvents.get(namespace);
if (eventCounter != null) {
return eventCounter.get();
} else {
return 0L;
}
}
}
public void addListener(final KafkaExtractionNamespace kafkaNamespace, final Map<String, String> map)
{
final String topic = kafkaNamespace.getKafkaTopic();
final String namespace = kafkaNamespace.getNamespace();
final ListenableFuture<?> future = executorService.submit(
new Runnable()
{
@Override
public void run()
{
final Properties privateProperties = new Properties();
privateProperties.putAll(kafkaProperties);
privateProperties.setProperty("group.id", UUID.randomUUID().toString());
ConsumerConnector consumerConnector = new kafka.javaapi.consumer.ZookeeperConsumerConnector(
new ConsumerConfig(
privateProperties
)
);
List<KafkaStream<String, String>> streams = consumerConnector.createMessageStreamsByFilter(
new Whitelist(Pattern.quote(topic)), 1, defaultStringDecoder, defaultStringDecoder
);
if (streams == null || streams.isEmpty()) {
throw new IAE("Topic [%s] had no streams", topic);
}
if (streams.size() > 1) {
throw new ISE("Topic [%s] has %d streams! expected 1", topic, streams.size());
}
backgroundTaskCount.incrementAndGet();
final KafkaStream<String, String> kafkaStream = streams.get(0);
final ConsumerIterator<String, String> it = kafkaStream.iterator();
log.info("Listening to topic [%s] for namespace [%s]", topic, namespace);
AtomicLong eventCounter = topicEvents.get(namespace);
if (eventCounter == null) {
topicEvents.putIfAbsent(namespace, new AtomicLong(0L));
eventCounter = topicEvents.get(namespace);
}
while (it.hasNext()) {
final MessageAndMetadata<String, String> messageAndMetadata = it.next();
final String key = messageAndMetadata.key();
final String message = messageAndMetadata.message();
if (key == null || message == null) {
log.error("Bad key/message from topic [%s]: [%s]", topic, messageAndMetadata);
continue;
}
map.put(key, message);
namespaceVersionMap.put(namespace, Long.toString(eventCounter.incrementAndGet()));
log.debug("Placed key[%s] val[%s]", key, message);
}
}
}
);
Futures.addCallback(
future, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
topicEvents.remove(namespace);
}
@Override
public void onFailure(Throwable t)
{
topicEvents.remove(namespace);
if (t instanceof java.util.concurrent.CancellationException) {
log.warn("Cancelled rename task for topic [%s]", topic);
} else {
Throwables.propagate(t);
}
}
},
MoreExecutors.sameThreadExecutor()
);
}
@LifecycleStart
public void start()
{
// NO-OP
// all consumers are started through KafkaExtractionNamespaceFactory.getCachePopulator
}
@LifecycleStop
public void stop()
{
executorService.shutdown();
Futures.allAsList(futures).cancel(true);
}
}

View File

@@ -1,110 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.namespace;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
*
*/
public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunctionFactory<KafkaExtractionNamespace>
{
private final KafkaExtractionManager kafkaExtractionManager;
private static final String KAFKA_VERSION = "kafka versions are updated every time a new event comes in";
@Inject
public KafkaExtractionNamespaceFactory(
final KafkaExtractionManager kafkaExtractionManager
)
{
this.kafkaExtractionManager = kafkaExtractionManager;
}
@Override
public Function<String, String> buildFn(KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache)
{
return new Function<String, String>()
{
@Nullable
@Override
public String apply(String input)
{
if (Strings.isNullOrEmpty(input)) {
return null;
}
return Strings.emptyToNull(cache.get(input));
}
};
}
@Override
public Function<String, List<String>> buildReverseFn(
KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache
)
{
return new Function<String, List<String>>()
{
@Nullable
@Override
public List<String> apply(@Nullable final String value)
{
return Lists.newArrayList(Maps.filterKeys(cache, new Predicate<String>()
{
@Override public boolean apply(@Nullable String key)
{
return cache.get(key).equals(Strings.nullToEmpty(value));
}
}).keySet());
}
};
}
// This only fires ONCE when the namespace is first added. The version is updated externally as events come in
@Override
public Callable<String> getCachePopulator(
final KafkaExtractionNamespace extractionNamespace,
final String unused,
final Map<String, String> cache
)
{
return new Callable<String>()
{
@Override
public String call()
{
kafkaExtractionManager.addListener(extractionNamespace, cache);
return KAFKA_VERSION;
}
};
}
}

View File

@@ -1,111 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.namespace;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Json;
import io.druid.initialization.DruidModule;
import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*
*/
public class KafkaExtractionNamespaceModule implements DruidModule
{
private static final String PROPERTIES_KEY = "druid.query.rename.kafka.properties";
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("kafka-lookups"){
@Override
public void setupModule(SetupContext setupContext)
{
setupContext.registerSubtypes(KafkaExtractionNamespace.class);
super.setupModule(setupContext);
}
}
);
}
@Provides
@Named("renameKafkaProperties")
@LazySingleton
public Properties getProperties(
@Json ObjectMapper mapper,
Properties systemProperties
)
{
String val = systemProperties.getProperty(PROPERTIES_KEY);
if (val == null) {
return new Properties();
}
try {
final Properties properties = new Properties();
properties.putAll(
mapper.<Map<String, String>>readValue(
val, new TypeReference<Map<String, String>>()
{
}
)
);
return properties;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Provides
@LazySingleton
public KafkaExtractionNamespaceFactory factoryFactory(
KafkaExtractionManager kafkaManager
)
{
return new KafkaExtractionNamespaceFactory(kafkaManager);
}
@Override
public void configure(Binder binder)
{
LifecycleModule.register(binder, KafkaExtractionManager.class);
NamespacedExtractionModule
.getNamespaceFactoryMapBinder(binder)
.addBinding(KafkaExtractionNamespace.class)
.to(KafkaExtractionNamespaceFactory.class)
.in(LazySingleton.class);
}
}

View File

@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
io.druid.server.namespace.KafkaExtractionNamespaceModule
io.druid.query.lookup.KafkaExtractionNamespaceModule

View File

@@ -1,75 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction.namespace;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
*
*/
public class KafkaExtractionNamespaceTest
{
@Test
public void testReflectiveSerde() throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(KafkaExtractionNamespace.class);
final String val = "{\"type\":\"kafka\",\"kafkaTopic\":\"testTopic\",\"namespace\":\"testNamespace\"}";
final ExtractionNamespace fn =
mapper.reader(ExtractionNamespace.class)
.readValue(
val
);
Assert.assertEquals(val, mapper.writeValueAsString(fn));
}
@Test(expected = com.fasterxml.jackson.databind.JsonMappingException.class)
public void testMissingTopic() throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(KafkaExtractionNamespace.class);
final String val = "{\"type\":\"kafka\",\"namespace\":\"testNamespace\"}";
final ExtractionNamespace fn =
mapper.reader(ExtractionNamespace.class)
.readValue(
val
);
Assert.assertEquals(val, mapper.writeValueAsString(fn));
}
@Test(expected = com.fasterxml.jackson.databind.JsonMappingException.class)
public void testMissingNamespace() throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(KafkaExtractionNamespace.class);
final String val = "{\"type\":\"kafka\",\"kafkaTopic\":\"testTopic\"}";
final ExtractionNamespace fn =
mapper.reader(ExtractionNamespace.class)
.readValue(
val
);
Assert.assertEquals(val, mapper.writeValueAsString(fn));
}
}

View File

@@ -1,445 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction.namespace;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.annotations.Json;
import io.druid.initialization.Initialization;
import io.druid.server.namespace.KafkaExtractionManager;
import io.druid.server.namespace.KafkaExtractionNamespaceFactory;
import io.druid.server.namespace.KafkaExtractionNamespaceModule;
import io.druid.server.namespace.NamespacedExtractionModule;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
/**
*
*/
public class TestKafkaExtractionCluster
{
private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
private static final Lifecycle lifecycle = new Lifecycle();
private static final File tmpDir = Files.createTempDir();
private static final String topicName = "testTopic";
private static final String namespace = "testNamespace";
private static final Properties kafkaProperties = new Properties();
private KafkaServer kafkaServer;
private KafkaConfig kafkaConfig;
private TestingServer zkTestServer;
private ZkClient zkClient;
private KafkaExtractionManager renameManager;
private NamespaceExtractionCacheManager extractionCacheManager;
private Injector injector;
public static class KafkaFactoryProvider implements Provider<ExtractionNamespaceFunctionFactory<?>>
{
private final KafkaExtractionManager kafkaExtractionManager;
@Inject
public KafkaFactoryProvider(
KafkaExtractionManager kafkaExtractionManager
)
{
this.kafkaExtractionManager = kafkaExtractionManager;
}
@Override
public ExtractionNamespaceFunctionFactory<?> get()
{
return new KafkaExtractionNamespaceFactory(kafkaExtractionManager);
}
}
@Before
public void setUp() throws Exception
{
zkTestServer = new TestingServer(-1, new File(tmpDir.getAbsolutePath() + "/zk"), true);
zkTestServer.start();
zkClient = new ZkClient(
zkTestServer.getConnectString(),
10000,
10000,
ZKStringSerializer$.MODULE$
);
if (!zkClient.exists("/kafka")) {
zkClient.create("/kafka", null, CreateMode.PERSISTENT);
}
log.info("---------------------------Started ZK---------------------------");
final Properties serverProperties = new Properties();
serverProperties.putAll(kafkaProperties);
serverProperties.put("broker.id", "0");
serverProperties.put("log.dir", tmpDir.getAbsolutePath() + "/log");
serverProperties.put("log.cleaner.enable", "true");
serverProperties.put("host.name", "127.0.0.1");
serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + "/kafka");
serverProperties.put("zookeeper.session.timeout.ms", "10000");
serverProperties.put("zookeeper.sync.time.ms", "200");
kafkaConfig = new KafkaConfig(serverProperties);
final long time = DateTime.parse("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
kafkaConfig,
new Time()
{
@Override
public long milliseconds()
{
return time;
}
@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}
@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
);
kafkaServer.startup();
int sleepCount = 0;
while (!kafkaServer.kafkaController().isActive()) {
Thread.sleep(100);
if (++sleepCount > 10) {
throw new InterruptedException("Controller took too long to awaken");
}
}
log.info("---------------------------Started Kafka Server---------------------------");
ZkClient zkClient = new ZkClient(
zkTestServer.getConnectString() + "/kafka", 10000, 10000,
ZKStringSerializer$.MODULE$
);
try {
final Properties topicProperties = new Properties();
topicProperties.put("cleanup.policy", "compact");
if (!AdminUtils.topicExists(zkClient, topicName)) {
AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
}
log.info("---------------------------Created topic---------------------------");
Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
}
finally {
zkClient.close();
}
final Properties kafkaProducerProperties = makeProducerProperties();
Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try {
producer.send(
new KeyedMessage<>(
topicName,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
producer.close();
}
System.setProperty("druid.extensions.searchCurrentClassloader", "false");
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of()
),
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
}
},
new NamespacedExtractionModule(),
new KafkaExtractionNamespaceModule()
{
@Override
public Properties getProperties(
@Json ObjectMapper mapper,
Properties systemProperties
)
{
final Properties consumerProperties = new Properties(kafkaProperties);
consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + "/kafka");
consumerProperties.put("zookeeper.session.timeout.ms", "10000");
consumerProperties.put("zookeeper.sync.time.ms", "200");
return consumerProperties;
}
}
)
);
renameManager = injector.getInstance(KafkaExtractionManager.class);
log.info("--------------------------- placed default item via producer ---------------------------");
extractionCacheManager = injector.getInstance(NamespaceExtractionCacheManager.class);
extractionCacheManager.schedule(
new KafkaExtractionNamespace(topicName, namespace)
);
long start = System.currentTimeMillis();
while (renameManager.getBackgroundTaskCount() < 1) {
Thread.sleep(100); // wait for map populator to start up
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("renameManager took too long to start");
}
}
log.info("--------------------------- started rename manager ---------------------------");
}
@After
public void tearDown() throws Exception
{
lifecycle.stop();
if (null != renameManager) {
renameManager.stop();
}
if (null != kafkaServer) {
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
if (null != zkClient) {
if (zkClient.exists("/kafka")) {
try {
zkClient.deleteRecursive("/kafka");
}
catch (org.I0Itec.zkclient.exception.ZkException ex) {
log.warn(ex, "error deleting /kafka zk node");
}
}
zkClient.close();
}
if (null != zkTestServer) {
zkTestServer.stop();
}
if (tmpDir.exists()) {
FileUtils.deleteDirectory(tmpDir);
}
}
private final Properties makeProducerProperties()
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
kafkaProducerProperties.put(
"metadata.broker.list",
String.format("127.0.0.1:%d", kafkaServer.socketServer().port())
);
kafkaProperties.put("request.required.acks", "1");
return kafkaProducerProperties;
}
private void checkServer()
{
if (!kafkaServer.apis().controller().isActive()) {
throw new ISE("server is not active!");
}
}
@Test(timeout = 60_000L)
public void testSimpleRename() throws InterruptedException
{
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try {
checkServer();
final ConcurrentMap<String, Function<String, String>> fnFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
},
Names.named("namespaceExtractionFunctionCache")
)
);
final ConcurrentMap<String, Function<String, List<String>>> reverseFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
},
Names.named("namespaceReverseExtractionFunctionCache")
)
);
KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace);
assertUpdated(null, extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Collections.EMPTY_LIST, extractionNamespace.getNamespace(), "foo", reverseFn);
long events = renameManager.getNumEvents(namespace);
log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
long start = System.currentTimeMillis();
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(100);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking foo bar -------------------------------");
assertUpdated("bar", extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Arrays.asList("foo"), extractionNamespace.getNamespace(), "bar", reverseFn);
assertUpdated(null, extractionNamespace.getNamespace(), "baz", fnFn);
checkServer();
events = renameManager.getNumEvents(namespace);
log.info("------------------------- Sending baz bat -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking baz bat -------------------------------");
Assert.assertEquals("bat", fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
Assert.assertEquals(Arrays.asList("baz"), reverseFn.get(extractionNamespace.getNamespace()).apply("bat"));
}
finally {
producer.close();
}
}
private void assertUpdated(
String expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, String>> lookup
)
throws InterruptedException
{
Function<String, String> extractionFn = lookup.get(namespace);
if (expected == null) {
while (extractionFn.apply(key) != null) {
Thread.sleep(100);
extractionFn = lookup.get(namespace);
}
} else {
while (!expected.equals(extractionFn.apply(key))) {
Thread.sleep(100);
extractionFn = lookup.get(namespace);
}
}
Assert.assertEquals("update check", expected, extractionFn.apply(key));
}
private void assertReverseUpdated(
List<String> expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, List<String>>> lookup
)
throws InterruptedException
{
Function<String, List<String>> extractionFn = lookup.get(namespace);
while (!extractionFn.apply(key).equals(expected)) {
Thread.sleep(100);
extractionFn = lookup.get(namespace);
}
Assert.assertEquals("update check", expected, extractionFn.apply(key));
}
}

View File

@@ -0,0 +1,516 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.javaapi.consumer.ConsumerConnector;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static io.druid.query.lookup.KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER;
public class KafkaLookupExtractorFactoryTest
{
private static final String TOPIC = "some_topic";
private static final Map<String, String> DEFAULT_PROPERTIES = ImmutableMap.of(
"some.property", "some.value"
);
private final ObjectMapper mapper = new DefaultObjectMapper();
final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp()
{
mapper.setInjectableValues(new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
if ("io.druid.server.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
return cacheManager;
} else {
return null;
}
}
});
}
@Test
public void testSimpleSerDe() throws Exception
{
final KafkaLookupExtractorFactory expected = new KafkaLookupExtractorFactory(null, TOPIC, DEFAULT_PROPERTIES);
final KafkaLookupExtractorFactory result = mapper.readValue(
mapper.writeValueAsString(expected),
KafkaLookupExtractorFactory.class
);
Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties());
Assert.assertEquals(cacheManager, result.getCacheManager());
Assert.assertEquals(0, expected.getCompletedEventCount());
Assert.assertEquals(0, result.getCompletedEventCount());
}
@Test
public void testCacheKeyScramblesOnNewData()
{
final int n = 1000;
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
factory.getMapRef().set(ImmutableMap.<String, String>of());
final AtomicLong events = factory.getDoubleEventCount();
final LookupExtractor extractor = factory.get();
final List<byte[]> byteArrays = new ArrayList<>(n);
for (int i = 0; i < n; ++i) {
final byte[] myKey = extractor.getCacheKey();
// Not terribly efficient.. but who cares
for (byte[] byteArray : byteArrays) {
Assert.assertFalse(Arrays.equals(byteArray, myKey));
}
byteArrays.add(myKey);
events.incrementAndGet();
}
Assert.assertEquals(n, byteArrays.size());
}
@Test
public void testCacheKeyScramblesDifferentStarts()
{
final int n = 1000;
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
factory.getMapRef().set(ImmutableMap.<String, String>of());
final AtomicLong events = factory.getDoubleEventCount();
final List<byte[]> byteArrays = new ArrayList<>(n);
for (int i = 0; i < n; ++i) {
final LookupExtractor extractor = factory.get();
final byte[] myKey = extractor.getCacheKey();
// Not terribly efficient.. but who cares
for (byte[] byteArray : byteArrays) {
Assert.assertFalse(Arrays.equals(byteArray, myKey));
}
byteArrays.add(myKey);
events.incrementAndGet();
}
Assert.assertEquals(n, byteArrays.size());
}
@Test
public void testCacheKeySameOnNoChange()
{
final int n = 1000;
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
factory.getMapRef().set(ImmutableMap.<String, String>of());
final LookupExtractor extractor = factory.get();
final byte[] baseKey = extractor.getCacheKey();
for (int i = 0; i < n; ++i) {
Assert.assertArrayEquals(baseKey, factory.get().getCacheKey());
}
}
@Test
public void testCacheKeyDifferentForTopics()
{
final KafkaLookupExtractorFactory factory1 = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
factory1.getMapRef().set(ImmutableMap.<String, String>of());
final KafkaLookupExtractorFactory factory2 = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC + "b",
DEFAULT_PROPERTIES
);
factory2.getMapRef().set(ImmutableMap.<String, String>of());
Assert.assertFalse(Arrays.equals(factory1.get().getCacheKey(), factory2.get().getCacheKey()));
}
@Test
public void testReplaces()
{
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
Assert.assertTrue(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.<String, String>of(), false)));
Assert.assertFalse(factory.replaces(factory));
Assert.assertFalse(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
)));
Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC + "b",
DEFAULT_PROPERTIES
)));
Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("some.property", "some.other.value")
)));
Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("some.other.property", "some.value")
)));
Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES,
1,
false
)));
Assert.assertTrue(factory.replaces(new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES,
0,
true
)));
}
@Test
public void testStopWithoutStart()
{
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
);
Assert.assertTrue(factory.close());
}
@Test
public void testStartStop()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(
DEFAULT_STRING_DECODER),
EasyMock.eq(DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
10_000L,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.getFuture().isDone());
EasyMock.verify(cacheManager);
}
@Test
public void testStartFailsFromTimeout() throws Exception
{
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
EasyMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
1,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
// Block forever to simulate a hung connector so start() hits its connect timeout
try {
Thread.currentThread().join();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
throw new RuntimeException("shouldn't make it here");
}
};
Assert.assertFalse(factory.start());
Assert.assertTrue(factory.getFuture().isDone());
Assert.assertTrue(factory.getFuture().isCancelled());
EasyMock.verify(cacheManager);
}
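// close() should report failure when the cache manager cannot delete the
// backing cache.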
@Test
public void testStopDeleteError()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(
DEFAULT_STRING_DECODER),
EasyMock.eq(DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(false).once();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost")
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertFalse(factory.close());
EasyMock.verify(cacheManager, kafkaStream, consumerConnector, consumerIterator);
}
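// Once closed, a factory cannot be started again.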
@Test
public void testStartStopStart()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(
DEFAULT_STRING_DECODER),
EasyMock.eq(DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost")
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertFalse(factory.start());
EasyMock.verify(cacheManager);
}
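// start() and close() are idempotent: repeated calls succeed without creating
// a second consumer or deleting the cache twice.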
@Test
public void testStartStartStop()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(
DEFAULT_STRING_DECODER),
EasyMock.eq(DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andReturn(false).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().once();
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost"),
10_000L,
false
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
}
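// Starting without the required zookeeper.connect property should fail fast.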
@Test
public void testStartFailsOnMissingConnect()
{
expectedException.expectMessage("zookeeper.connect required property");
EasyMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.<String, String>of()
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
}
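// group.id and auto.offset.reset are managed by the factory and may not be
// supplied by the user.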
@Test
public void testStartFailsOnGroupID()
{
expectedException.expectMessage(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
EasyMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("group.id", "make me fail")
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
}
@Test
public void testStartFailsOnAutoOffset()
{
expectedException.expectMessage(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
EasyMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("auto.offset.reset", "make me fail")
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
}
@Test
public void testFailsGetNotStarted()
{
expectedException.expectMessage("Not started");
new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
DEFAULT_PROPERTIES
).get();
}
@Test
public void testDefaultDecoder()
{
final String str = "some string";
Assert.assertEquals(str, DEFAULT_STRING_DECODER.fromBytes(StringUtils.toUtf8(str)));
}
}


@ -0,0 +1,400 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.server.namespace.NamespacedExtractionModule;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * End-to-end test that stands up an embedded ZooKeeper and Kafka broker and
 * exercises a Kafka lookup against them.
 */
public class TestKafkaExtractionCluster
{
private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
private static final String topicName = "testTopic";
private static final Map<String, String> kafkaProperties = new HashMap<>();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private final Closer closer = Closer.create();
private KafkaServer kafkaServer;
private KafkaConfig kafkaConfig;
private TestingServer zkTestServer;
private ZkClient zkClient;
private Injector injector;
private ObjectMapper mapper;
private KafkaLookupExtractorFactory factory;
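// Spins up an embedded ZooKeeper and Kafka broker, creates a compacted topic,
// seeds it with one record, and builds the lookup factory via a Jackson
// round-trip through the injected ObjectMapper.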
@Before
public void setUp() throws Exception
{
zkTestServer = new TestingServer(-1, temporaryFolder.newFolder(), true);
zkTestServer.start();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
zkTestServer.stop();
}
});
zkClient = new ZkClient(
zkTestServer.getConnectString(),
10000,
10000,
ZKStringSerializer$.MODULE$
);
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
zkClient.close();
}
});
if (!zkClient.exists("/kafka")) {
zkClient.create("/kafka", null, CreateMode.PERSISTENT);
}
log.info("---------------------------Started ZK---------------------------");
final String zkKafkaPath = "/kafka";
final Properties serverProperties = new Properties();
serverProperties.putAll(kafkaProperties);
serverProperties.put("broker.id", "0");
serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
serverProperties.put("log.cleaner.enable", "true");
serverProperties.put("host.name", "127.0.0.1");
serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
serverProperties.put("zookeeper.session.timeout.ms", "10000");
serverProperties.put("zookeeper.sync.time.ms", "200");
kafkaConfig = new KafkaConfig(serverProperties);
final long time = DateTime.parse("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
kafkaConfig,
new Time()
{
@Override
public long milliseconds()
{
return time;
}
@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}
@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
);
kafkaServer.startup();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
});
int sleepCount = 0;
while (!kafkaServer.kafkaController().isActive()) {
Thread.sleep(100);
if (++sleepCount > 10) {
throw new InterruptedException("Controller took too long to awaken");
}
}
log.info("---------------------------Started Kafka Server---------------------------");
final ZkClient zkClient = new ZkClient(
zkTestServer.getConnectString() + zkKafkaPath, 10000, 10000,
ZKStringSerializer$.MODULE$
);
try (final AutoCloseable autoCloseable = new AutoCloseable()
{
@Override
public void close() throws Exception
{
if (zkClient.exists(zkKafkaPath)) {
try {
zkClient.deleteRecursive(zkKafkaPath);
}
catch (org.I0Itec.zkclient.exception.ZkException ex) {
log.warn(ex, "error deleting %s zk node", zkKafkaPath);
}
}
zkClient.close();
}
}) {
final Properties topicProperties = new Properties();
topicProperties.put("cleanup.policy", "compact");
if (!AdminUtils.topicExists(zkClient, topicName)) {
AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProperties);
}
log.info("---------------------------Created topic---------------------------");
Assert.assertTrue(AdminUtils.topicExists(zkClient, topicName));
}
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try (final AutoCloseable autoCloseable = new AutoCloseable()
{
@Override
public void close() throws Exception
{
producer.close();
}
}) {
producer.send(
new KeyedMessage<>(
topicName,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")
)
);
}
System.setProperty("druid.extensions.searchCurrentClassloader", "false");
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
}
},
// These injections fail under IntelliJ but are required for Maven
new NamespacedExtractionModule(),
new KafkaExtractionNamespaceModule()
)
);
mapper = injector.getInstance(ObjectMapper.class);
log.info("--------------------------- placed default item via producer ---------------------------");
final Map<String, String> consumerProperties = new HashMap<>(kafkaProperties);
consumerProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
consumerProperties.put("zookeeper.session.timeout.ms", "10000");
consumerProperties.put("zookeeper.sync.time.ms", "200");
final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
null,
topicName,
consumerProperties
);
factory = (KafkaLookupExtractorFactory) mapper.readValue(
mapper.writeValueAsString(kafkaLookupExtractorFactory),
LookupExtractorFactory.class
);
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
factory.start();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
factory.close();
}
});
log.info("--------------------------- started rename manager ---------------------------");
}
@After
public void tearDown() throws Exception
{
closer.close();
}
private Properties makeProducerProperties()
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
kafkaProducerProperties.put(
"metadata.broker.list",
String.format("127.0.0.1:%d", kafkaServer.socketServer().port())
);
kafkaProducerProperties.put("request.required.acks", "1");
return kafkaProducerProperties;
}
private void checkServer()
{
if (!kafkaServer.apis().controller().isActive()) {
throw new ISE("server is not active!");
}
}
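// Publishes key/value renames to the topic and waits for the running lookup
// to reflect them in both apply() and unapply().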
@Test(timeout = 60_000L)
public void testSimpleRename() throws InterruptedException
{
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
producer.close();
}
});
checkServer();
assertUpdated(null, "foo");
assertReverseUpdated(ImmutableList.<String>of(), "foo");
long events = factory.getCompletedEventCount();
log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
long start = System.currentTimeMillis();
while (events == factory.getCompletedEventCount()) {
Thread.sleep(100);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking foo bar -------------------------------");
assertUpdated("bar", "foo");
assertReverseUpdated(Collections.singletonList("foo"), "bar");
assertUpdated(null, "baz");
checkServer();
events = factory.getCompletedEventCount();
log.info("------------------------- Sending baz bat -------------------------------");
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
start = System.currentTimeMillis();
while (events == factory.getCompletedEventCount()) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking baz bat -------------------------------");
Assert.assertEquals("bat", factory.get().apply("baz"));
Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
}
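// Polling helpers: wait until the extractor reflects the expected mapping;
// the @Test timeout bounds the wait.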
private void assertUpdated(
String expected,
String key
)
throws InterruptedException
{
final LookupExtractor extractor = factory.get();
if (expected == null) {
while (extractor.apply(key) != null) {
Thread.sleep(100);
}
} else {
while (!expected.equals(extractor.apply(key))) {
Thread.sleep(100);
}
}
Assert.assertEquals("update check", expected, extractor.apply(key));
}
private void assertReverseUpdated(
List<String> expected,
String key
)
throws InterruptedException
{
final LookupExtractor extractor = factory.get();
while (!expected.equals(extractor.unapply(key))) {
Thread.sleep(100);
}
Assert.assertEquals("update check", expected, extractor.unapply(key));
}
}


@ -28,7 +28,7 @@
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
<Logger name="io.druid.server.namespace.KafkaExtractionManager" level="debug" additivity="false">
<Logger name="io.druid.server" level="trace" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>