Add LookupJoinableFactory. (#9281)

* Add LookupJoinableFactory.

Enables joins where the right-hand side is a lookup. Includes an
integration test.

Also, includes changes to LookupExtractorFactoryContainerProvider:

1) Add "getAllLookupNames", which will be needed to eventually connect
   lookups to Druid's SQL catalog.
2) Convert "get" from nullable to Optional return.
3) Swap out most usages of LookupReferencesManager in favor of the
   simpler LookupExtractorFactoryContainerProvider interface.

* Fixes for tests.

* Fix another test.

* Java 11 message fix.

* Fixups.

* Fixup benchmark class.
This commit is contained in:
Gian Merlino 2020-01-30 14:46:21 -08:00 committed by GitHub
parent b856853f09
commit 204ba9966f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 592 additions and 165 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
@ -31,6 +32,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
@ -67,6 +69,8 @@ import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@ -190,19 +194,34 @@ public class JoinAndLookupBenchmark
final ExprMacroTable exprMacroTable = new ExprMacroTable(
ImmutableList.of(
new LookupExprMacro(
lookupName -> {
if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryCodeToNameMap, false)
);
} else if (LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) {
return new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryNumberToNameMap, false)
);
} else {
return null;
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return ImmutableSet.of(LOOKUP_COUNTRY_CODE_TO_NAME, LOOKUP_COUNTRY_NUMBER_TO_NAME);
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
if (LOOKUP_COUNTRY_CODE_TO_NAME.equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryCodeToNameMap, false)
)
);
} else if (LOOKUP_COUNTRY_NUMBER_TO_NAME.equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(countryNumberToNameMap, false)
)
);
} else {
return Optional.empty();
}
}
}
)

View File

