From 85f339b687231aeb397786b4c798e6ded96dbbd2 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Tue, 1 Dec 2015 10:34:09 -0600 Subject: [PATCH] introduction and implem of reverse lookup function unApply. --- .../KafkaExtractionNamespaceFactory.java | 28 +++++++++++- .../namespace/TestKafkaExtractionCluster.java | 30 +++++++++---- .../query/extraction/NamespacedExtractor.java | 13 ++++++ .../ExtractionNamespaceFunctionFactory.java | 14 +++++- ...DBCExtractionNamespaceFunctionFactory.java | 27 +++++++++++- .../namespace/NamespacedExtractionModule.java | 31 +++++++++++++ ...URIExtractionNamespaceFunctionFactory.java | 29 ++++++++++++- .../NamespaceExtractionCacheManager.java | 13 +++++- ...ffHeapNamespaceExtractionCacheManager.java | 5 ++- ...OnHeapNamespaceExtractionCacheManager.java | 5 ++- .../namespace/NamespacedExtractorTest.java | 43 ++++++++++++++++--- .../NamespacedExtractorModuleTest.java | 2 + ...xtractionNamespaceFunctionFactoryTest.java | 33 +++++++++++++- .../cache/JDBCExtractionNamespaceTest.java | 43 ++++++++++++++----- ...ceExtractionCacheManagerExecutorsTest.java | 4 +- .../NamespaceExtractionCacheManagersTest.java | 3 ++ .../query/extraction/LookupExtractor.java | 13 ++++++ .../query/extraction/MapLookupExtractor.java | 17 ++++++++ .../extraction/MapLookupExtractorTest.java | 26 ++++++++--- 19 files changed, 338 insertions(+), 41 deletions(-) diff --git a/extensions/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java b/extensions/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java index 28ee21fbc6b..baed37fe0a2 100644 --- a/extensions/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java +++ b/extensions/kafka-extraction-namespace/src/main/java/io/druid/server/namespace/KafkaExtractionNamespaceFactory.java @@ -20,12 +20,16 @@ package io.druid.server.namespace; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.inject.Inject; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import io.druid.query.extraction.namespace.KafkaExtractionNamespace; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -47,7 +51,7 @@ public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunct @Override - public Function build(KafkaExtractionNamespace extractionNamespace, final Map cache) + public Function buildFn(KafkaExtractionNamespace extractionNamespace, final Map cache) { return new Function() { @@ -63,6 +67,28 @@ public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunct }; } + @Override + public Function> buildReverseFn( + KafkaExtractionNamespace extractionNamespace, final Map cache + ) + { + return new Function>() + { + @Nullable + @Override + public List apply(@Nullable final String value) + { + return Lists.newArrayList(Maps.filterKeys(cache, new Predicate() + { + @Override public boolean apply(@Nullable String key) + { + return cache.get(key).equals(Strings.nullToEmpty(value)); + } + }).keySet()); + } + }; + } + // This only fires ONCE when the namespace is first added. The version is updated externally as events come in @Override public Callable getCachePopulator( diff --git a/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java b/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java index 899b053f8f8..91db72f4070 100644 --- a/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java +++ b/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java @@ -64,8 +64,10 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** @@ -81,7 +83,6 @@ public class TestKafkaExtractionCluster private static final String namespace = "testNamespace"; private static TestingServer zkTestServer; private static KafkaExtractionManager renameManager; - private static final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); private static final Lifecycle lifecycle = new Lifecycle(); private static NamespaceExtractionCacheManager extractionCacheManager; @@ -90,16 +91,18 @@ public class TestKafkaExtractionCluster private static Injector injector; - public static class KafkaFactoryProvider implements Provider> { private final KafkaExtractionManager kafkaExtractionManager; + @Inject public KafkaFactoryProvider( KafkaExtractionManager kafkaExtractionManager - ){ + ) + { this.kafkaExtractionManager = kafkaExtractionManager; } + @Override public ExtractionNamespaceFunctionFactory get() { @@ -195,7 +198,6 @@ public class TestKafkaExtractionCluster finally { zkClient.close(); } - fnCache.clear(); final Properties kafkaProducerProperties = makeProducerProperties(); Producer producer = new Producer(new ProducerConfig(kafkaProducerProperties)); try { @@ -280,7 +282,8 @@ public class TestKafkaExtractionCluster if (zkClient.exists("/kafka")) { try { zkClient.deleteRecursive("/kafka"); - }catch(org.I0Itec.zkclient.exception.ZkException ex){ + } + catch (org.I0Itec.zkclient.exception.ZkException ex) { log.warn(ex, "error deleting /kafka zk node"); } } @@ -289,12 +292,13 @@ public class TestKafkaExtractionCluster if (null != zkTestServer) { zkTestServer.stop(); } - if(tmpDir.exists()){ + if (tmpDir.exists()) { FileUtils.deleteDirectory(tmpDir); } } - private static final Properties makeProducerProperties(){ + private static final Properties makeProducerProperties() + { final Properties kafkaProducerProperties = new Properties(); kafkaProducerProperties.putAll(kafkaProperties); kafkaProducerProperties.put( @@ -320,10 +324,16 @@ public class TestKafkaExtractionCluster final Producer producer = new Producer(new ProducerConfig(kafkaProducerProperties)); try { checkServer(); - final ConcurrentMap> fnFn = injector.getInstance(Key.get(new TypeLiteral>>(){}, Names.named("namespaceExtractionFunctionCache"))); + final ConcurrentMap> fnFn = injector.getInstance(Key.get(new TypeLiteral>>() + { + }, Names.named("namespaceExtractionFunctionCache"))); + final ConcurrentMap>> reverseFn = injector.getInstance(Key.get(new TypeLiteral>>>() + { + }, Names.named("namespaceReverseExtractionFunctionCache"))); KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace); Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("foo")); + Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.get(extractionNamespace.getNamespace()).apply("foo")); long events = renameManager.getNumEvents(namespace); @@ -340,6 +350,7 @@ public class TestKafkaExtractionCluster log.info("------------------------- Checking foo bar -------------------------------"); Assert.assertEquals("bar", fnFn.get(extractionNamespace.getNamespace()).apply("foo")); + Assert.assertEquals(Arrays.asList("foo"), reverseFn.get(extractionNamespace.getNamespace()).apply("bar")); Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("baz")); checkServer(); @@ -356,6 +367,7 @@ public class TestKafkaExtractionCluster log.info("------------------------- Checking baz bat -------------------------------"); Assert.assertEquals("bat", fnFn.get(extractionNamespace.getNamespace()).apply("baz")); + Assert.assertEquals(Arrays.asList("baz"), reverseFn.get(extractionNamespace.getNamespace()).apply("bat")); } finally { producer.close(); diff --git a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/NamespacedExtractor.java b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/NamespacedExtractor.java index 841b7f3f34f..f8aca567cd0 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/NamespacedExtractor.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/NamespacedExtractor.java @@ -30,6 +30,7 @@ import com.metamx.common.StringUtils; import javax.validation.constraints.NotNull; import java.nio.ByteBuffer; +import java.util.List; /** * Namespaced extraction is a special case of DimExtractionFn where the actual extractor is pulled from a map of known implementations. @@ -42,17 +43,24 @@ public class NamespacedExtractor implements LookupExtractor private final String namespace; private final Function extractionFunction; + private final Function> reverseExtractionFunction; @JsonCreator public NamespacedExtractor( @NotNull @JacksonInject @Named("dimExtractionNamespace") final Function> namespaces, + @NotNull @JacksonInject @Named("reverseDimExtractionNamespace") + final Function>> reverseNamespaces, @NotNull @JsonProperty(value = "namespace", required = true) final String namespace ) { this.namespace = Preconditions.checkNotNull(namespace, "namespace"); this.extractionFunction = Preconditions.checkNotNull(namespaces.apply(namespace), "no namespace found"); + this.reverseExtractionFunction = Preconditions.checkNotNull( + reverseNamespaces.apply(namespace), + "can not found reverse extraction function" + ); } @JsonProperty("namespace") @@ -74,4 +82,9 @@ public class NamespacedExtractor implements LookupExtractor return extractionFunction.apply(value); } + @Override + public List unApply(@NotNull String value) + { + return reverseExtractionFunction.apply(value); + } } diff --git a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java index acb9f1d731b..0e48d0a425a 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java @@ -21,6 +21,7 @@ package io.druid.query.extraction.namespace; import com.google.common.base.Function; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -39,7 +40,16 @@ public interface ExtractionNamespaceFunctionFactory build(T extractionNamespace, Map cache); + Function buildFn(T extractionNamespace, Map cache); + + + /** + * @param extractionNamespace The ExtractionNamespace for which a manipulating reverse function is needed. + * @param cache view of the cache containing the function mapping. + * + * @return A function that will perform reverse lookup. + */ + Function> buildReverseFn(T extractionNamespace, final Map cache); /** * This function is called once if `ExtractionNamespace.getUpdateMs() == 0`, or every update if @@ -54,7 +64,7 @@ public interface ExtractionNamespaceFunctionFactory dbiCache = new ConcurrentHashMap<>(); @Override - public Function build(JDBCExtractionNamespace extractionNamespace, final Map cache) + public Function buildFn(JDBCExtractionNamespace extractionNamespace, final Map cache) { return new Function() { @@ -67,6 +70,28 @@ public class JDBCExtractionNamespaceFunctionFactory }; } + @Override + public Function> buildReverseFn( + JDBCExtractionNamespace extractionNamespace, final Map cache + ) + { + return new Function>() + { + @Nullable + @Override + public List apply(@Nullable final String value) + { + return Lists.newArrayList(Maps.filterKeys(cache, new Predicate() + { + @Override public boolean apply(@Nullable String key) + { + return cache.get(key).equals(Strings.nullToEmpty(value)); + } + }).keySet()); + } + }; + } + @Override public Callable getCachePopulator( final JDBCExtractionNamespace namespace, diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/NamespacedExtractionModule.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/NamespacedExtractionModule.java index 0c667cb2bbd..5870a2040ae 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/NamespacedExtractionModule.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/NamespacedExtractionModule.java @@ -66,6 +66,7 @@ public class NamespacedExtractionModule implements DruidModule private static final String TYPE_PREFIX = "druid.query.extraction.namespace.cache.type"; private static final String STATIC_CONFIG_PREFIX = "druid.query.extraction.namespace"; private final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); + private final ConcurrentMap>> reverseFnCache= new ConcurrentHashMap<>(); @Override public List getJacksonModules() @@ -176,6 +177,13 @@ public class NamespacedExtractionModule implements DruidModule return fnCache; } + @Provides + @Named("namespaceReverseExtractionFunctionCache") + public ConcurrentMap>> getReverseFnCache() + { + return reverseFnCache; + } + @Provides @Named("dimExtractionNamespace") @LazySingleton @@ -198,4 +206,27 @@ public class NamespacedExtractionModule implements DruidModule } }; } + + @Provides + @Named("dimReverseExtractionNamespace") + @LazySingleton + public Function>> getReverseFunctionMaker( + @Named("namespaceReverseExtractionFunctionCache") + final ConcurrentMap>> reverseFn + ) + { + return new Function>>() + { + @Nullable + @Override + public Function> apply(final String namespace) + { + Function> fn = reverseFn.get(namespace); + if (fn == null) { + throw new IAE("Namespace reverse function [%s] not found", namespace); + } + return fn; + } + }; + } } diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java index da98a82f171..26a69fb4d45 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java @@ -20,8 +20,11 @@ package io.druid.server.namespace; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.ByteSource; import com.google.inject.Inject; import com.metamx.common.CompressionUtils; @@ -42,6 +45,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.regex.Pattern; @@ -64,7 +68,7 @@ public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespac } @Override - public Function build(URIExtractionNamespace extractionNamespace, final Map cache) + public Function buildFn(URIExtractionNamespace extractionNamespace, final Map cache) { return new Function() { @@ -80,6 +84,29 @@ public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespac }; } + @Override + public Function> buildReverseFn( + URIExtractionNamespace extractionNamespace, final Map cache + ) + { + return new Function>() + { + @Nullable + @Override + public List apply(@Nullable final String value) + { + return Lists.newArrayList(Maps.filterKeys(cache, new Predicate() + { + @Override + public boolean apply(@Nullable String key) + { + return cache.get(key).equals(Strings.nullToEmpty(value)); + } + }).keySet()); + } + }; + } + @Override public Callable getCachePopulator( final URIExtractionNamespace extractionNamespace, diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java index 448ed879043..7dbfd441e82 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java @@ -43,6 +43,7 @@ import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import javax.annotation.Nullable; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -85,11 +86,13 @@ public abstract class NamespaceExtractionCacheManager final String name; final AtomicBoolean enabled = new AtomicBoolean(false); final AtomicReference> fn = new AtomicReference<>(null); + final AtomicReference>> reverseFn = new AtomicReference<>(null); } private static final Logger log = new Logger(NamespaceExtractionCacheManager.class); private final ListeningScheduledExecutorService listeningScheduledExecutorService; protected final ConcurrentMap> fnCache; + protected final ConcurrentMap>> reverseFnCache; protected final ConcurrentMap implData = new ConcurrentHashMap<>(); protected final AtomicLong tasksStarted = new AtomicLong(0); protected final AtomicLong dataSize = new AtomicLong(0); @@ -100,6 +103,7 @@ public abstract class NamespaceExtractionCacheManager public NamespaceExtractionCacheManager( Lifecycle lifecycle, final ConcurrentMap> fnCache, + final ConcurrentMap>> reverseFnCache, final ServiceEmitter serviceEmitter, final Map, ExtractionNamespaceFunctionFactory> namespaceFunctionFactoryMap ) @@ -117,6 +121,7 @@ public abstract class NamespaceExtractionCacheManager ExecutorServices.manageLifecycle(lifecycle, listeningScheduledExecutorService); this.serviceEmitter = serviceEmitter; this.fnCache = fnCache; + this.reverseFnCache = reverseFnCache; this.namespaceFunctionFactoryMap = namespaceFunctionFactoryMap; listeningScheduledExecutorService.scheduleAtFixedRate( new Runnable() @@ -177,12 +182,18 @@ public abstract class NamespaceExtractionCacheManager return; } swapAndClearCache(nsName, cacheId); - final Function fn = factory.build(namespace, getCacheMap(nsName)); + final Function fn = factory.buildFn(namespace, getCacheMap(nsName)); + final Function> reverseFn = factory.buildReverseFn(namespace, getCacheMap(nsName)); final Function priorFn = fnCache.put(nsName, fn); + final Function> priorReverseFn = reverseFnCache.put(nsName, reverseFn); if (priorFn != null && priorFn != namespaceDatum.fn.get()) { log.warn("Replaced prior function for namespace [%s]", nsName); } + if (priorReverseFn != null && priorReverseFn != namespaceDatum.reverseFn.get()) { + log.warn("Replaced prior reverse function for namespace [%s]", nsName); + } namespaceDatum.fn.set(fn); + namespaceDatum.reverseFn.set(reverseFn); } } }; diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java index dff26abc780..951be73206b 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java @@ -35,6 +35,7 @@ import org.mapdb.DBMaker; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -57,11 +58,13 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC Lifecycle lifecycle, @Named("namespaceExtractionFunctionCache") ConcurrentMap> fnCache, + @Named("namespaceReverseExtractionFunctionCache") + ConcurrentMap>> reverseFnCache, ServiceEmitter emitter, final Map, ExtractionNamespaceFunctionFactory> namespaceFunctionFactoryMap ) { - super(lifecycle, fnCache, emitter, namespaceFunctionFactoryMap); + super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap); try { tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName()); log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath()); diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java index 15e5c398e74..8d5c14d12ed 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java @@ -28,6 +28,7 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.query.extraction.namespace.ExtractionNamespace; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -46,11 +47,13 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa final Lifecycle lifecycle, @Named("namespaceExtractionFunctionCache") final ConcurrentMap> fnCache, + @Named("namespaceReverseExtractionFunctionCache") + final ConcurrentMap>> reverseFnCache, final ServiceEmitter emitter, final Map, ExtractionNamespaceFunctionFactory> namespaceFunctionFactoryMap ) { - super(lifecycle, fnCache, emitter, namespaceFunctionFactoryMap); + super(lifecycle, fnCache, reverseFnCache,emitter, namespaceFunctionFactoryMap); } @Override diff --git a/extensions/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/NamespacedExtractorTest.java b/extensions/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/NamespacedExtractorTest.java index 3f46b4ff0fd..f8987f8aa28 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/NamespacedExtractorTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/NamespacedExtractorTest.java @@ -27,6 +27,9 @@ import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -46,6 +49,17 @@ public class NamespacedExtractorTest return Strings.isNullOrEmpty(input) ? null : input; } }; + + private static final Function> NOOP_REVERSE_FN = new Function>() + { + @Nullable + @Override + public List apply(@Nullable String input) + { + return Strings.isNullOrEmpty(input) ? Collections.emptyList() : Arrays.asList(input); + } + }; + private static final Function> defaultFnFinder = new Function>() { @Nullable @@ -56,6 +70,17 @@ public class NamespacedExtractorTest return fn == null ? NOOP_FN : fn; } }; + + private static final Function>> defaultReverseFnFinder = new Function>>() + { + @Nullable + @Override + public Function> apply(@Nullable final String value) + { + return NOOP_REVERSE_FN; + } + }; + @BeforeClass public static void setupStatic() { @@ -108,20 +133,26 @@ public class NamespacedExtractorTest @Test public void testSimpleNamespace() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "noop"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "noop"); for (int i = 0; i < 10; ++i) { final String val = UUID.randomUUID().toString(); Assert.assertEquals(val, namespacedExtractor.apply(val)); + Assert.assertEquals(Arrays.asList(val), namespacedExtractor.unApply(val)); } Assert.assertEquals("", namespacedExtractor.apply("")); Assert.assertNull(namespacedExtractor.apply(null)); + Assert.assertEquals(Collections.emptyList(), namespacedExtractor.unApply(null)); Assert.assertEquals("The awesomeness", namespacedExtractor.apply("The awesomeness")); } @Test public void testUnknownNamespace() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "HFJDKSHFUINEWUINIUENFIUENFUNEWI"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor( + defaultFnFinder, + defaultReverseFnFinder, + "HFJDKSHFUINEWUINIUENFIUENFUNEWI" + ); for (int i = 0; i < 10; ++i) { final String val = UUID.randomUUID().toString(); Assert.assertEquals(val, namespacedExtractor.apply(val)); @@ -133,7 +164,7 @@ public class NamespacedExtractorTest @Test public void testTurtles() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "turtles"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "turtles"); for (int i = 0; i < 10; ++i) { final String val = UUID.randomUUID().toString(); Assert.assertEquals("turtle", namespacedExtractor.apply(val)); @@ -145,7 +176,7 @@ public class NamespacedExtractorTest @Test public void testEmpty() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "empty"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "empty"); Assert.assertEquals("", namespacedExtractor.apply("")); Assert.assertEquals("", namespacedExtractor.apply(null)); Assert.assertEquals("", namespacedExtractor.apply("anything")); @@ -154,7 +185,7 @@ public class NamespacedExtractorTest @Test public void testNull() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "null"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "null"); Assert.assertNull(namespacedExtractor.apply("")); Assert.assertNull(namespacedExtractor.apply(null)); } @@ -162,7 +193,7 @@ public class NamespacedExtractorTest @Test public void testBlankMissingValueIsNull() { - NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "null"); + NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "null"); Assert.assertNull(namespacedExtractor.apply("fh43u1i2")); Assert.assertNull(namespacedExtractor.apply("")); Assert.assertNull(namespacedExtractor.apply(null)); diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java index 75b2857188d..8ee09d3f8c6 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java @@ -49,6 +49,7 @@ import java.io.OutputStreamWriter; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -84,6 +85,7 @@ public class NamespacedExtractorModuleTest cacheManager = new OnHeapNamespaceExtractionCacheManager( lifecycle, new ConcurrentHashMap>(), + new ConcurrentHashMap>>(), new NoopServiceEmitter(), factoryMap ); fnCache.clear(); diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java index 095eca88d6f..1a7e0c53c27 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.metamx.common.UOE; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.service.ServiceEmitter; @@ -60,6 +61,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -136,12 +138,14 @@ public class URIExtractionNamespaceFunctionFactoryTest OnHeapNamespaceExtractionCacheManager.class.getConstructor( Lifecycle.class, ConcurrentMap.class, + ConcurrentMap.class, ServiceEmitter.class, Map.class ), OffHeapNamespaceExtractionCacheManager.class.getConstructor( Lifecycle.class, ConcurrentMap.class, + ConcurrentMap.class, ServiceEmitter.class, Map.class ) @@ -173,6 +177,7 @@ public class URIExtractionNamespaceFunctionFactoryTest manager = constructor.newInstance( new Lifecycle(), new ConcurrentHashMap>(), + new ConcurrentHashMap>(), new NoopServiceEmitter(), new HashMap, ExtractionNamespaceFunctionFactory>() ); @@ -222,9 +227,11 @@ public class URIExtractionNamespaceFunctionFactoryTest this.outStreamSupplier = outStreamSupplier; this.lifecycle = new Lifecycle(); this.fnCache = new ConcurrentHashMap<>(); + this.reverseFnCache = new ConcurrentHashMap<>(); this.manager = cacheManagerConstructor.newInstance( lifecycle, fnCache, + reverseFnCache, new NoopServiceEmitter(), namespaceFunctionFactoryMap ); @@ -251,6 +258,7 @@ public class URIExtractionNamespaceFunctionFactoryTest private URIExtractionNamespaceFunctionFactory factory; private URIExtractionNamespace namespace; private ConcurrentHashMap> fnCache; + private ConcurrentHashMap>> reverseFnCache; @Before public void setUp() throws Exception @@ -262,7 +270,16 @@ public class URIExtractionNamespaceFunctionFactoryTest final ObjectMapper mapper = new DefaultObjectMapper(); try (OutputStream ostream = outStreamSupplier.apply(tmpFile)) { try (OutputStreamWriter out = new OutputStreamWriter(ostream)) { - out.write(mapper.writeValueAsString(ImmutableMap.of("foo", "bar"))); + out.write(mapper.writeValueAsString(ImmutableMap.of( + "boo", + "bar", + "foo", + "bar", + "", + "MissingValue", + "emptyString", + "" + ))); } } factory = new URIExtractionNamespaceFunctionFactory( @@ -299,6 +316,20 @@ public class URIExtractionNamespaceFunctionFactoryTest Assert.assertEquals(null, fn.apply("baz")); } + @Test + public void testReverseFunction() throws InterruptedException + { + Assert.assertNull(reverseFnCache.get(namespace.getNamespace())); + NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); + Function> reverseFn = reverseFnCache.get(namespace.getNamespace()); + Assert.assertNotNull(reverseFn); + Assert.assertEquals(Sets.newHashSet("boo", "foo"), Sets.newHashSet(reverseFn.apply("bar"))); + Assert.assertEquals(Sets.newHashSet(""), Sets.newHashSet(reverseFn.apply("MissingValue"))); + Assert.assertEquals(Sets.newHashSet("emptyString"), Sets.newHashSet(reverseFn.apply(""))); + Assert.assertEquals(Sets.newHashSet("emptyString"), Sets.newHashSet(reverseFn.apply(null))); + Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.apply("baz")); + } + @Test public void simplePileONamespacesTest() throws InterruptedException { diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java index d8199dff8d0..eb7e7942742 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java @@ -20,8 +20,10 @@ package io.druid.server.namespace.cache; import com.google.common.base.Function; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.druid.metadata.TestDerbyConnector; @@ -41,6 +43,8 @@ import org.junit.runners.Parameterized; import org.skife.jdbi.v2.Handle; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -66,7 +70,8 @@ public class JDBCExtractionNamespaceTest private static final Map renames = ImmutableMap.of( "foo", "bar", "bad", "bar", - "how about that", "foo" + "how about that", "foo", + "empty string", "" ); @Parameterized.Parameters(name = "{0}") @@ -86,6 +91,7 @@ public class JDBCExtractionNamespaceTest } private final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); + private final ConcurrentMap>> reverseFnCache = new ConcurrentHashMap<>(); private final String tsColumn; private OnHeapNamespaceExtractionCacheManager extractionCacheManager; private final Lifecycle lifecycle = new Lifecycle(); @@ -120,6 +126,7 @@ public class JDBCExtractionNamespaceTest extractionCacheManager = new OnHeapNamespaceExtractionCacheManager( lifecycle, fnCache, + reverseFnCache, new NoopServiceEmitter(), ImmutableMap., ExtractionNamespaceFunctionFactory>of( JDBCExtractionNamespace.class, @@ -219,19 +226,33 @@ public class JDBCExtractionNamespaceTest for (Map.Entry entry : renames.entrySet()) { String key = entry.getKey(); String val = entry.getValue(); - Assert.assertEquals( - "non-null check", - val, - extractionFn.apply(key) - ); + Assert.assertEquals("non-null check", Strings.emptyToNull(val), extractionFn.apply(key)); } - Assert.assertEquals( - "null check", - null, - extractionFn.apply("baz") - ); + Assert.assertEquals("null check", null, extractionFn.apply("baz")); } + @Test(timeout = 60_000L) + public void testReverseLookup() throws InterruptedException + { + final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace( + namespace, + derbyConnectorRule.getMetadataConnectorConfig(), + tableName, + keyName, + valName, + tsColumn, + new Period(0) + ); + NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace)); + Function> reverseExtractionFn = reverseFnCache.get(extractionNamespace.getNamespace()); + Assert.assertEquals("reverse lookup should match", Sets.newHashSet("foo", "bad"), Sets.newHashSet(reverseExtractionFn.apply("bar"))); + Assert.assertEquals("reverse lookup should match", Sets.newHashSet("how about that"), Sets.newHashSet(reverseExtractionFn.apply("foo"))); + Assert.assertEquals("reverse lookup should match", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply(""))); + Assert.assertEquals("null is same as empty string", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply(null))); + Assert.assertEquals("reverse lookup of none existing value should be empty list", + Collections.EMPTY_LIST, + reverseExtractionFn.apply("does't exist")); + } @Test(timeout = 60_000L) public void testSkipOld() diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java index dca52f7eea9..eb725c868a9 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java @@ -56,6 +56,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -80,6 +81,7 @@ public class NamespaceExtractionCacheManagerExecutorsTest private NamespaceExtractionCacheManager manager; private File tmpFile; private final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); + private final ConcurrentMap>> reverseFnCache = new ConcurrentHashMap<>(); private final ConcurrentMap cacheUpdateAlerts = new ConcurrentHashMap<>(); private final AtomicLong numRuns = new AtomicLong(0L); @@ -114,7 +116,7 @@ public class NamespaceExtractionCacheManagerExecutorsTest } }; manager = new OnHeapNamespaceExtractionCacheManager( - lifecycle, fnCache, new NoopServiceEmitter(), + lifecycle, fnCache, reverseFnCache, new NoopServiceEmitter(), ImmutableMap., ExtractionNamespaceFunctionFactory>of( URIExtractionNamespace.class, factory diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java index fd38b943911..97f279a304f 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java @@ -62,11 +62,13 @@ public class NamespaceExtractionCacheManagersTest ArrayList params = new ArrayList<>(); ConcurrentMap> fnMap = new ConcurrentHashMap>(); + ConcurrentMap>> reverserFnMap = new ConcurrentHashMap>>(); params.add( new Object[]{ new OffHeapNamespaceExtractionCacheManager( lifecycle, fnMap, + reverserFnMap, new NoopServiceEmitter(), ImmutableMap., ExtractionNamespaceFunctionFactory>of() ), fnMap @@ -77,6 +79,7 @@ public class NamespaceExtractionCacheManagersTest new OnHeapNamespaceExtractionCacheManager( lifecycle, fnMap, + reverserFnMap, new NoopServiceEmitter(), ImmutableMap., ExtractionNamespaceFunctionFactory>of() ), fnMap diff --git a/processing/src/main/java/io/druid/query/extraction/LookupExtractor.java b/processing/src/main/java/io/druid/query/extraction/LookupExtractor.java index 49c57099ddf..4784ca95450 100644 --- a/processing/src/main/java/io/druid/query/extraction/LookupExtractor.java +++ b/processing/src/main/java/io/druid/query/extraction/LookupExtractor.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import java.util.List; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @@ -39,6 +40,18 @@ public interface LookupExtractor */ @Nullable String apply(@NotNull String key); + /** + * Provide the reverse mapping from a given value to a list of keys + * @param value the value to apply the reverse lookup + * Null and empty are considered to be the same value = nullToEmpty(value) + * + * @return the list of keys that maps to value or empty list. + * Note that for the case of a none existing value in the lookup we have to cases either return an empty list OR list with null element. + * returning an empty list implies that user want to ignore such a lookup value. + * In the other hand returning a list with the null element implies user want to map the none existing value to the key null. + */ + List unApply(String value); + /** * Create a cache key for use in results caching * @return A byte array that can be used to uniquely identify if results of a prior lookup can use the cached values diff --git a/processing/src/main/java/io/druid/query/extraction/MapLookupExtractor.java b/processing/src/main/java/io/druid/query/extraction/MapLookupExtractor.java index cfecf3fa894..f0a7e1c3308 100644 --- a/processing/src/main/java/io/druid/query/extraction/MapLookupExtractor.java +++ b/processing/src/main/java/io/druid/query/extraction/MapLookupExtractor.java @@ -23,15 +23,19 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.metamx.common.StringUtils; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.List; import java.util.Map; @JsonTypeName("map") @@ -60,6 +64,19 @@ public class MapLookupExtractor implements LookupExtractor return map.get(val); } + @Override + public List unApply(final String value) + { + return Lists.newArrayList(Maps.filterKeys(map, new Predicate() + { + @Override public boolean apply(@Nullable String key) + { + return map.get(key).equals(Strings.nullToEmpty(value)); + } + }).keySet()); + + } + @Override public byte[] getCacheKey() { diff --git a/processing/src/test/java/io/druid/query/extraction/MapLookupExtractorTest.java b/processing/src/test/java/io/druid/query/extraction/MapLookupExtractorTest.java index 8f51d2b6fc3..4216947e903 100644 --- a/processing/src/test/java/io/druid/query/extraction/MapLookupExtractorTest.java +++ b/processing/src/test/java/io/druid/query/extraction/MapLookupExtractorTest.java @@ -20,20 +20,36 @@ package io.druid.query.extraction; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; +import java.util.Map; public class MapLookupExtractorTest { - private final MapLookupExtractor fn = new MapLookupExtractor(ImmutableMap.of("foo", "bar")); + private final Map lookupMap = ImmutableMap.of("foo", "bar", "null", "", "empty String", "", "","empty_string"); + private final MapLookupExtractor fn = new MapLookupExtractor(lookupMap); + + @Test + public void testUnApply() + { + Assert.assertEquals(Arrays.asList("foo"), fn.unApply("bar")); + Assert.assertEquals(Sets.newHashSet("null", "empty String"), Sets.newHashSet(fn.unApply(""))); + Assert.assertEquals("Null value should be equal to empty string", + Sets.newHashSet("null", "empty String"), + Sets.newHashSet(fn.unApply(null))); + Assert.assertEquals(Sets.newHashSet(""), Sets.newHashSet(fn.unApply("empty_string"))); + Assert.assertEquals("not existing value returns empty list", Collections.EMPTY_LIST, fn.unApply("not There")); + } @Test public void testGetMap() throws Exception { - Assert.assertEquals(ImmutableMap.of("foo", "bar"), fn.getMap()); + Assert.assertEquals(lookupMap, fn.getMap()); } @Test @@ -46,7 +62,7 @@ public class MapLookupExtractorTest @Test public void testGetCacheKey() throws Exception { - final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar")); + final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap)); Assert.assertArrayEquals(fn.getCacheKey(), fn2.getCacheKey()); final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar")); Assert.assertFalse(Arrays.equals(fn.getCacheKey(), fn3.getCacheKey())); @@ -57,7 +73,7 @@ public class MapLookupExtractorTest @Test public void testEquals() throws Exception { - final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar")); + final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap)); Assert.assertEquals(fn, fn2); final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar")); Assert.assertNotEquals(fn, fn3); @@ -68,7 +84,7 @@ public class MapLookupExtractorTest @Test public void testHashCode() throws Exception { - final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar")); + final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap)); Assert.assertEquals(fn.hashCode(), fn2.hashCode()); final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar")); Assert.assertNotEquals(fn.hashCode(), fn3.hashCode());