Fix handling of 'join' on top of 'union' datasources. (#10318)

* Fix handling of 'join' on top of 'union' datasources.

The problem is that unions are typically rewritten into a series of
individual queries on the underlying tables, but this isn't done when
the union is wrapped in a join.

The main changes are in UnionQueryRunner:

1) Replace an instanceof UnionDataSource check with DataSourceAnalysis.
2) Replace a "query.withDataSource" call with a new method, "Queries.withBaseDataSource".

Together, these enable UnionQueryRunner to "see through" a join.
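
To make the rewrite concrete, here is a sketch (not code from this commit; the tables "foo", "bar", and "dim" and the join condition are made up) of what "Queries.withBaseDataSource" does for a join on top of a union: it swaps out only the base (bottom-leftmost) leg, keeping the join clauses intact, which is what lets UnionQueryRunner split the union into per-table queries.

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.segment.join.JoinType;

class WithBaseDataSourceSketch
{
  static Query<?> fooLeg()
  {
    // join(union(foo, bar), dim): the union is the base (bottom-leftmost) datasource.
    final DataSource joinOnUnion = JoinDataSource.create(
        new UnionDataSource(ImmutableList.of(new TableDataSource("foo"), new TableDataSource("bar"))),
        new TableDataSource("dim"),
        "j0.",
        "\"x\" == \"j0.x\"",
        JoinType.INNER,
        ExprMacroTable.nil()
    );

    final Query<?> query = Druids.newTimeseriesQueryBuilder()
                                 .dataSource(joinOnUnion)
                                 .intervals("2000/3000")
                                 .granularity(Granularities.ALL)
                                 .build();

    // Unlike query.withDataSource, this walks below the join and replaces only the
    // union leg, yielding the equivalent of a query on join(foo, dim). UnionQueryRunner
    // issues one such rewritten query per union member and merges the results.
    return Queries.withBaseDataSource(query, new TableDataSource("foo"));
  }
}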

* Tests.

* Adjust heap sizes for integration tests.

* Different approach, more tests.

* Tweak.

* Styling.
Gian Merlino 2020-08-26 14:23:54 -07:00, committed by GitHub
parent b9ff3483ac
commit 21703d81ac
18 changed files with 565 additions and 45 deletions


@@ -131,6 +131,9 @@ Union datasources allow you to treat two or more table datasources as a single datasource
do not need to have identical schemas. If they do not fully match up, then columns that exist in one table but not
another will be treated as if they contained all null values in the tables where they do not exist.
The list of "dataSources" must be nonempty. If you want to query an empty dataset, use an [`inline` datasource](#inline)
instead.
Union datasources are not available in Druid SQL.
Refer to the [Query execution](query-execution.md#union) page for more details on how queries are executed when you
use union datasources.
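
For reference, a minimal union datasource spec in the same native JSON form used by the integration-test query later in this commit (the table names here are placeholders):

{
  "type": "union",
  "dataSources": ["table1", "table2"]
}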

SingleTaskBackgroundRunnerTest.java

@@ -32,17 +32,24 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -135,6 +142,21 @@ public class SingleTaskBackgroundRunnerTest
);
}
@Test
public void testGetQueryRunner() throws ExecutionException, InterruptedException
{
runner.run(new NoopTask(null, null, "foo", 500L, 0, null, null, null)).get().getStatusCode();
final QueryRunner<ScanResultValue> queryRunner =
Druids.newScanQueryBuilder()
.dataSource("foo")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.build()
.getRunner(runner);
Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
}
@Test
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{


@@ -21,7 +21,7 @@ DRUID_SERVICE=broker
DRUID_LOG_PATH=/shared/logs/broker.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
SERVICE_DRUID_JAVA_OPTS=-server -Xmx256m -Xms256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
# Druid configs
druid_processing_buffer_sizeBytes=25000000


@@ -21,7 +21,7 @@ DRUID_SERVICE=historical
DRUID_LOG_PATH=/shared/logs/historical.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
# Druid configs
druid_processing_buffer_sizeBytes=25000000


@@ -21,7 +21,7 @@ DRUID_SERVICE=historical-for-query-retry-test
DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
# Druid configs
druid_processing_buffer_sizeBytes=25000000


@@ -1933,5 +1933,104 @@
}
}
]
},
{
"description": "groupBy, 1 agg, subquery over join of union to inline",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "join",
"left": {
"type": "union",
"dataSources": ["wikipedia_editstream", "wikipedia_editstream"]
},
"right": {
"type": "inline",
"columnNames": ["page", "lookupPage"],
"columnTypes": ["string", "string"],
"rows": [
["Wikipedia:Vandalismusmeldung", "inline join!"]
]
},
"rightPrefix": "j.",
"condition": "page == \"j.page\"",
"joinType": "LEFT"
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"virtualColumns": [
{
"type": "expression",
"name": "lookupPage",
"expression": "nvl(\"j.lookupPage\", \"page\")",
"outputType": "string"
}
],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimensions": ["lookupPage"]
}
},
"intervals": ["2013-01-01T00:00:00.000/2013-01-02T00:00:00.000"],
"granularity": "all",
"aggregations": [
{
"type": "longSum",
"name": "rows_outer",
"fieldName": "rows"
}
],
"dimensions": ["lookupPage"],
"limitSpec": {
"type": "default",
"columns": [
{
"dimension": "rows_outer",
"dimensionOrder": "numeric",
"direction": "descending"
}
],
"limit": 3
},
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "inline join!",
"rows_outer": 1982
}
},
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows_outer": 1980
}
},
{
"version": "v1",
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"lookupPage": "Wikipedia:Administrator_intervention_against_vandalism",
"rows_outer": 1600
}
}
]
}
]

JoinDataSource.java

@@ -74,6 +74,9 @@ public class JoinDataSource implements DataSource
this.joinType = Preconditions.checkNotNull(joinType, "joinType");
}
/**
* Create a join dataSource from a string condition.
*/
@JsonCreator
public static JoinDataSource create(
@JsonProperty("left") DataSource left,
@@ -97,6 +100,20 @@ public class JoinDataSource implements DataSource
);
}
/**
* Create a join dataSource from an existing {@link JoinConditionAnalysis}.
*/
public static JoinDataSource create(
final DataSource left,
final DataSource right,
final String rightPrefix,
final JoinConditionAnalysis conditionAnalysis,
final JoinType joinType
)
{
return new JoinDataSource(left, right, rightPrefix, conditionAnalysis, joinType);
}
@Override
public Set<String> getTableNames()
{

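For contrast between the two factories, a sketch (not from this commit; it assumes JoinConditionAnalysis.forExpression is the usual way to parse a condition string, and the tables are made up): the @JsonCreator overload parses its condition internally, while the new overload accepts a pre-parsed JoinConditionAnalysis, letting callers such as Queries.withBaseDataSource reuse the analysis carried by a PreJoinableClause.

import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;

class JoinDataSourceFactorySketch
{
  static void sketch()
  {
    // String-based factory: parses the condition internally.
    final DataSource fromString = JoinDataSource.create(
        new TableDataSource("foo"),
        new TableDataSource("bar"),
        "j0.",
        "\"x\" == \"j0.x\"",
        JoinType.LEFT,
        ExprMacroTable.nil()
    );

    // New overload: accepts an already-analyzed condition.
    // (JoinConditionAnalysis.forExpression is assumed here; it is not part of this diff.)
    final JoinConditionAnalysis condition =
        JoinConditionAnalysis.forExpression("\"x\" == \"j0.x\"", "j0.", ExprMacroTable.nil());
    final DataSource fromAnalysis = JoinDataSource.create(
        new TableDataSource("foo"),
        new TableDataSource("bar"),
        "j0.",
        condition,
        JoinType.LEFT
    );
  }
}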
Queries.java

@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import java.util.Collections;
@@ -160,6 +161,7 @@ public class Queries
// Verify preconditions and invariants, just in case.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource());
// Sanity check: query must be based on a single table.
if (!analysis.getBaseTableDataSource().isPresent()) {
throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource());
}
@@ -172,4 +174,45 @@
return retVal;
}
/**
* Rewrite "query" to refer to some specific base datasource, instead of the one it currently refers to.
*
* Unlike the seemingly-similar {@link Query#withDataSource}, this will walk down the datasource tree and replace
* only the base datasource (in the sense defined in {@link DataSourceAnalysis}).
*/
public static <T> Query<T> withBaseDataSource(final Query<T> query, final DataSource newBaseDataSource)
{
final Query<T> retVal;
if (query.getDataSource() instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
retVal = query.withDataSource(new QueryDataSource(withBaseDataSource(subQuery, newBaseDataSource)));
} else {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
DataSource current = newBaseDataSource;
for (final PreJoinableClause clause : analysis.getPreJoinableClauses()) {
current = JoinDataSource.create(
current,
clause.getDataSource(),
clause.getPrefix(),
clause.getCondition(),
clause.getJoinType()
);
}
retVal = query.withDataSource(current);
}
// Verify postconditions, just in case.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource());
if (!newBaseDataSource.equals(analysis.getBaseDataSource())) {
throw new ISE("Unable to replace base dataSource");
}
return retVal;
}
}
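
Because of the QueryDataSource branch above, the rewrite also tunnels through stacks of subqueries, which the new tests below exercise. A compact sketch (table names made up):

import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;

class SubqueryRewriteSketch
{
  static Query<?> sketch()
  {
    // An outer query stacked on an inner query on table "foo".
    final Query<?> inner = Druids.newTimeseriesQueryBuilder()
                                 .dataSource("foo")
                                 .intervals("2000/3000")
                                 .granularity(Granularities.ALL)
                                 .build();
    final Query<?> outer = Druids.newTimeseriesQueryBuilder()
                                 .dataSource(new QueryDataSource(inner))
                                 .intervals("2000/3000")
                                 .granularity(Granularities.ALL)
                                 .build();

    // Replaces the base table of the innermost query; the outer layer is kept.
    // The result is equivalent to the same stack with "bar" at the bottom.
    return Queries.withBaseDataSource(outer, new TableDataSource("bar"));
  }
}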

UnionDataSource.java

@@ -22,10 +22,10 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import java.util.List;
import java.util.Set;
@@ -39,7 +39,10 @@ public class UnionDataSource implements DataSource
@JsonCreator
public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSources)
{
Preconditions.checkNotNull(dataSources, "dataSources cannot be null for unionDataSource");
if (dataSources == null || dataSources.isEmpty()) {
throw new ISE("'dataSources' must be non-null and non-empty for 'union'");
}
this.dataSources = dataSources;
}

UnionQueryRunner.java

@@ -20,11 +20,14 @@
package org.apache.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
@@ -41,34 +44,51 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
Query<T> query = queryPlus.getQuery();
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(DataSource singleSource)
{
return baseRunner.run(
queryPlus.withQuery(
query.withDataSource(singleSource)
// assign the subqueryId. this will be used to validate that all query servers
// have responded for each subquery in RetryQueryRunner
.withDefaultSubQueryId()
),
responseContext
);
}
}
)
)
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (analysis.isConcreteTableBased() && analysis.getBaseUnionDataSource().isPresent()) {
// Union of tables.
final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get();
if (unionDataSource.getDataSources().isEmpty()) {
// Shouldn't happen, because UnionDataSource doesn't allow empty unions.
throw new ISE("Unexpectedly received empty union");
} else if (unionDataSource.getDataSources().size() == 1) {
// Single table. Run as a normal query.
return baseRunner.run(
queryPlus.withQuery(
Queries.withBaseDataSource(
query,
Iterables.getOnlyElement(unionDataSource.getDataSources())
)
),
responseContext
);
} else {
// Split up the tables and merge their results.
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
unionDataSource.getDataSources(),
(Function<DataSource, Sequence<T>>) singleSource ->
baseRunner.run(
queryPlus.withQuery(
Queries.withBaseDataSource(query, singleSource)
// assign the subqueryId. this will be used to validate that all query servers
// have responded for each subquery in RetryQueryRunner
.withDefaultSubQueryId()
),
responseContext
)
)
)
);
}
} else {
// Not a union of tables. Do nothing special.
return baseRunner.run(queryPlus, responseContext);
}
}

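The multi-table branch above relies on MergeSequence for an ordered merge of the per-table result sequences. A standalone sketch of that primitive with toy data (natural ordering stands in for query.getResultOrdering(); this is not code from this commit):

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import java.util.List;

class MergeSequenceSketch
{
  static List<Integer> sketch()
  {
    // Each input plays the role of one table's (already ordered) results.
    final Sequence<Integer> foo = Sequences.simple(ImmutableList.of(1, 3, 5));
    final Sequence<Integer> bar = Sequences.simple(ImmutableList.of(2, 4, 6));

    // MergeSequence interleaves the inputs according to the supplied ordering.
    final Sequence<Integer> merged =
        new MergeSequence<Integer>(Ordering.natural(), Sequences.simple(ImmutableList.of(foo, bar)));

    return merged.toList(); // [1, 2, 3, 4, 5, 6]
  }
}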
DataSourceAnalysis.java

@@ -168,7 +168,7 @@ public class DataSourceAnalysis
}
/**
* Returns the base (bottom-leftmost) datasource.
* Returns the base (bottom-leftmost) datasource.
*/
public DataSource getBaseDataSource()
{
@@ -176,8 +176,10 @@ public class DataSourceAnalysis
}
/**
* Returns the same datasource as {@link #getBaseDataSource()}, but only if it is a table. Useful on data servers,
* since they generally can only handle queries where the base datasource is a table.
* If {@link #getBaseDataSource()} is a {@link TableDataSource}, returns it. Otherwise, returns an empty Optional.
*
* Note that this can return empty even if {@link #isConcreteTableBased()} is true. This happens if the base
* datasource is a {@link UnionDataSource} of {@link TableDataSource}.
*/
public Optional<TableDataSource> getBaseTableDataSource()
{
@@ -188,6 +190,18 @@ public class DataSourceAnalysis
}
}
/**
* If {@link #getBaseDataSource()} is a {@link UnionDataSource}, returns it. Otherwise, returns an empty Optional.
*/
public Optional<UnionDataSource> getBaseUnionDataSource()
{
if (baseDataSource instanceof UnionDataSource) {
return Optional.of((UnionDataSource) baseDataSource);
} else {
return Optional.empty();
}
}
/**
* Returns the bottommost (i.e. innermost) {@link Query} from a possible stack of outer queries at the root of
* the datasource tree. This is the query that will be applied to the base datasource plus any joinables that might
@@ -243,8 +257,8 @@
/**
* Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a
* 'table' or union of them. This is an important property because it corresponds to datasources that can be handled
* by Druid data servers, like Historicals.
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an
* important property, because it corresponds to datasources that can be handled by Druid's distributed query stack.
*/
public boolean isConcreteTableBased()
{

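To summarize the new accessor's behavior, a short sketch for a plain union of tables (it mirrors the DataSourceAnalysisTest additions later in this commit; the table names are made up):

import com.google.common.collect.ImmutableList;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;

class BaseUnionSketch
{
  static void sketch()
  {
    final UnionDataSource union = new UnionDataSource(
        ImmutableList.of(new TableDataSource("foo"), new TableDataSource("bar"))
    );
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(union);

    analysis.getBaseTableDataSource(); // Optional.empty(): the base is not a single table
    analysis.getBaseUnionDataSource(); // Optional.of(union)
    analysis.isConcreteTableBased();   // true: a union composed entirely of tables
  }
}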
QueriesTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -32,6 +33,7 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -330,4 +332,151 @@ public class QueriesTest
final Query<Result<TimeseriesResultValue>> ignored = Queries.withSpecificSegments(query, descriptors);
}
@Test
public void testWithBaseDataSourceBasic()
{
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource("bar")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
Queries.withBaseDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
new TableDataSource("bar")
)
);
}
@Test
public void testWithBaseDataSourceSubQueryStack()
{
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource("bar")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
Queries.withBaseDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
new TableDataSource("bar")
)
);
}
@Test
public void testWithBaseDataSourceSubQueryStackWithJoinOnUnion()
{
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
JoinDataSource.create(
new TableDataSource("foo"),
new TableDataSource("bar"),
"j0.",
"\"foo.x\" == \"bar.x\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
Queries.withBaseDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(
JoinDataSource.create(
new UnionDataSource(
ImmutableList.of(
new TableDataSource("foo"),
new TableDataSource("bar")
)
),
new TableDataSource("bar"),
"j0.",
"\"foo.x\" == \"bar.x\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build()
)
)
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build(),
new TableDataSource("foo")
)
);
}
}

UnionDataSourceTest.java

@@ -52,6 +52,16 @@ public class UnionDataSourceTest
)
);
@Test
public void test_constructor_empty()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("'dataSources' must be non-null and non-empty for 'union'");
//noinspection ResultOfObjectAllocationIgnored
new UnionDataSource(Collections.emptyList());
}
@Test
public void test_getTableNames()
{

DataSourceAnalysisTest.java

@@ -69,6 +69,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(TABLE_FOO, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
@@ -87,6 +88,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(unionDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
@@ -105,6 +107,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
@@ -127,6 +130,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
@@ -147,6 +151,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
@@ -165,6 +170,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
@@ -185,6 +191,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(INLINE, analysis.getDataSource());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
@@ -223,6 +230,8 @@ public class DataSourceAnalysisTest
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
@@ -273,6 +282,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
@@ -302,6 +312,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", subquery(TABLE_FOO), JoinType.INNER, joinClause("1."))
@@ -329,6 +340,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -364,6 +376,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(
Optional.of(
subquery(
@@ -408,6 +421,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
@@ -437,6 +451,7 @@ public class DataSourceAnalysisTest
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(

BrokerServerView.java

@@ -307,12 +307,12 @@ public class BrokerServerView implements TimelineServerView
@Override
public Optional<VersionedIntervalTimeline<String, ServerSelector>> getTimeline(final DataSourceAnalysis analysis)
{
final TableDataSource tableDataSource =
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
synchronized (lock) {
return Optional.ofNullable(timelines.get(tableDataSource.getName()));
return Optional.ofNullable(timelines.get(table.getName()));
}
}

SinkQuerySegmentWalker.java

@@ -51,7 +51,6 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -147,12 +146,11 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
// We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final Optional<TableDataSource> baseTableDataSource = analysis.getBaseTableDataSource();
if (!baseTableDataSource.isPresent() || !dataSource.equals(baseTableDataSource.get().getName())) {
// Report error, since we somehow got a query for a datasource we can't handle.
throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
}
// Sanity check: make sure the query is based on the table we're meant to handle.
analysis.getBaseTableDataSource()
.filter(ds -> dataSource.equals(ds.getName()))
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {

SimpleServerView.java

@@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -136,7 +137,11 @@ public class SimpleServerView implements TimelineServerView
@Override
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
{
return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
return Optional.ofNullable(timelines.get(table.getName()));
}
@Override

ClientQuerySegmentWalkerTest.java

@@ -445,6 +445,39 @@ public class ClientQuerySegmentWalkerTest
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
public void testGroupByOnUnionOfOneTable()
{
final GroupByQuery query =
(GroupByQuery) GroupByQuery.builder()
.setDataSource(
new UnionDataSource(ImmutableList.of(new TableDataSource(FOO)))
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
ImmutableList.of(
ExpectedQuery.cluster(query.withDataSource(new TableDataSource(FOO)))
),
ImmutableList.of(
new Object[]{"x", 2L},
new Object[]{"y", 1L},
new Object[]{"z", 1L}
)
);
Assert.assertEquals(1, scheduler.getTotalRun().get());
Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(1, scheduler.getTotalAcquired().get());
Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
public void testJoinOnGroupByOnTable()
{
@@ -505,6 +538,95 @@ public class ClientQuerySegmentWalkerTest
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
public void testJoinOnGroupByOnUnionOfTables()
{
final UnionDataSource unionDataSource = new UnionDataSource(
ImmutableList.of(
new TableDataSource(FOO),
new TableDataSource(BAR)
)
);
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(unionDataSource)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.setDimFilter(new SelectorDimFilter("s", "y", null))
.build();
final GroupByQuery query =
(GroupByQuery) GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
unionDataSource,
new QueryDataSource(subquery),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
ImmutableList.of(
ExpectedQuery.cluster(
subquery.withDataSource(
subquery.getDataSource().getChildren().get(0)
)
),
ExpectedQuery.cluster(
subquery.withDataSource(
subquery.getDataSource().getChildren().get(1)
)
),
ExpectedQuery.cluster(
query.withDataSource(
query.getDataSource().withChildren(
ImmutableList.of(
unionDataSource.getChildren().get(0),
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"y"}),
RowSignature.builder().add("s", ValueType.STRING).build()
)
)
)
)
),
ExpectedQuery.cluster(
query.withDataSource(
query.getDataSource().withChildren(
ImmutableList.of(
unionDataSource.getChildren().get(1),
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"y"}),
RowSignature.builder().add("s", ValueType.STRING).build()
)
)
)
)
)
),
ImmutableList.of(new Object[]{"y", "y", 1L})
);
// note: this should really be 1, but for now, queries that are composed of multiple subqueries count each
// invocation of either the cluster or local walker in ClientQuerySegmentWalker
Assert.assertEquals(4, scheduler.getTotalRun().get());
Assert.assertEquals(4, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(4, scheduler.getTotalAcquired().get());
Assert.assertEquals(4, scheduler.getTotalReleased().get());
}
@Test
public void testGroupByOnScanMultiValue()
{