@ -101,7 +101,7 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
LookupEnabledTestExprMacroTable.createTestLookupProvider(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"

View File

@ -68,7 +68,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
LookupEnabledTestExprMacroTable.createTestLookupProvider(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"

View File

@ -1480,6 +1480,66 @@
}
]
},
{
"description": "topN, 1 agg, join to lookup",
"query": {
"queryType": "topN",
"dataSource": {
"type": "join",
"left": "wikipedia_editstream",
"right": {
"type": "lookup",
"lookup": "wiki-simple"
},
"rightPrefix": "j.",
"condition": "page == \"j.k\"",
"joinType": "LEFT"
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupPage",
"expression": "nvl(\"j.v\", \"page\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimension": "lookupPage",
"metric": "rows",
"threshold": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"timestamp": "2013-01-01T00:00:00.000Z",
"result": [
{
"lookupPage": "lookup!",
"rows": 991
},
{
"lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows": 990
},
{
"lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
"rows": 800
}
]
}
]
},
{
"description": "topN, 1 agg, join to inline",
"query": {

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DimFilterUtils;
@ -136,13 +137,17 @@ public class LookupDimensionSpec implements DimensionSpec
@Override
public ExtractionFn getExtractionFn()
{
final LookupExtractor lookupExtractor = Strings.isNullOrEmpty(name)
? this.lookup
: Preconditions.checkNotNull(
lookupExtractorFactoryContainerProvider.get(name),
"Lookup [%s] not found",
name
).getLookupExtractorFactory().get();
final LookupExtractor lookupExtractor;
if (Strings.isNullOrEmpty(name)) {
lookupExtractor = this.lookup;
} else {
lookupExtractor = lookupExtractorFactoryContainerProvider
.get(name)
.orElseThrow(() -> new ISE("Lookup [%s] not found", name))
.getLookupExtractorFactory()
.get();
}
return new LookupExtractionFn(
lookupExtractor,

View File

@ -19,14 +19,25 @@
package org.apache.druid.query.lookup;
import javax.annotation.Nullable;
import java.util.Optional;
import java.util.Set;
/**
* Provides {@link LookupExtractorFactoryContainer} to query and indexing time dimension transformations.
*
* The most important production implementation is LookupReferencesManager.
*/
@FunctionalInterface
public interface LookupExtractorFactoryContainerProvider
{
@Nullable
LookupExtractorFactoryContainer get(String lookupName);
/**
* Returns the set of all lookup names that {@link #get} can return containers for. Note that because the underlying
* set of valid lookups might change over time, it is not guaranteed that calling {@link #get} on the results will
* actually yield a container (it might have been removed).
*/
Set<String> getAllLookupNames();
/**
* Returns a lookup container for the provided lookupName, if it exists.
*/
Optional<LookupExtractorFactoryContainer> get(String lookupName);
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
@ -145,11 +146,11 @@ public class RegisteredLookupExtractionFn implements ExtractionFn
// http://www.javamex.com/tutorials/double_checked_locking.shtml
synchronized (delegateLock) {
if (null == delegate) {
final LookupExtractor factory = Preconditions.checkNotNull(
manager.get(getLookup()),
"Lookup [%s] not found",
getLookup()
).getLookupExtractorFactory().get();
final LookupExtractor factory =
manager.get(getLookup())
.orElseThrow(() -> new ISE("Lookup [%s] not found", getLookup()))
.getLookupExtractorFactory()
.get();
delegate = new LookupExtractionFn(
factory,

View File

@ -26,8 +26,10 @@ import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import java.util.Map;
@ -41,7 +43,10 @@ public class JoinableFactoryModule implements Module
* Default mappings of datasources to factories.
*/
private static final Map<Class<? extends DataSource>, Class<? extends JoinableFactory>> FACTORY_MAPPINGS =
ImmutableMap.of(InlineDataSource.class, InlineJoinableFactory.class);
ImmutableMap.of(
InlineDataSource.class, InlineJoinableFactory.class,
LookupDataSource.class, LookupJoinableFactory.class
);
@Override
public void configure(Binder binder)

View File

@ -28,6 +28,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.util.Optional;
@Path("/druid/v1/lookups/introspect")
@ResourceFilters(ConfigResourceFilter.class)
@ -48,12 +49,15 @@ public class LookupIntrospectionResource
@Path("/{lookupId}")
public Object introspectLookup(@PathParam("lookupId") final String lookupId)
{
final LookupExtractorFactoryContainer container = lookupExtractorFactoryContainerProvider.get(lookupId);
final Optional<LookupExtractorFactoryContainer> maybeContainer =
lookupExtractorFactoryContainerProvider.get(lookupId);
if (container == null) {
if (!maybeContainer.isPresent()) {
LOGGER.error("trying to introspect non existing lookup [%s]", lookupId);
return Response.status(Response.Status.NOT_FOUND).build();
}
final LookupExtractorFactoryContainer container = maybeContainer.get();
LookupIntrospectHandler introspectHandler = container.getLookupExtractorFactory().getIntrospectHandler();
if (introspectHandler != null) {
return introspectHandler;

View File

@ -55,6 +55,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@ -295,11 +296,16 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
}
@Override
@Nullable
public LookupExtractorFactoryContainer get(String lookupName)
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return stateRef.get().lookupMap.get(lookupName);
return Optional.ofNullable(stateRef.get().lookupMap.get(lookupName));
}
@Override
public Set<String> getAllLookupNames()
{
return stateRef.get().lookupMap.keySet();
}
// Note that this should ensure that "toLoad" and "toDrop" are disjoint.

View File

@ -30,8 +30,10 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.dimension.LookupDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow
@ -66,11 +68,16 @@ public class LookupSerdeModule implements DruidModule
*/
private static class NoopLookupExtractorFactoryContainerProvider implements LookupExtractorFactoryContainerProvider
{
@Nullable
@Override
public LookupExtractorFactoryContainer get(String lookupName)
public Set<String> getAllLookupNames()
{
return null;
return Collections.emptySet();
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
}
}

View File

@ -30,14 +30,17 @@ import java.util.Set;
/**
* A {@link JoinableFactory} for {@link InlineDataSource}. It works by building an {@link IndexedTable}.
*
* It is not valid to pass any other DataSource type to the "build" method.
*/
public class InlineJoinableFactory implements JoinableFactory
{
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
if (condition.canHashJoin() && dataSource instanceof InlineDataSource) {
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
final InlineDataSource inlineDataSource = (InlineDataSource) dataSource;
if (condition.canHashJoin()) {
final Set<String> rightKeyColumns = condition.getRightEquiConditionKeys();
return Optional.of(

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.inject.Inject;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import java.util.Optional;
/**
* A {@link JoinableFactory} for {@link LookupDataSource}.
*
* It is not valid to pass any other DataSource type to the "build" method.
*/
public class LookupJoinableFactory implements JoinableFactory
{
private final LookupExtractorFactoryContainerProvider lookupProvider;
@Inject
public LookupJoinableFactory(LookupExtractorFactoryContainerProvider lookupProvider)
{
this.lookupProvider = lookupProvider;
}
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
final LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
if (condition.canHashJoin()) {
final String lookupName = lookupDataSource.getLookupName();
return lookupProvider.get(lookupName)
.map(c -> LookupJoinable.wrap(c.getLookupExtractorFactory().get()));
} else {
return Optional.empty();
}
}
}

View File

@ -28,6 +28,8 @@ import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
@ -37,6 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
public class JoinableFactoryModuleTest
@ -63,7 +66,7 @@ public class JoinableFactoryModuleTest
{
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(1, joinableFactories.size());
Assert.assertEquals(2, joinableFactories.size());
Assert.assertEquals(InlineJoinableFactory.class, joinableFactories.get(InlineDataSource.class).getClass());
}
@ -75,23 +78,25 @@ public class JoinableFactoryModuleTest
.joinableFactoryBinder(binder).addBinding(NoopDataSource.class).toInstance(NoopJoinableFactory.INSTANCE));
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(2, joinableFactories.size());
Assert.assertEquals(3, joinableFactories.size());
Assert.assertEquals(NoopJoinableFactory.INSTANCE, joinableFactories.get(NoopDataSource.class));
}
private Injector makeInjectorWithProperties(Module... otherModules)
{
ImmutableList.Builder<Module> modulesBuilder =
final LookupExtractorFactoryContainerProvider lookupProvider =
LookupEnabledTestExprMacroTable.createTestLookupProvider(Collections.emptyMap());
final ImmutableList.Builder<Module> modulesBuilder =
ImmutableList.<Module>builder()
.add(new JoinableFactoryModule())
.add(binder -> {
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
});
for (Module otherModule : otherModules) {
.add(new JoinableFactoryModule())
.add(binder -> binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupProvider))
.add(binder -> binder.bindScope(LazySingleton.class, Scopes.SINGLETON));
for (final Module otherModule : otherModules) {
modulesBuilder.add(otherModule);
}
return Guice.createInjector(
modulesBuilder.build()
);
return Guice.createInjector(modulesBuilder.build());
}
}

View File

@ -42,6 +42,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@RunWith(JUnitParamsRunner.class)
public class LookupDimensionSpecTest
@ -55,7 +56,9 @@ public class LookupDimensionSpecTest
static {
EasyMock
.expect(LOOKUP_REF_MANAGER.get(EasyMock.eq("lookupName")))
.andReturn(new LookupExtractorFactoryContainer("v0", new MapLookupExtractorFactory(STRING_MAP, false)))
.andReturn(
Optional.of(new LookupExtractorFactoryContainer("v0", new MapLookupExtractorFactory(STRING_MAP, false)))
)
.anyTimes();
EasyMock.replay(LOOKUP_REF_MANAGER);
}
@ -75,14 +78,26 @@ public class LookupDimensionSpecTest
LOOKUP_REF_MANAGER
);
String serLookup = mapper.writeValueAsString(lookupDimSpec);
Assert.assertEquals(lookupDimSpec, mapper.readerFor(DimensionSpec.class).with(injectableValues).readValue(serLookup));
Assert.assertEquals(
lookupDimSpec,
mapper.readerFor(DimensionSpec.class).with(injectableValues).readValue(serLookup)
);
}
private Object[] parametersForTestSerDesr()
{
return new Object[]{
new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, true, null, null, true, null),
new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, "Missing_value", null, true, null),
new LookupDimensionSpec(
"dimName",
"outputName",
MAP_LOOKUP_EXTRACTOR,
false,
"Missing_value",
null,
true,
null
),
new LookupDimensionSpec("dimName", "outputName", MAP_LOOKUP_EXTRACTOR, false, null, null, true, null),
new LookupDimensionSpec("dimName", "outputName", null, false, null, "name", true, LOOKUP_REF_MANAGER)
};

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.Parser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -51,6 +52,12 @@ public class ExprMacroTest
.build()
);
@BeforeClass
public static void setUpClass()
{
NullHandling.initializeForTests();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -75,7 +82,7 @@ public class ExprMacroTest
@Test
public void testLookupNotFound()
{
expectedException.expect(NullPointerException.class);
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Lookup [lookylook] not found");
assertExpr("lookup(x, 'lookylook')", null);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.expression;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.math.expr.ExprMacroTable;
@ -29,10 +30,12 @@ import org.apache.druid.query.lookup.LookupExtractorFactory;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupIntrospectHandler;
import org.easymock.EasyMock;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Separate from {@link TestExprMacroTable} since that one is in druid-processing, which doesn't have
@ -41,6 +44,7 @@ import java.util.Collections;
public class LookupEnabledTestExprMacroTable extends ExprMacroTable
{
public static final ExprMacroTable INSTANCE = new LookupEnabledTestExprMacroTable();
private static final String LOOKYLOO = "lookyloo";
private LookupEnabledTestExprMacroTable()
{
@ -49,7 +53,7 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable
Iterables.concat(
TestExprMacroTable.INSTANCE.getMacros(),
Collections.singletonList(
new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo")))
new LookupExprMacro(createTestLookupProvider(ImmutableMap.of("foo", "xfoo")))
)
)
)
@ -57,53 +61,64 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable
}
/**
* Returns a mock {@link LookupExtractorFactoryContainerProvider} that has one lookup, "lookyloo".
* Returns a {@link LookupExtractorFactoryContainerProvider} that has one lookup, "lookyloo". Public so other tests
* can use this helper method directly.
*/
public static LookupExtractorFactoryContainerProvider createTestLookupReferencesManager(
final ImmutableMap<String, String> theLookup
)
public static LookupExtractorFactoryContainerProvider createTestLookupProvider(final Map<String, String> theLookup)
{
final LookupExtractorFactoryContainerProvider lookupExtractorFactoryContainerProvider =
EasyMock.createMock(LookupExtractorFactoryContainerProvider.class);
EasyMock.expect(lookupExtractorFactoryContainerProvider.get(EasyMock.eq("lookyloo"))).andReturn(
new LookupExtractorFactoryContainer(
"v0",
new LookupExtractorFactory()
{
@Override
public boolean start()
{
throw new UnsupportedOperationException();
}
final LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer(
"v0",
new LookupExtractorFactory()
{
@Override
public boolean start()
{
throw new UnsupportedOperationException();
}
@Override
public boolean close()
{
throw new UnsupportedOperationException();
}
@Override
public boolean close()
{
throw new UnsupportedOperationException();
}
@Override
public boolean replaces(@Nullable final LookupExtractorFactory other)
{
throw new UnsupportedOperationException();
}
@Override
public boolean replaces(@Nullable final LookupExtractorFactory other)
{
throw new UnsupportedOperationException();
}
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
throw new UnsupportedOperationException();
}
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
throw new UnsupportedOperationException();
}
@Override
public LookupExtractor get()
{
return new MapLookupExtractor(theLookup, false);
}
}
)
).anyTimes();
EasyMock.expect(lookupExtractorFactoryContainerProvider.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
EasyMock.replay(lookupExtractorFactoryContainerProvider);
return lookupExtractorFactoryContainerProvider;
@Override
public LookupExtractor get()
{
return new MapLookupExtractor(theLookup, false);
}
}
);
return new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return ImmutableSet.of(LOOKYLOO);
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
if (LOOKYLOO.equals(lookupName)) {
return Optional.of(container);
} else {
return Optional.empty();
}
}
};
}
}

View File

@ -36,6 +36,7 @@ import org.junit.Test;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.Optional;
public class LookupIntrospectionResourceTest
{
@ -65,19 +66,25 @@ public class LookupIntrospectionResourceTest
EasyMock.reset(mockLookupExtractorFactory);
EasyMock.reset(mockLookupIntrospectHandler);
EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get("lookupId")).andReturn(
new LookupExtractorFactoryContainer(
"v0",
mockLookupExtractorFactory
Optional.of(
new LookupExtractorFactoryContainer(
"v0",
mockLookupExtractorFactory
)
)
).anyTimes();
EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get("lookupId1")).andReturn(
new LookupExtractorFactoryContainer(
"v0",
actualLookupExtractorFactory
Optional.of(
new LookupExtractorFactoryContainer(
"v0",
actualLookupExtractorFactory
)
)
).anyTimes();
EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get(EasyMock.anyString())).andReturn(null).anyTimes();
EasyMock.expect(mockLookupExtractorFactoryContainerProvider.get(EasyMock.anyString()))
.andReturn(Optional.empty())
.anyTimes();
EasyMock.replay(mockLookupExtractorFactoryContainerProvider);
baseUri = WebserverTestUtils.createBaseUri();
@ -124,7 +131,8 @@ public class LookupIntrospectionResourceTest
);
}
@Test public void testExistingLookup()
@Test
public void testExistingLookup()
{
EasyMock.expect(mockLookupExtractorFactory.getIntrospectHandler()).andReturn(mockLookupIntrospectHandler);
EasyMock.expect(mockLookupExtractorFactory.get())

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -43,6 +44,7 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class LookupReferencesManagerTest
@ -183,19 +185,19 @@ public class LookupReferencesManagerTest
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertNull(lookupReferencesManager.get("test"));
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
lookupReferencesManager.add("test", testContainer);
lookupReferencesManager.handlePendingNotices();
Assert.assertEquals(testContainer, lookupReferencesManager.get("test"));
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
lookupReferencesManager.remove("test");
lookupReferencesManager.handlePendingNotices();
Assert.assertNull(lookupReferencesManager.get("test"));
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
}
@Test
@ -289,7 +291,7 @@ public class LookupReferencesManagerTest
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertNull(lookupReferencesManager.get("notThere"));
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("notThere"));
}
@Test
@ -394,6 +396,46 @@ public class LookupReferencesManagerTest
lookupReferencesManager.handlePendingNotices();
}
@Test
public void testGetAllLookupNames() throws Exception
{
LookupExtractorFactoryContainer container1 = new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key1", "value1"), true)
);
LookupExtractorFactoryContainer container2 = new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key2", "value2"), true)
);
Map<String, Object> lookupMap = new HashMap<>();
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.replay(config);
EasyMock.expect(
druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")
).andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
lookupReferencesManager.add("one", container1);
lookupReferencesManager.add("two", container2);
lookupReferencesManager.handlePendingNotices();
Assert.assertEquals(ImmutableSet.of("one", "two"), lookupReferencesManager.getAllLookupNames());
Assert.assertEquals(
ImmutableSet.of("one", "two"),
((LookupExtractorFactoryContainerProvider) lookupReferencesManager).getAllLookupNames()
);
}
@Test
public void testGetAllLookupsState() throws Exception
{
@ -495,21 +537,31 @@ public class LookupReferencesManagerTest
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
EasyMock.replay(lookupExtractorFactory);
Assert.assertNull(lookupReferencesManager.get("test"));
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
lookupReferencesManager.add("test", testContainer);
while (!testContainer.equals(lookupReferencesManager.get("test"))) {
while (!Optional.of(testContainer).equals(lookupReferencesManager.get("test"))) {
Thread.sleep(100);
}
Assert.assertEquals(
ImmutableSet.of("test", "testMockForRealModeWithMainThread"),
lookupReferencesManager.getAllLookupNames()
);
lookupReferencesManager.remove("test");
while (lookupReferencesManager.get("test") != null) {
while (lookupReferencesManager.get("test").isPresent()) {
Thread.sleep(100);
}
Assert.assertEquals(
ImmutableSet.of("testMockForRealModeWithMainThread"),
lookupReferencesManager.getAllLookupNames()
);
lookupReferencesManager.stop();
Assert.assertFalse(lookupReferencesManager.mainThread.isAlive());
@ -569,9 +621,9 @@ public class LookupReferencesManagerTest
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(container1, lookupReferencesManager.get("testLookup1"));
Assert.assertEquals(container2, lookupReferencesManager.get("testLookup2"));
Assert.assertEquals(container3, lookupReferencesManager.get("testLookup3"));
Assert.assertEquals(Optional.of(container1), lookupReferencesManager.get("testLookup1"));
Assert.assertEquals(Optional.of(container2), lookupReferencesManager.get("testLookup2"));
Assert.assertEquals(Optional.of(container3), lookupReferencesManager.get("testLookup3"));
}
@ -638,7 +690,10 @@ public class LookupReferencesManagerTest
EasyMock.expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()).anyTimes();
EasyMock.replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(container, lookupReferencesManager.get("testMockForLoadLookupOnCoordinatorFailure"));
Assert.assertEquals(
Optional.of(container),
lookupReferencesManager.get("testMockForLoadLookupOnCoordinatorFailure")
);
}
@Test
@ -677,6 +732,6 @@ public class LookupReferencesManagerTest
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
lookupReferencesManager.start();
Assert.assertNull(lookupReferencesManager.get("testMockForDisableLookupSync"));
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("testMockForDisableLookupSync"));
}
}

