mirror of https://github.com/apache/druid.git
fix topn on string columns with non-sorted or non-unique dictionaries (#10053)
* fix topn on string columns with non-sorted or non-unique dictionaries
* fix metadata tests
* refactor, clarify comments and code, fix ci failures
parent 37e150c075
commit c2f5d453f8
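The essence of the bug: the pooled TopN algorithms aggregate on dictionary ids and defer converting ids back to strings until late in query processing, which miscounts when a selector's dictionary maps more than one id to the same value (or is not sorted), as can happen for selectors backed by joins or inline datasources. A minimal, self-contained sketch of the failure mode, using hypothetical toy data rather than Druid code:

    import java.util.HashMap;
    import java.util.Map;

    public class NonUniqueDictionaryDemo
    {
      public static void main(String[] args)
      {
        // A dictionary as a join-backed selector might expose it: ids are NOT
        // unique, "a" appears under both id 0 and id 2.
        String[] dictionary = {"a", "b", "a"};
        int[] rowIds = {0, 1, 2, 2, 0};

        // Pooled-style aggregation: accumulate per dictionary id, look names up last.
        Map<Integer, Long> countsById = new HashMap<>();
        for (int id : rowIds) {
          countsById.merge(id, 1L, Long::sum);
        }
        System.out.println(countsById);    // {0=2, 1=1, 2=2}: the count for "a" is split across two ids

        // Heap-style aggregation: look the name up first, accumulate per value.
        Map<String, Long> countsByValue = new HashMap<>();
        for (int id : rowIds) {
          countsByValue.merge(dictionary[id], 1L, Long::sum);
        }
        System.out.println(countsByValue); // {a=4, b=1}: correct for any dictionary
      }
    }

This is why the changes below route such columns to the heap-based algorithms, which aggregate on selector values instead of ids.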
@@ -165,7 +165,7 @@ public class TopNTypeInterfaceBenchmark
     queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
     queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
 
-    // Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
+    // Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
     TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
         .dataSource("blah")
         .granularity(Granularities.ALL)
@@ -174,7 +174,7 @@ public class TopNTypeInterfaceBenchmark
         .intervals(intervalSpec)
         .aggregators(queryAggs);
 
-    // DimExtractionTopNAlgorithm is always used for numeric columns
+    // HeapBasedTopNAlgorithm is always used for numeric columns
     TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
         .dataSource("blah")
         .granularity(Granularities.ALL)
@@ -120,36 +120,34 @@ public class TopNQueryEngine
 
     final TopNAlgorithm<?, ?> topNAlgorithm;
-    if (
-        selector.isHasExtractionFn() &&
-        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // Once we have arbitrary dimension types following check should be replaced by checking
-        // that the column is of type long and single-value.
-        dimension.equals(ColumnHolder.TIME_COLUMN_NAME)
-    ) {
-      // A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
-      // currently relies on the dimension cardinality to support lexicographic sorting
-      topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-    } else if (selector.isHasExtractionFn()) {
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
-                                               && columnCapabilities.isDictionaryEncoded())) {
-      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know
-      // which can happen for 'inline' data sources when this is run on the broker
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
-      // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
-      // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (selector.isAggregateAllMetrics()) {
-      // sorted by dimension
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
-    } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
-      // high cardinality dimensions with larger result sets
-      topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
+      // heap based algorithm selection
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
+        // Once we have arbitrary dimension types following check should be replaced by checking
+        // that the column is of type long and single-value.
+        // We might be able to use this for any long column with an extraction function, that is
+        // ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+        // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     } else {
-      // anything else
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      // pool based algorithm selection
+      if (selector.isAggregateAllMetrics()) {
+        // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm
+        // for this
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+        // for high cardinality dimensions with larger result sets we aggregate with only the ordering aggregation to
+        // compute the first 'n' values, and then for the rest of the metrics but for only the 'n' values
+        topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+      } else {
+        // anything else, use the regular pooled algorithm
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      }
     }
     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
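Both the old and new versions honor the doAggregateTopNMetricFirst query-context flag checked above. A hedged sketch of opting a query into that strategy from the caller side (the builder calls mirror the benchmark earlier in this diff; "dim", "someMetric", "2000/2001", and queryAggs are placeholders):

    // Force the aggregate-topn-metric-first strategy via query context.
    TopNQuery query = new TopNQueryBuilder()
        .dataSource("blah")
        .granularity(Granularities.ALL)
        .dimension("dim")
        .metric("someMetric")
        .threshold(10)
        .intervals("2000/2001")
        .aggregators(queryAggs)
        .context(ImmutableMap.of("doAggregateTopNMetricFirst", true))
        .build();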
@@ -158,6 +156,40 @@ public class TopNQueryEngine
     return new TopNMapFn(query, topNAlgorithm);
   }
 
+  /**
+   * {@link PooledTopNAlgorithm} (and {@link AggregateTopNMetricFirstAlgorithm}, which utilizes the pooled
+   * algorithm) are optimized off-heap algorithms for aggregating dictionary encoded string columns. These algorithms
+   * rely on dictionary ids being unique, so that they can aggregate on the dictionary ids directly and defer
+   * {@link org.apache.druid.segment.DimensionSelector#lookupName(int)} until as late as possible in query processing.
+   *
+   * When these conditions are not met, we have an on-heap fall-back algorithm, the {@link HeapBasedTopNAlgorithm}
+   * (and {@link TimeExtractionTopNAlgorithm}, a specialized form for long columns), which aggregates on the values
+   * of selectors.
+   */
+  private static boolean requiresHeapAlgorithm(
+      final TopNAlgorithmSelector selector,
+      final TopNQuery query,
+      final ColumnCapabilities capabilities
+  )
+  {
+    if (selector.isHasExtractionFn()) {
+      // extraction functions can have a many-to-one mapping, and should use a heap algorithm
+      return true;
+    }
+
+    if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+      // non-string output cannot use the pooled algorithm, even if the underlying selector supports it
+      return true;
+    }
+    if (capabilities != null && capabilities.getType() == ValueType.STRING) {
+      // string columns must use the on-heap algorithm unless they have the following capabilities
+      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+    } else {
+      // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
+      return true;
+    }
+  }
+
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null
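A note on the string branch above: areDictionaryValuesUnique() returns a tri-state capability rather than a plain boolean, which is why the code calls .isTrue(); an unknown answer conservatively falls back to the heap algorithm. A minimal sketch of the pattern (a hypothetical standalone enum, not the actual Druid type):

    // Tri-state capability: only an explicit TRUE enables the optimized pooled
    // path; FALSE and UNKNOWN both fall back to the safe heap-based algorithm.
    enum Capable
    {
      TRUE, FALSE, UNKNOWN;

      boolean isTrue()
      {
        return this == TRUE;
      }
    }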
@@ -59,7 +59,7 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
     }
 
-    // This method is used for the DimExtractionTopNAlgorithm only.
+    // This method is used for the HeapBasedTopNAlgorithm only.
     // Unlike regular topN we cannot rely on ordering to optimize.
     // Optimization possibly requires a reverse lookup from value to ID, which is
     // not possible when applying an extraction function
@@ -25,9 +25,12 @@ import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -41,6 +44,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
 import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
 import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@@ -61,7 +65,10 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentWrangler;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@@ -273,4 +280,26 @@ public class QueryStackTests
     return conglomerate;
   }
 
+  public static JoinableFactory makeJoinableFactoryForLookup(
+      LookupExtractorFactoryContainerProvider lookupProvider
+  )
+  {
+    return makeJoinableFactoryFromDefault(lookupProvider, null);
+  }
+
+  public static JoinableFactory makeJoinableFactoryFromDefault(
+      @Nullable LookupExtractorFactoryContainerProvider lookupProvider,
+      @Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
+  )
+  {
+    ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> builder = ImmutableMap.builder();
+    builder.put(InlineDataSource.class, new InlineJoinableFactory());
+    if (lookupProvider != null) {
+      builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
+    }
+    if (custom != null) {
+      builder.putAll(custom);
+    }
+    return MapJoinableFactoryTest.fromMap(builder.build());
+  }
 }
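A hypothetical usage sketch of the new helper (lookupProvider and myCustomFactory are placeholders): inline-datasource joins are always registered, lookup joins only when a provider is supplied, and custom mappings are layered on top:

    // Build a joinable factory with the defaults plus a custom mapping.
    JoinableFactory factory = QueryStackTests.makeJoinableFactoryFromDefault(
        lookupProvider,  // @Nullable; pass null to skip lookup joins
        ImmutableMap.of(GlobalTableDataSource.class, myCustomFactory)
    );

CalciteTests.createDefaultJoinableFactory() later in this diff uses exactly this entry point to register its broadcast-table joinable.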
@@ -384,6 +384,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
         ImmutableList.of(
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
@@ -441,6 +447,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     final DatabaseMetaData metaData = superuserClient.getMetaData();
     Assert.assertEquals(
         ImmutableList.of(
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
@@ -422,7 +422,10 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   @Before
   public void setUp() throws Exception
   {
-    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+    walker = CalciteTests.createMockWalker(
+        conglomerate,
+        temporaryFolder.newFolder()
+    );
   }
 
   @After
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
@@ -712,6 +713,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -741,6 +743,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         CalciteTests.SUPER_USER_AUTH_RESULT,
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -14997,6 +15000,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
+  @Test
+  @Parameters(source = QueryContextForJoinProvider.class)
+  public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext) throws Exception
+  {
+    testQuery(
+        "SELECT druid.broadcast.dim4, COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+        + "GROUP BY 1 ORDER BY 2 LIMIT 4",
+        queryContext,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE3),
+                        new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+                        "j0.",
+                        equalsCondition(
+                            DruidExpression.fromColumn("dim4"),
+                            DruidExpression.fromColumn("j0.dim4")
+                        ),
+                        JoinType.INNER
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+                .threshold(4)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .context(queryContext)
+                .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 9L},
+            new Object[]{"b", 9L}
+        )
+    );
+  }
+
   /**
    * This is a provider of query contexts that should be used by join tests.
    * It tests various configs that can be passed to join queries. All the configs provided by this provider should
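One detail worth noting when reading the expected native query above: NumericTopNMetricSpec orders descending by default, so the SQL "ORDER BY 2 LIMIT 4" (ascending count) plans to an inverted numeric metric spec. In isolation:

    // SQL "ORDER BY <count> ASC" becomes an inverted numeric metric spec, since
    // the plain numeric spec orders descending; "a0" names the COUNT(*) aggregator.
    TopNMetricSpec ascendingByCount = new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0"));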
@@ -50,13 +50,17 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.GlobalTableDataSource;
+import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -72,8 +76,14 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.table.IndexedTableJoinable;
+import org.apache.druid.segment.join.table.RowBasedIndexedTable;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -125,6 +135,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.BooleanSupplier;
@@ -140,6 +151,7 @@ public class CalciteTests
   public static final String DATASOURCE3 = "numfoo";
   public static final String DATASOURCE4 = "foo4";
   public static final String DATASOURCE5 = "lotsocolumns";
+  public static final String BROADCAST_DATASOURCE = "broadcast";
   public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource";
   public static final String SOME_DATASOURCE = "some_datasource";
   public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
@@ -214,7 +226,7 @@ public class CalciteTests
 
   private static final String TIMESTAMP_COLUMN = "t";
 
-  private static final Injector INJECTOR = Guice.createInjector(
+  public static final Injector INJECTOR = Guice.createInjector(
       binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
@@ -605,6 +617,68 @@ public class CalciteTests
       )
   );
 
+  private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
+      RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
+          x.get("dim1"),
+          x.get("dim2"),
+          x.get("dim3"),
+          x.get("dim4"),
+          x.get("dim5"),
+          x.get("d1"),
+          x.get("d2"),
+          x.get("f1"),
+          x.get("f2"),
+          x.get("l1"),
+          x.get("l2")
+      }).collect(Collectors.toList()),
+      RowSignature.builder()
+                  .add("dim1", ValueType.STRING)
+                  .add("dim2", ValueType.STRING)
+                  .add("dim3", ValueType.STRING)
+                  .add("dim4", ValueType.STRING)
+                  .add("dim5", ValueType.STRING)
+                  .add("d1", ValueType.DOUBLE)
+                  .add("d2", ValueType.DOUBLE)
+                  .add("f1", ValueType.FLOAT)
+                  .add("f2", ValueType.FLOAT)
+                  .add("l1", ValueType.LONG)
+                  .add("l2", ValueType.LONG)
+                  .build()
+  );
+
+  private static final Set<String> KEY_COLUMNS = ImmutableSet.of("dim4");
+
+  private static final RowBasedIndexedTable JOINABLE_TABLE = new RowBasedIndexedTable(
+      JOINABLE_BACKING_DATA.getRowsAsList(),
+      JOINABLE_BACKING_DATA.rowAdapter(),
+      JOINABLE_BACKING_DATA.getRowSignature(),
+      KEY_COLUMNS,
+      DateTimes.nowUtc().toString()
+  );
+
+  public static GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(BROADCAST_DATASOURCE);
+
+  public static JoinableFactory CUSTOM_ROW_TABLE_JOINABLE = new JoinableFactory()
+  {
+    @Override
+    public boolean isDirectlyJoinable(DataSource dataSource)
+    {
+      return CUSTOM_TABLE.equals(dataSource);
+    }
+
+    @Override
+    public Optional<Joinable> build(
+        DataSource dataSource,
+        JoinConditionAnalysis condition
+    )
+    {
+      if (dataSource instanceof GlobalTableDataSource) {
+        return Optional.of(new IndexedTableJoinable(JOINABLE_TABLE));
+      }
+      return Optional.empty();
+    }
+  };
+
   private CalciteTests()
   {
     // No instantiation.
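For context, a sketch of how a join against the broadcast table resolves through the factory defined above (the call site is illustrative, not actual planner code; condition stands in for a JoinConditionAnalysis built by the planner):

    // Resolving a joinable for the broadcast datasource through the custom factory.
    Optional<Joinable> joinable = CalciteTests.CUSTOM_ROW_TABLE_JOINABLE.build(
        CalciteTests.CUSTOM_TABLE,  // GlobalTableDataSource("broadcast")
        condition
    );
    // joinable.isPresent() == true: an IndexedTableJoinable over JOINABLE_TABLE,
    // keyed on "dim4", which is what the new CalciteQueryTest topN test joins on.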
@@ -649,12 +723,28 @@ public class CalciteTests
     return INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class));
   }
 
+  public static JoinableFactory createDefaultJoinableFactory()
+  {
+    return QueryStackTests.makeJoinableFactoryFromDefault(
+        INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
+        ImmutableMap.of(
+            GlobalTableDataSource.class,
+            CUSTOM_ROW_TABLE_JOINABLE
+        )
+    );
+  }
+
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
       final QueryRunnerFactoryConglomerate conglomerate,
       final File tmpDir
   )
   {
-    return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+    return createMockWalker(
+        conglomerate,
+        tmpDir,
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+        createDefaultJoinableFactory()
+    );
   }
 
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
@@ -662,6 +752,16 @@ public class CalciteTests
       final File tmpDir,
       final QueryScheduler scheduler
   )
   {
+    return createMockWalker(conglomerate, tmpDir, scheduler, null);
+  }
+
+  public static SpecificSegmentsQuerySegmentWalker createMockWalker(
+      final QueryRunnerFactoryConglomerate conglomerate,
+      final File tmpDir,
+      final QueryScheduler scheduler,
+      final JoinableFactory joinableFactory
+  )
+  {
     final QueryableIndex index1 = IndexBuilder
         .create()
@@ -713,7 +813,7 @@ public class CalciteTests
 
     final QueryableIndex someDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "6"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA)
         .rows(ROWS1)
@@ -721,7 +821,7 @@ public class CalciteTests
 
     final QueryableIndex someXDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "7"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA_WITH_X_COLUMNS)
         .rows(RAW_ROWS1_X)
@@ -731,7 +831,7 @@ public class CalciteTests
     return new SpecificSegmentsQuerySegmentWalker(
         conglomerate,
         INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
-        null,
+        joinableFactory,
         scheduler
     ).add(
         DataSegment.builder()
@@ -805,6 +905,15 @@ public class CalciteTests
             .size(0)
             .build(),
         someXDatasourceIndex
+    ).add(
+        DataSegment.builder()
+                   .dataSource(BROADCAST_DATASOURCE)
+                   .interval(indexNumericDims.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build(),
+        indexNumericDims
     );
   }
 
@@ -979,8 +1088,15 @@ public class CalciteTests
     final DruidSchema schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments()),
-        new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
-        new MapJoinableFactory(ImmutableMap.of()),
+        new SegmentManager(EasyMock.createMock(SegmentLoader.class))
+        {
+          @Override
+          public Set<String> getDataSourceNames()
+          {
+            return ImmutableSet.of(BROADCAST_DATASOURCE);
+          }
+        },
+        createDefaultJoinableFactory(),
         plannerConfig,
         viewManager,
         TEST_AUTHENTICATOR_ESCALATOR
@@ -40,10 +40,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentWrangler;
-import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.LookupJoinableFactory;
-import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
@@ -91,12 +88,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
     final JoinableFactory joinableFactoryToUse;
 
     if (joinableFactory == null) {
-      joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
-          ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
-              .put(InlineDataSource.class, new InlineJoinableFactory())
-              .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
-              .build()
-      );
+      joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider);
     } else {
       joinableFactoryToUse = joinableFactory;
     }