Merge pull request #2029 from b-slim/add_reverse_fn

Adding reverse lookup function to LookupExtractor.
This commit is contained in:
Fangjin Yang 2015-12-09 12:50:13 -08:00
commit f4ba13a1ac
19 changed files with 338 additions and 41 deletions

View File

@ -20,12 +20,16 @@
package io.druid.server.namespace;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -47,7 +51,7 @@ public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunct
@Override
public Function<String, String> build(KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache)
public Function<String, String> buildFn(KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache)
{
return new Function<String, String>()
{
@ -63,6 +67,28 @@ public class KafkaExtractionNamespaceFactory implements ExtractionNamespaceFunct
};
}
@Override
public Function<String, List<String>> buildReverseFn(
KafkaExtractionNamespace extractionNamespace, final Map<String, String> cache
)
{
return new Function<String, List<String>>()
{
@Nullable
@Override
public List<String> apply(@Nullable final String value)
{
return Lists.newArrayList(Maps.filterKeys(cache, new Predicate<String>()
{
@Override public boolean apply(@Nullable String key)
{
return cache.get(key).equals(Strings.nullToEmpty(value));
}
}).keySet());
}
};
}
// This only fires ONCE when the namespace is first added. The version is updated externally as events come in
@Override
public Callable<String> getCachePopulator(

View File

@ -64,8 +64,10 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
@ -81,7 +83,6 @@ public class TestKafkaExtractionCluster
private static final String namespace = "testNamespace";
private static TestingServer zkTestServer;
private static KafkaExtractionManager renameManager;
private static final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private static final Lifecycle lifecycle = new Lifecycle();
private static NamespaceExtractionCacheManager extractionCacheManager;
@ -90,16 +91,18 @@ public class TestKafkaExtractionCluster
private static Injector injector;
public static class KafkaFactoryProvider implements Provider<ExtractionNamespaceFunctionFactory<?>>
{
private final KafkaExtractionManager kafkaExtractionManager;
@Inject
public KafkaFactoryProvider(
KafkaExtractionManager kafkaExtractionManager
){
)
{
this.kafkaExtractionManager = kafkaExtractionManager;
}
@Override
public ExtractionNamespaceFunctionFactory<?> get()
{
@ -195,7 +198,6 @@ public class TestKafkaExtractionCluster
finally {
zkClient.close();
}
fnCache.clear();
final Properties kafkaProducerProperties = makeProducerProperties();
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
try {
@ -280,7 +282,8 @@ public class TestKafkaExtractionCluster
if (zkClient.exists("/kafka")) {
try {
zkClient.deleteRecursive("/kafka");
}catch(org.I0Itec.zkclient.exception.ZkException ex){
}
catch (org.I0Itec.zkclient.exception.ZkException ex) {
log.warn(ex, "error deleting /kafka zk node");
}
}
@ -289,12 +292,13 @@ public class TestKafkaExtractionCluster
if (null != zkTestServer) {
zkTestServer.stop();
}
if(tmpDir.exists()){
if (tmpDir.exists()) {
FileUtils.deleteDirectory(tmpDir);
}
}
private static final Properties makeProducerProperties(){
private static final Properties makeProducerProperties()
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
kafkaProducerProperties.put(
@ -320,10 +324,16 @@ public class TestKafkaExtractionCluster
final Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
try {
checkServer();
final ConcurrentMap<String, Function<String, String>> fnFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, String>>>(){}, Names.named("namespaceExtractionFunctionCache")));
final ConcurrentMap<String, Function<String, String>> fnFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
}, Names.named("namespaceExtractionFunctionCache")));
final ConcurrentMap<String, Function<String, List<String>>> reverseFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
}, Names.named("namespaceReverseExtractionFunctionCache")));
KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace);
Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.get(extractionNamespace.getNamespace()).apply("foo"));
long events = renameManager.getNumEvents(namespace);
@ -340,6 +350,7 @@ public class TestKafkaExtractionCluster
log.info("------------------------- Checking foo bar -------------------------------");
Assert.assertEquals("bar", fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Arrays.asList("foo"), reverseFn.get(extractionNamespace.getNamespace()).apply("bar"));
Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
checkServer();
@ -356,6 +367,7 @@ public class TestKafkaExtractionCluster
log.info("------------------------- Checking baz bat -------------------------------");
Assert.assertEquals("bat", fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
Assert.assertEquals(Arrays.asList("baz"), reverseFn.get(extractionNamespace.getNamespace()).apply("bat"));
}
finally {
producer.close();

View File

@ -30,6 +30,7 @@ import com.metamx.common.StringUtils;
import javax.validation.constraints.NotNull;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Namespaced extraction is a special case of DimExtractionFn where the actual extractor is pulled from a map of known implementations.
@ -42,17 +43,24 @@ public class NamespacedExtractor implements LookupExtractor
private final String namespace;
private final Function<String, String> extractionFunction;
private final Function<String, List<String>> reverseExtractionFunction;
@JsonCreator
public NamespacedExtractor(
@NotNull @JacksonInject @Named("dimExtractionNamespace")
final Function<String, Function<String, String>> namespaces,
@NotNull @JacksonInject @Named("reverseDimExtractionNamespace")
final Function<String, Function<String, List<String>>> reverseNamespaces,
@NotNull @JsonProperty(value = "namespace", required = true)
final String namespace
)
{
this.namespace = Preconditions.checkNotNull(namespace, "namespace");
this.extractionFunction = Preconditions.checkNotNull(namespaces.apply(namespace), "no namespace found");
this.reverseExtractionFunction = Preconditions.checkNotNull(
reverseNamespaces.apply(namespace),
"can not found reverse extraction function"
);
}
@JsonProperty("namespace")
@ -74,4 +82,9 @@ public class NamespacedExtractor implements LookupExtractor
return extractionFunction.apply(value);
}
@Override
public List<String> unApply(@NotNull String value)
{
return reverseExtractionFunction.apply(value);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query.extraction.namespace;
import com.google.common.base.Function;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -39,7 +40,16 @@ public interface ExtractionNamespaceFunctionFactory<T extends ExtractionNamespac
*
* @return A function which will perform an extraction in accordance with the desires of the ExtractionNamespace
*/
Function<String, String> build(T extractionNamespace, Map<String, String> cache);
Function<String, String> buildFn(T extractionNamespace, Map<String, String> cache);
/**
* @param extractionNamespace The ExtractionNamespace for which a manipulating reverse function is needed.
* @param cache view of the cache containing the function mapping.
*
* @return A function that will perform reverse lookup.
*/
Function<String, List<String>> buildReverseFn(T extractionNamespace, final Map<String, String> cache);
/**
* This function is called once if `ExtractionNamespace.getUpdateMs() == 0`, or every update if
@ -54,7 +64,7 @@ public interface ExtractionNamespaceFunctionFactory<T extends ExtractionNamespac
* @param lastVersion The version which was last cached
* @param swap The temporary Map into which data may be placed and will be "swapped" with the proper
* namespace Map in NamespaceExtractionCacheManager. Implementations which cannot offer
* a swappable cache of the data may ignore this but must make sure `build(...)` returns
* a swappable cache of the data may ignore this but must make sure `buildFn(...)` returns
* a proper Function.
*
* @return A callable that will be used to refresh resources of the namespace and return the version string used in

View File

@ -20,7 +20,10 @@
package io.druid.server.namespace;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import io.druid.common.utils.JodaUtils;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
@ -51,7 +54,7 @@ public class JDBCExtractionNamespaceFunctionFactory
private final ConcurrentMap<String, DBI> dbiCache = new ConcurrentHashMap<>();
@Override
public Function<String, String> build(JDBCExtractionNamespace extractionNamespace, final Map<String, String> cache)
public Function<String, String> buildFn(JDBCExtractionNamespace extractionNamespace, final Map<String, String> cache)
{
return new Function<String, String>()
{
@ -67,6 +70,28 @@ public class JDBCExtractionNamespaceFunctionFactory
};
}
@Override
public Function<String, List<String>> buildReverseFn(
JDBCExtractionNamespace extractionNamespace, final Map<String, String> cache
)
{
return new Function<String, List<String>>()
{
@Nullable
@Override
public List<String> apply(@Nullable final String value)
{
return Lists.newArrayList(Maps.filterKeys(cache, new Predicate<String>()
{
@Override public boolean apply(@Nullable String key)
{
return cache.get(key).equals(Strings.nullToEmpty(value));
}
}).keySet());
}
};
}
@Override
public Callable<String> getCachePopulator(
final JDBCExtractionNamespace namespace,

View File

@ -66,6 +66,7 @@ public class NamespacedExtractionModule implements DruidModule
private static final String TYPE_PREFIX = "druid.query.extraction.namespace.cache.type";
private static final String STATIC_CONFIG_PREFIX = "druid.query.extraction.namespace";
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache= new ConcurrentHashMap<>();
@Override
public List<? extends Module> getJacksonModules()
@ -176,6 +177,13 @@ public class NamespacedExtractionModule implements DruidModule
return fnCache;
}
@Provides
@Named("namespaceReverseExtractionFunctionCache")
public ConcurrentMap<String, Function<String, List<String>>> getReverseFnCache()
{
return reverseFnCache;
}
@Provides
@Named("dimExtractionNamespace")
@LazySingleton
@ -198,4 +206,27 @@ public class NamespacedExtractionModule implements DruidModule
}
};
}
@Provides
@Named("dimReverseExtractionNamespace")
@LazySingleton
public Function<String, Function<String, List<String>>> getReverseFunctionMaker(
@Named("namespaceReverseExtractionFunctionCache")
final ConcurrentMap<String, Function<String, List<String>>> reverseFn
)
{
return new Function<String, Function<String, List<String>>>()
{
@Nullable
@Override
public Function<String, List<String>> apply(final String namespace)
{
Function<String, List<String>> fn = reverseFn.get(namespace);
if (fn == null) {
throw new IAE("Namespace reverse function [%s] not found", namespace);
}
return fn;
}
};
}
}

View File

@ -20,8 +20,11 @@
package io.druid.server.namespace;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
@ -42,6 +45,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
@ -64,7 +68,7 @@ public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespac
}
@Override
public Function<String, String> build(URIExtractionNamespace extractionNamespace, final Map<String, String> cache)
public Function<String, String> buildFn(URIExtractionNamespace extractionNamespace, final Map<String, String> cache)
{
return new Function<String, String>()
{
@ -80,6 +84,29 @@ public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespac
};
}
@Override
public Function<String, List<String>> buildReverseFn(
URIExtractionNamespace extractionNamespace, final Map<String, String> cache
)
{
return new Function<String, List<String>>()
{
@Nullable
@Override
public List<String> apply(@Nullable final String value)
{
return Lists.newArrayList(Maps.filterKeys(cache, new Predicate<String>()
{
@Override
public boolean apply(@Nullable String key)
{
return cache.get(key).equals(Strings.nullToEmpty(value));
}
}).keySet());
}
};
}
@Override
public Callable<String> getCachePopulator(
final URIExtractionNamespace extractionNamespace,

View File

@ -43,6 +43,7 @@ import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -85,11 +86,13 @@ public abstract class NamespaceExtractionCacheManager
final String name;
final AtomicBoolean enabled = new AtomicBoolean(false);
final AtomicReference<Function<String, String>> fn = new AtomicReference<>(null);
final AtomicReference<Function<String, List<String>>> reverseFn = new AtomicReference<>(null);
}
private static final Logger log = new Logger(NamespaceExtractionCacheManager.class);
private final ListeningScheduledExecutorService listeningScheduledExecutorService;
protected final ConcurrentMap<String, Function<String, String>> fnCache;
protected final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache;
protected final ConcurrentMap<String, NamespaceImplData> implData = new ConcurrentHashMap<>();
protected final AtomicLong tasksStarted = new AtomicLong(0);
protected final AtomicLong dataSize = new AtomicLong(0);
@ -100,6 +103,7 @@ public abstract class NamespaceExtractionCacheManager
public NamespaceExtractionCacheManager(
Lifecycle lifecycle,
final ConcurrentMap<String, Function<String, String>> fnCache,
final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache,
final ServiceEmitter serviceEmitter,
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> namespaceFunctionFactoryMap
)
@ -117,6 +121,7 @@ public abstract class NamespaceExtractionCacheManager
ExecutorServices.manageLifecycle(lifecycle, listeningScheduledExecutorService);
this.serviceEmitter = serviceEmitter;
this.fnCache = fnCache;
this.reverseFnCache = reverseFnCache;
this.namespaceFunctionFactoryMap = namespaceFunctionFactoryMap;
listeningScheduledExecutorService.scheduleAtFixedRate(
new Runnable()
@ -177,12 +182,18 @@ public abstract class NamespaceExtractionCacheManager
return;
}
swapAndClearCache(nsName, cacheId);
final Function<String, String> fn = factory.build(namespace, getCacheMap(nsName));
final Function<String, String> fn = factory.buildFn(namespace, getCacheMap(nsName));
final Function<String, List<String>> reverseFn = factory.buildReverseFn(namespace, getCacheMap(nsName));
final Function<String, String> priorFn = fnCache.put(nsName, fn);
final Function<String, List<String>> priorReverseFn = reverseFnCache.put(nsName, reverseFn);
if (priorFn != null && priorFn != namespaceDatum.fn.get()) {
log.warn("Replaced prior function for namespace [%s]", nsName);
}
if (priorReverseFn != null && priorReverseFn != namespaceDatum.reverseFn.get()) {
log.warn("Replaced prior reverse function for namespace [%s]", nsName);
}
namespaceDatum.fn.set(fn);
namespaceDatum.reverseFn.set(reverseFn);
}
}
};

View File

@ -35,6 +35,7 @@ import org.mapdb.DBMaker;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -57,11 +58,13 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
Lifecycle lifecycle,
@Named("namespaceExtractionFunctionCache")
ConcurrentMap<String, Function<String, String>> fnCache,
@Named("namespaceReverseExtractionFunctionCache")
ConcurrentMap<String, Function<String, List<String>>> reverseFnCache,
ServiceEmitter emitter,
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> namespaceFunctionFactoryMap
)
{
super(lifecycle, fnCache, emitter, namespaceFunctionFactoryMap);
super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap);
try {
tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName());
log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath());

View File

@ -28,6 +28,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.query.extraction.namespace.ExtractionNamespace;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -46,11 +47,13 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa
final Lifecycle lifecycle,
@Named("namespaceExtractionFunctionCache")
final ConcurrentMap<String, Function<String, String>> fnCache,
@Named("namespaceReverseExtractionFunctionCache")
final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache,
final ServiceEmitter emitter,
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> namespaceFunctionFactoryMap
)
{
super(lifecycle, fnCache, emitter, namespaceFunctionFactoryMap);
super(lifecycle, fnCache, reverseFnCache,emitter, namespaceFunctionFactoryMap);
}
@Override

View File

@ -27,6 +27,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -46,6 +49,17 @@ public class NamespacedExtractorTest
return Strings.isNullOrEmpty(input) ? null : input;
}
};
private static final Function<String, List<String>> NOOP_REVERSE_FN = new Function<String, List<String>>()
{
@Nullable
@Override
public List<String> apply(@Nullable String input)
{
return Strings.isNullOrEmpty(input) ? Collections.<String>emptyList() : Arrays.asList(input);
}
};
private static final Function<String, Function<String, String>> defaultFnFinder = new Function<String, Function<String, String>>()
{
@Nullable
@ -56,6 +70,17 @@ public class NamespacedExtractorTest
return fn == null ? NOOP_FN : fn;
}
};
private static final Function<String,Function<String, List<String>>> defaultReverseFnFinder = new Function<String, Function<String,List<String>>>()
{
@Nullable
@Override
public Function<String, java.util.List<String>> apply(@Nullable final String value)
{
return NOOP_REVERSE_FN;
}
};
@BeforeClass
public static void setupStatic()
{
@ -108,20 +133,26 @@ public class NamespacedExtractorTest
@Test
public void testSimpleNamespace()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "noop");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "noop");
for (int i = 0; i < 10; ++i) {
final String val = UUID.randomUUID().toString();
Assert.assertEquals(val, namespacedExtractor.apply(val));
Assert.assertEquals(Arrays.asList(val), namespacedExtractor.unApply(val));
}
Assert.assertEquals("", namespacedExtractor.apply(""));
Assert.assertNull(namespacedExtractor.apply(null));
Assert.assertEquals(Collections.emptyList(), namespacedExtractor.unApply(null));
Assert.assertEquals("The awesomeness", namespacedExtractor.apply("The awesomeness"));
}
@Test
public void testUnknownNamespace()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "HFJDKSHFUINEWUINIUENFIUENFUNEWI");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(
defaultFnFinder,
defaultReverseFnFinder,
"HFJDKSHFUINEWUINIUENFIUENFUNEWI"
);
for (int i = 0; i < 10; ++i) {
final String val = UUID.randomUUID().toString();
Assert.assertEquals(val, namespacedExtractor.apply(val));
@ -133,7 +164,7 @@ public class NamespacedExtractorTest
@Test
public void testTurtles()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "turtles");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "turtles");
for (int i = 0; i < 10; ++i) {
final String val = UUID.randomUUID().toString();
Assert.assertEquals("turtle", namespacedExtractor.apply(val));
@ -145,7 +176,7 @@ public class NamespacedExtractorTest
@Test
public void testEmpty()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "empty");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "empty");
Assert.assertEquals("", namespacedExtractor.apply(""));
Assert.assertEquals("", namespacedExtractor.apply(null));
Assert.assertEquals("", namespacedExtractor.apply("anything"));
@ -154,7 +185,7 @@ public class NamespacedExtractorTest
@Test
public void testNull()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "null");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "null");
Assert.assertNull(namespacedExtractor.apply(""));
Assert.assertNull(namespacedExtractor.apply(null));
}
@ -162,7 +193,7 @@ public class NamespacedExtractorTest
@Test
public void testBlankMissingValueIsNull()
{
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, "null");
NamespacedExtractor namespacedExtractor = new NamespacedExtractor(defaultFnFinder, defaultReverseFnFinder, "null");
Assert.assertNull(namespacedExtractor.apply("fh43u1i2"));
Assert.assertNull(namespacedExtractor.apply(""));
Assert.assertNull(namespacedExtractor.apply(null));

View File

@ -49,6 +49,7 @@ import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -84,6 +85,7 @@ public class NamespacedExtractorModuleTest
cacheManager = new OnHeapNamespaceExtractionCacheManager(
lifecycle,
new ConcurrentHashMap<String, Function<String, String>>(),
new ConcurrentHashMap<String, Function<String, List<String>>>(),
new NoopServiceEmitter(), factoryMap
);
fnCache.clear();

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.UOE;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.ServiceEmitter;
@ -60,6 +61,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -136,12 +138,14 @@ public class URIExtractionNamespaceFunctionFactoryTest
OnHeapNamespaceExtractionCacheManager.class.getConstructor(
Lifecycle.class,
ConcurrentMap.class,
ConcurrentMap.class,
ServiceEmitter.class,
Map.class
),
OffHeapNamespaceExtractionCacheManager.class.getConstructor(
Lifecycle.class,
ConcurrentMap.class,
ConcurrentMap.class,
ServiceEmitter.class,
Map.class
)
@ -173,6 +177,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
manager = constructor.newInstance(
new Lifecycle(),
new ConcurrentHashMap<String, Function<String, String>>(),
new ConcurrentHashMap<String, Function<String, String>>(),
new NoopServiceEmitter(),
new HashMap<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>()
);
@ -222,9 +227,11 @@ public class URIExtractionNamespaceFunctionFactoryTest
this.outStreamSupplier = outStreamSupplier;
this.lifecycle = new Lifecycle();
this.fnCache = new ConcurrentHashMap<>();
this.reverseFnCache = new ConcurrentHashMap<>();
this.manager = cacheManagerConstructor.newInstance(
lifecycle,
fnCache,
reverseFnCache,
new NoopServiceEmitter(),
namespaceFunctionFactoryMap
);
@ -251,6 +258,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
private URIExtractionNamespaceFunctionFactory factory;
private URIExtractionNamespace namespace;
private ConcurrentHashMap<String, Function<String, String>> fnCache;
private ConcurrentHashMap<String, Function<String, List<String>>> reverseFnCache;
@Before
public void setUp() throws Exception
@ -262,7 +270,16 @@ public class URIExtractionNamespaceFunctionFactoryTest
final ObjectMapper mapper = new DefaultObjectMapper();
try (OutputStream ostream = outStreamSupplier.apply(tmpFile)) {
try (OutputStreamWriter out = new OutputStreamWriter(ostream)) {
out.write(mapper.writeValueAsString(ImmutableMap.<String, String>of("foo", "bar")));
out.write(mapper.writeValueAsString(ImmutableMap.<String, String>of(
"boo",
"bar",
"foo",
"bar",
"",
"MissingValue",
"emptyString",
""
)));
}
}
factory = new URIExtractionNamespaceFunctionFactory(
@ -299,6 +316,20 @@ public class URIExtractionNamespaceFunctionFactoryTest
Assert.assertEquals(null, fn.apply("baz"));
}
@Test
public void testReverseFunction() throws InterruptedException
{
Assert.assertNull(reverseFnCache.get(namespace.getNamespace()));
NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace));
Function<String, List<String>> reverseFn = reverseFnCache.get(namespace.getNamespace());
Assert.assertNotNull(reverseFn);
Assert.assertEquals(Sets.newHashSet("boo", "foo"), Sets.newHashSet(reverseFn.apply("bar")));
Assert.assertEquals(Sets.newHashSet(""), Sets.newHashSet(reverseFn.apply("MissingValue")));
Assert.assertEquals(Sets.newHashSet("emptyString"), Sets.newHashSet(reverseFn.apply("")));
Assert.assertEquals(Sets.newHashSet("emptyString"), Sets.newHashSet(reverseFn.apply(null)));
Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.apply("baz"));
}
@Test
public void simplePileONamespacesTest() throws InterruptedException
{

View File

@ -20,8 +20,10 @@
package io.druid.server.namespace.cache;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.metadata.TestDerbyConnector;
@ -41,6 +43,8 @@ import org.junit.runners.Parameterized;
import org.skife.jdbi.v2.Handle;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -66,7 +70,8 @@ public class JDBCExtractionNamespaceTest
private static final Map<String, String> renames = ImmutableMap.of(
"foo", "bar",
"bad", "bar",
"how about that", "foo"
"how about that", "foo",
"empty string", ""
);
@Parameterized.Parameters(name = "{0}")
@ -86,6 +91,7 @@ public class JDBCExtractionNamespaceTest
}
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache = new ConcurrentHashMap<>();
private final String tsColumn;
private OnHeapNamespaceExtractionCacheManager extractionCacheManager;
private final Lifecycle lifecycle = new Lifecycle();
@ -120,6 +126,7 @@ public class JDBCExtractionNamespaceTest
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
lifecycle,
fnCache,
reverseFnCache,
new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
JDBCExtractionNamespace.class,
@ -219,19 +226,33 @@ public class JDBCExtractionNamespaceTest
for (Map.Entry<String, String> entry : renames.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
Assert.assertEquals(
"non-null check",
val,
extractionFn.apply(key)
);
Assert.assertEquals("non-null check", Strings.emptyToNull(val), extractionFn.apply(key));
}
Assert.assertEquals(
"null check",
null,
extractionFn.apply("baz")
);
Assert.assertEquals("null check", null, extractionFn.apply("baz"));
}
@Test(timeout = 60_000L)
public void testReverseLookup() throws InterruptedException
{
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
namespace,
derbyConnectorRule.getMetadataConnectorConfig(),
tableName,
keyName,
valName,
tsColumn,
new Period(0)
);
NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
Function<String, List<String>> reverseExtractionFn = reverseFnCache.get(extractionNamespace.getNamespace());
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("foo", "bad"), Sets.newHashSet(reverseExtractionFn.apply("bar")));
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("how about that"), Sets.newHashSet(reverseExtractionFn.apply("foo")));
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply("")));
Assert.assertEquals("null is same as empty string", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply(null)));
Assert.assertEquals("reverse lookup of none existing value should be empty list",
Collections.EMPTY_LIST,
reverseExtractionFn.apply("does't exist"));
}
@Test(timeout = 60_000L)
public void testSkipOld()

View File

@ -56,6 +56,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@ -80,6 +81,7 @@ public class NamespaceExtractionCacheManagerExecutorsTest
private NamespaceExtractionCacheManager manager;
private File tmpFile;
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Function<String, List<String>>> reverseFnCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Object> cacheUpdateAlerts = new ConcurrentHashMap<>();
private final AtomicLong numRuns = new AtomicLong(0L);
@ -116,7 +118,7 @@ public class NamespaceExtractionCacheManagerExecutorsTest
}
};
manager = new OnHeapNamespaceExtractionCacheManager(
lifecycle, fnCache, new NoopServiceEmitter(),
lifecycle, fnCache, reverseFnCache, new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
URIExtractionNamespace.class,
factory

View File

@ -62,11 +62,13 @@ public class NamespaceExtractionCacheManagersTest
ArrayList<Object[]> params = new ArrayList<>();
ConcurrentMap<String, Function<String, String>> fnMap = new ConcurrentHashMap<String, Function<String, String>>();
ConcurrentMap<String, Function<String, List<String>>> reverserFnMap = new ConcurrentHashMap<String, Function<String, List<String>>>();
params.add(
new Object[]{
new OffHeapNamespaceExtractionCacheManager(
lifecycle,
fnMap,
reverserFnMap,
new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of()
), fnMap
@ -77,6 +79,7 @@ public class NamespaceExtractionCacheManagersTest
new OnHeapNamespaceExtractionCacheManager(
lifecycle,
fnMap,
reverserFnMap,
new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of()
), fnMap

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@ -39,6 +40,18 @@ public interface LookupExtractor
*/
@Nullable String apply(@NotNull String key);
/**
* Provide the reverse mapping from a given value to a list of keys
* @param value the value to apply the reverse lookup
* Null and empty are considered to be the same value = nullToEmpty(value)
*
* @return the list of keys that maps to value or empty list.
* Note that for the case of a none existing value in the lookup we have to cases either return an empty list OR list with null element.
* returning an empty list implies that user want to ignore such a lookup value.
* In the other hand returning a list with the null element implies user want to map the none existing value to the key null.
*/
List<String> unApply(String value);
/**
* Create a cache key for use in results caching
* @return A byte array that can be used to uniquely identify if results of a prior lookup can use the cached values

View File

@ -23,15 +23,19 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.StringUtils;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@JsonTypeName("map")
@ -60,6 +64,19 @@ public class MapLookupExtractor implements LookupExtractor
return map.get(val);
}
@Override
public List<String> unApply(final String value)
{
return Lists.newArrayList(Maps.filterKeys(map, new Predicate<String>()
{
@Override public boolean apply(@Nullable String key)
{
return map.get(key).equals(Strings.nullToEmpty(value));
}
}).keySet());
}
@Override
public byte[] getCacheKey()
{

View File

@ -20,20 +20,36 @@
package io.druid.query.extraction;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
public class MapLookupExtractorTest
{
private final MapLookupExtractor fn = new MapLookupExtractor(ImmutableMap.of("foo", "bar"));
private final Map lookupMap = ImmutableMap.of("foo", "bar", "null", "", "empty String", "", "","empty_string");
private final MapLookupExtractor fn = new MapLookupExtractor(lookupMap);
@Test
public void testUnApply()
{
Assert.assertEquals(Arrays.asList("foo"), fn.unApply("bar"));
Assert.assertEquals(Sets.newHashSet("null", "empty String"), Sets.newHashSet(fn.unApply("")));
Assert.assertEquals("Null value should be equal to empty string",
Sets.newHashSet("null", "empty String"),
Sets.newHashSet(fn.unApply(null)));
Assert.assertEquals(Sets.newHashSet(""), Sets.newHashSet(fn.unApply("empty_string")));
Assert.assertEquals("not existing value returns empty list", Collections.EMPTY_LIST, fn.unApply("not There"));
}
@Test
public void testGetMap() throws Exception
{
Assert.assertEquals(ImmutableMap.of("foo", "bar"), fn.getMap());
Assert.assertEquals(lookupMap, fn.getMap());
}
@Test
@ -46,7 +62,7 @@ public class MapLookupExtractorTest
@Test
public void testGetCacheKey() throws Exception
{
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar"));
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap));
Assert.assertArrayEquals(fn.getCacheKey(), fn2.getCacheKey());
final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar"));
Assert.assertFalse(Arrays.equals(fn.getCacheKey(), fn3.getCacheKey()));
@ -57,7 +73,7 @@ public class MapLookupExtractorTest
@Test
public void testEquals() throws Exception
{
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar"));
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap));
Assert.assertEquals(fn, fn2);
final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar"));
Assert.assertNotEquals(fn, fn3);
@ -68,7 +84,7 @@ public class MapLookupExtractorTest
@Test
public void testHashCode() throws Exception
{
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.of("foo", "bar"));
final MapLookupExtractor fn2 = new MapLookupExtractor(ImmutableMap.copyOf(lookupMap));
Assert.assertEquals(fn.hashCode(), fn2.hashCode());
final MapLookupExtractor fn3 = new MapLookupExtractor(ImmutableMap.of("foo2", "bar"));
Assert.assertNotEquals(fn.hashCode(), fn3.hashCode());