View File

@ -34,6 +34,7 @@ import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
public class RegisteredLookupExtractionFnTest
{
@ -96,7 +97,7 @@ public class RegisteredLookupExtractionFnTest
public void testMissingDelegation()
{
final LookupExtractorFactoryContainerProvider manager = EasyMock.createStrictMock(LookupReferencesManager.class);
EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(null).once();
EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(Optional.empty()).once();
EasyMock.replay(manager);
expectedException.expectMessage("Lookup [some lookup] not found");
@ -147,7 +148,10 @@ public class RegisteredLookupExtractionFnTest
);
EasyMock.verify(manager);
final Map<String, Object> result = mapper.readValue(mapper.writeValueAsString(fn), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
final Map<String, Object> result = mapper.readValue(
mapper.writeValueAsString(fn),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(mapper.convertValue(fn, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), result);
Assert.assertEquals(LOOKUP_NAME, result.get("lookup"));
Assert.assertEquals(true, result.get("retainMissingValue"));
@ -248,41 +252,43 @@ public class RegisteredLookupExtractionFnTest
private void managerReturnsMap(LookupExtractorFactoryContainerProvider manager)
{
EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(
new LookupExtractorFactoryContainer(
"v0",
new LookupExtractorFactory()
{
@Override
public boolean start()
{
return false;
}
Optional.of(
new LookupExtractorFactoryContainer(
"v0",
new LookupExtractorFactory()
{
@Override
public boolean start()
{
return false;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public boolean close()
{
return false;
}
@Override
public boolean close()
{
return false;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return null;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return null;
}
@Override
public LookupExtractor get()
{
return LOOKUP_EXTRACTOR;
}
}
@Override
public LookupExtractor get()
{
return LOOKUP_EXTRACTOR;
}
}
)
)
).anyTimes();
}

View File

@ -27,7 +27,9 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Optional;
@ -35,6 +37,9 @@ public class InlineJoinableFactoryTest
{
private static final String PREFIX = "j.";
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final InlineJoinableFactory factory = new InlineJoinableFactory();
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
@ -49,10 +54,10 @@ public class InlineJoinableFactoryTest
@Test
public void testBuildNonInline()
{
Assert.assertEquals(
Optional.empty(),
factory.build(new TableDataSource("foo"), makeCondition("x == \"j.y\""))
);
expectedException.expect(ClassCastException.class);
expectedException.expectMessage("TableDataSource cannot be cast");
final Optional<Joinable> ignored = factory.build(new TableDataSource("foo"), makeCondition("x == \"j.y\""));
}
@Test

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.MapLookupExtractorFactory;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
public class LookupJoinableFactoryTest
{
private static final String PREFIX = "j.";
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final LookupJoinableFactory factory;
private final LookupDataSource lookupDataSource = new LookupDataSource("country_code_to_name");
public LookupJoinableFactoryTest()
{
try {
final MapLookupExtractor countryIsoCodeToNameLookup = JoinTestHelper.createCountryIsoCodeToNameLookup();
this.factory = new LookupJoinableFactory(
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return ImmutableSet.of(lookupDataSource.getLookupName());
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
if (lookupDataSource.getLookupName().equals(lookupName)) {
return Optional.of(
new LookupExtractorFactoryContainer(
"v0",
new MapLookupExtractorFactory(
countryIsoCodeToNameLookup.getMap(),
false
)
)
);
} else {
return Optional.empty();
}
}
}
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void testBuildNonLookup()
{
expectedException.expect(ClassCastException.class);
expectedException.expectMessage("TableDataSource cannot be cast");
final Optional<Joinable> ignored = factory.build(new TableDataSource("foo"), makeCondition("x == \"j.k\""));
}
@Test
public void testBuildNonHashJoin()
{
Assert.assertEquals(
Optional.empty(),
factory.build(lookupDataSource, makeCondition("x > \"j.k\""))
);
}
@Test
public void testBuildDifferentLookup()
{
Assert.assertEquals(
Optional.empty(),
factory.build(new LookupDataSource("beep"), makeCondition("x == \"j.k\""))
);
}
@Test
public void testBuild()
{
final Joinable joinable = factory.build(lookupDataSource, makeCondition("x == \"j.k\"")).get();
Assert.assertThat(joinable, CoreMatchers.instanceOf(LookupJoinable.class));
Assert.assertEquals(ImmutableList.of("k", "v"), joinable.getAvailableColumns());
Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("k"));
Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v"));
}
private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
}
}

View File

@ -222,15 +222,15 @@ public class CalciteTests
// This Module is just to get a LookupExtractorFactoryContainerProvider with a usable "lookyloo" lookup.
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
final LookupExtractorFactoryContainerProvider lookupProvider =
LookupEnabledTestExprMacroTable.createTestLookupProvider(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
"abc", "xabc",
"nosuchkey", "mysteryvalue"
)
)
);
);
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupProvider);
}
);