From 21703d81ac4aabb8c2dfc66cacd18a42b3c38fbd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 26 Aug 2020 14:23:54 -0700 Subject: [PATCH] Fix handling of 'join' on top of 'union' datasources. (#10318) * Fix handling of 'join' on top of 'union' datasources. The problem is that unions are typically rewritten into a series of individual queries on the underlying tables, but this isn't done when the union is wrapped in a join. The main changes are in UnionQueryRunner: 1) Replace an instanceof UnionQueryRunner check with DataSourceAnalysis. 2) Replace a "query.withDataSource" call with a new function, "Queries.withBaseDataSource". Together, these enable UnionQueryRunner to "see through" a join. * Tests. * Adjust heap sizes for integration tests. * Different approach, more tests. * Tweak. * Styling. --- docs/querying/datasource.md | 3 + .../SingleTaskBackgroundRunnerTest.java | 22 +++ .../docker/environment-configs/broker | 2 +- .../docker/environment-configs/historical | 2 +- .../historical-for-query-retry-test | 2 +- .../queries/wikipedia_editstream_queries.json | 99 ++++++++++++ .../apache/druid/query/JoinDataSource.java | 17 ++ .../java/org/apache/druid/query/Queries.java | 43 +++++ .../apache/druid/query/UnionDataSource.java | 7 +- .../apache/druid/query/UnionQueryRunner.java | 72 ++++++--- .../query/planning/DataSourceAnalysis.java | 24 ++- .../org/apache/druid/query/QueriesTest.java | 149 ++++++++++++++++++ .../druid/query/UnionDataSourceTest.java | 10 ++ .../planning/DataSourceAnalysisTest.java | 15 ++ .../apache/druid/client/BrokerServerView.java | 4 +- .../appenderator/SinkQuerySegmentWalker.java | 10 +- .../apache/druid/client/SimpleServerView.java | 7 +- .../server/ClientQuerySegmentWalkerTest.java | 122 ++++++++++++++ 18 files changed, 565 insertions(+), 45 deletions(-) diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md index d6c478d110e..4b35208a67f 100644 --- a/docs/querying/datasource.md +++ b/docs/querying/datasource.md @@ 
-131,6 +131,9 @@ Union datasources allow you to treat two or more table datasources as a single d do not need to have identical schemas. If they do not fully match up, then columns that exist in one table but not another will be treated as if they contained all null values in the tables where they do not exist. +The list of "dataSources" must be nonempty. If you want to query an empty dataset, use an [`inline` datasource](#inline) +instead. + Union datasources are not available in Druid SQL. Refer to the [Query execution](query-execution.md#union) page for more details on how queries are executed when you diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 4cfa87d8a36..d8d65930483 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -32,17 +32,24 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.NoopDataSegmentArchiver; import org.apache.druid.segment.loading.NoopDataSegmentKiller; import org.apache.druid.segment.loading.NoopDataSegmentMover; import org.apache.druid.segment.loading.NoopDataSegmentPusher; import 
org.apache.druid.server.DruidNode; +import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; +import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -135,6 +142,21 @@ public class SingleTaskBackgroundRunnerTest ); } + @Test + public void testGetQueryRunner() throws ExecutionException, InterruptedException + { + runner.run(new NoopTask(null, null, "foo", 500L, 0, null, null, null)).get().getStatusCode(); + + final QueryRunner queryRunner = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .build() + .getRunner(runner); + + Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class)); + } + @Test public void testStop() throws ExecutionException, InterruptedException, TimeoutException { diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index 6f564b91776..3f2424f5d79 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -21,7 +21,7 @@ DRUID_SERVICE=broker DRUID_LOG_PATH=/shared/logs/broker.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx256m -Xms256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/docker/environment-configs/historical b/integration-tests/docker/environment-configs/historical index d86e19ead1a..ed540a85fb4 100644 --- 
a/integration-tests/docker/environment-configs/historical +++ b/integration-tests/docker/environment-configs/historical @@ -21,7 +21,7 @@ DRUID_SERVICE=historical DRUID_LOG_PATH=/shared/logs/historical.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/docker/environment-configs/historical-for-query-retry-test b/integration-tests/docker/environment-configs/historical-for-query-retry-test index 3e1319bb205..53efc7284ee 100644 --- a/integration-tests/docker/environment-configs/historical-for-query-retry-test +++ b/integration-tests/docker/environment-configs/historical-for-query-retry-test @@ -21,7 +21,7 @@ DRUID_SERVICE=historical-for-query-retry-test DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx768m -Xms768m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index d28fe8c445f..90cd3cbc0e1 100644 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -1933,5 +1933,104 @@ } } ] + }, + { + "description": "groupBy, 1 agg, subquery over join of union to inline", + "query": { + "queryType": "groupBy", + "dataSource": { + 
"type": "query", + "query": { + "queryType": "groupBy", + "dataSource": { + "type": "join", + "left": { + "type": "union", + "dataSources": ["wikipedia_editstream", "wikipedia_editstream"] + }, + "right": { + "type": "inline", + "columnNames": ["page", "lookupPage"], + "columnTypes": ["string", "string"], + "rows": [ + ["Wikipedia:Vandalismusmeldung", "inline join!"] + ] + }, + "rightPrefix": "j.", + "condition": "page == \"j.page\"", + "joinType": "LEFT" + }, + "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"], + "granularity": "all", + "virtualColumns": [ + { + "type": "expression", + "name": "lookupPage", + "expression": "nvl(\"j.lookupPage\", \"page\")", + "outputType": "string" + } + ], + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "dimensions": ["lookupPage"] + } + }, + "intervals": ["2013-01-01T00:00:00.000/2013-01-02T00:00:00.000"], + "granularity": "all", + "aggregations": [ + { + "type": "longSum", + "name": "rows_outer", + "fieldName": "rows" + } + ], + "dimensions": ["lookupPage"], + "limitSpec": { + "type": "default", + "columns": [ + { + "dimension": "rows_outer", + "dimensionOrder": "numeric", + "direction": "descending" + } + ], + "limit": 3 + }, + "context": { + "useCache": "true", + "populateCache": "true", + "timeout": 360000 + } + }, + "expectedResults": [ + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "inline join!", + "rows_outer": 1982 + } + }, + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "Wikipedia:Administrators'_noticeboard/Incidents", + "rows_outer": 1980 + } + }, + { + "version": "v1", + "timestamp": "2013-01-01T00:00:00.000Z", + "event": { + "lookupPage": "Wikipedia:Administrator_intervention_against_vandalism", + "rows_outer": 1600 + } + } + ] } ] diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java 
index 087a666c871..47e8e9ae15a 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -74,6 +74,9 @@ public class JoinDataSource implements DataSource this.joinType = Preconditions.checkNotNull(joinType, "joinType"); } + /** + * Create a join dataSource from a string condition. + */ @JsonCreator public static JoinDataSource create( @JsonProperty("left") DataSource left, @@ -97,6 +100,20 @@ public class JoinDataSource implements DataSource ); } + /** + * Create a join dataSource from an existing {@link JoinConditionAnalysis}. + */ + public static JoinDataSource create( + final DataSource left, + final DataSource right, + final String rightPrefix, + final JoinConditionAnalysis conditionAnalysis, + final JoinType joinType + ) + { + return new JoinDataSource(left, right, rightPrefix, conditionAnalysis, joinType); + } + @Override public Set getTableNames() { diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java index 1d69c82cbbd..7c6c4bfe9fc 100644 --- a/processing/src/main/java/org/apache/druid/query/Queries.java +++ b/processing/src/main/java/org/apache/druid/query/Queries.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import java.util.Collections; @@ -160,6 +161,7 @@ public class Queries // Verify preconditions and invariants, just in case. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource()); + // Sanity check: query must be based on a single table. 
if (!analysis.getBaseTableDataSource().isPresent()) { throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource()); } @@ -172,4 +174,45 @@ public class Queries return retVal; } + + /** + * Rewrite "query" to refer to some specific base datasource, instead of the one it currently refers to. + * + * Unlike the seemingly-similar {@link Query#withDataSource}, this will walk down the datasource tree and replace + * only the base datasource (in the sense defined in {@link DataSourceAnalysis}). + */ + public static Query withBaseDataSource(final Query query, final DataSource newBaseDataSource) + { + final Query retVal; + + if (query.getDataSource() instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) query.getDataSource()).getQuery(); + retVal = query.withDataSource(new QueryDataSource(withBaseDataSource(subQuery, newBaseDataSource))); + } else { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + + DataSource current = newBaseDataSource; + + for (final PreJoinableClause clause : analysis.getPreJoinableClauses()) { + current = JoinDataSource.create( + current, + clause.getDataSource(), + clause.getPrefix(), + clause.getCondition(), + clause.getJoinType() + ); + } + + retVal = query.withDataSource(current); + } + + // Verify postconditions, just in case. 
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource()); + + if (!newBaseDataSource.equals(analysis.getBaseDataSource())) { + throw new ISE("Unable to replace base dataSource"); + } + + return retVal; + } } diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 3bd25b017f1..628ae4230cf 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -22,10 +22,10 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import java.util.List; import java.util.Set; @@ -39,7 +39,10 @@ public class UnionDataSource implements DataSource @JsonCreator public UnionDataSource(@JsonProperty("dataSources") List dataSources) { - Preconditions.checkNotNull(dataSources, "dataSources cannot be null for unionDataSource"); + if (dataSources == null || dataSources.isEmpty()) { + throw new ISE("'dataSources' must be non-null and non-empty for 'union'"); + } + this.dataSources = dataSources; } diff --git a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java index f0340698059..db864351be0 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java @@ -20,11 +20,14 @@ package org.apache.druid.query; import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import 
org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.planning.DataSourceAnalysis; public class UnionQueryRunner implements QueryRunner { @@ -41,34 +44,51 @@ public class UnionQueryRunner implements QueryRunner public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) { Query query = queryPlus.getQuery(); - DataSource dataSource = query.getDataSource(); - if (dataSource instanceof UnionDataSource) { - return new MergeSequence<>( - query.getResultOrdering(), - Sequences.simple( - Lists.transform( - ((UnionDataSource) dataSource).getDataSources(), - new Function>() - { - @Override - public Sequence apply(DataSource singleSource) - { - return baseRunner.run( - queryPlus.withQuery( - query.withDataSource(singleSource) - // assign the subqueryId. this will be used to validate that every query servers - // have responded per subquery in RetryQueryRunner - .withDefaultSubQueryId() - ), - responseContext - ); - } - } - ) - ) - ); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + + if (analysis.isConcreteTableBased() && analysis.getBaseUnionDataSource().isPresent()) { + // Union of tables. + + final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get(); + + if (unionDataSource.getDataSources().isEmpty()) { + // Shouldn't happen, because UnionDataSource doesn't allow empty unions. + throw new ISE("Unexpectedly received empty union"); + } else if (unionDataSource.getDataSources().size() == 1) { + // Single table. Run as a normal query. 
+ return baseRunner.run( + queryPlus.withQuery( + Queries.withBaseDataSource( + query, + Iterables.getOnlyElement(unionDataSource.getDataSources()) + ) + ), + responseContext + ); + } else { + // Split up the tables and merge their results. + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple( + Lists.transform( + unionDataSource.getDataSources(), + (Function>) singleSource -> + baseRunner.run( + queryPlus.withQuery( + Queries.withBaseDataSource(query, singleSource) + // assign the subqueryId. this will be used to validate that every query servers + // have responded per subquery in RetryQueryRunner + .withDefaultSubQueryId() + ), + responseContext + ) + ) + ) + ); + } } else { + // Not a union of tables. Do nothing special. return baseRunner.run(queryPlus, responseContext); } } diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java index 35f4a9baf9d..857b45f70b9 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -168,7 +168,7 @@ public class DataSourceAnalysis } /** - * Returns the baseĀ (bottom-leftmost) datasource. + * Returns the base (bottom-leftmost) datasource. */ public DataSource getBaseDataSource() { @@ -176,8 +176,10 @@ public class DataSourceAnalysis } /** - * Returns the same datasource as {@link #getBaseDataSource()}, but only if it is a table. Useful on data servers, - * since they generally can only handle queries where the base datasource is a table. + * If {@link #getBaseDataSource()} is a {@link TableDataSource}, returns it. Otherwise, returns an empty Optional. + * + * Note that this can return empty even if {@link #isConcreteTableBased()} is true. This happens if the base + * datasource is a {@link UnionDataSource} of {@link TableDataSource}. 
*/ public Optional getBaseTableDataSource() { @@ -188,6 +190,18 @@ public class DataSourceAnalysis } } + /** + * If {@link #getBaseDataSource()} is a {@link UnionDataSource}, returns it. Otherwise, returns an empty Optional. + */ + public Optional getBaseUnionDataSource() + { + if (baseDataSource instanceof UnionDataSource) { + return Optional.of((UnionDataSource) baseDataSource); + } else { + return Optional.empty(); + } + } + /** * Returns the bottommost (i.e. innermost) {@link Query} from a possible stack of outer queries at the root of * the datasource tree. This is the query that will be applied to the base datasource plus any joinables that might @@ -243,8 +257,8 @@ public class DataSourceAnalysis /** * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a - * 'table' or union of them. This is an important property because it corresponds to datasources that can be handled - * by Druid data servers, like Historicals. + * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an + * important property, because it corresponds to datasources that can be handled by Druid's distributed query stack. 
*/ public boolean isConcreteTableBased() { diff --git a/processing/src/test/java/org/apache/druid/query/QueriesTest.java b/processing/src/test/java/org/apache/druid/query/QueriesTest.java index 16c4783619f..78dcafd7352 100644 --- a/processing/src/test/java/org/apache/druid/query/QueriesTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueriesTest.java @@ -22,6 +22,7 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -32,6 +33,7 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.join.JoinType; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -330,4 +332,151 @@ public class QueriesTest final Query> ignored = Queries.withSpecificSegments(query, descriptors); } + + @Test + public void testWithBaseDataSourceBasic() + { + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource("bar") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withBaseDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + new TableDataSource("bar") + ) + ); + } + + @Test + public void testWithBaseDataSourceSubQueryStack() + { + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new 
QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("bar") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withBaseDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + new TableDataSource("bar") + ) + ); + } + + @Test + public void testWithBaseDataSourceSubQueryStackWithJoinOnUnion() + { + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + JoinDataSource.create( + new TableDataSource("foo"), + new TableDataSource("bar"), + "j0.", + "\"foo.x\" == \"bar.x\"", + JoinType.INNER, + ExprMacroTable.nil() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withBaseDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + JoinDataSource.create( + new UnionDataSource( + ImmutableList.of( + new TableDataSource("foo"), + new TableDataSource("bar") + ) + ), + new TableDataSource("bar"), + "j0.", + "\"foo.x\" == \"bar.x\"", + JoinType.INNER, + ExprMacroTable.nil() + 
) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + new TableDataSource("foo") + ) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java index 117225d890b..0c900b3f77c 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java @@ -52,6 +52,16 @@ public class UnionDataSourceTest ) ); + @Test + public void test_constructor_empty() + { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("'dataSources' must be non-null and non-empty for 'union'"); + + //noinspection ResultOfObjectAllocationIgnored + new UnionDataSource(Collections.emptyList()); + } + @Test public void test_getTableNames() { diff --git a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java index 597740cad0a..829d5c21e4d 100644 --- a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java +++ b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java @@ -69,6 +69,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(TABLE_FOO, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); @@ 
-87,6 +88,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(unionDataSource, analysis.getDataSource()); Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); @@ -105,6 +107,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(queryDataSource, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery()); Assert.assertEquals( Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), @@ -127,6 +130,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(queryDataSource, analysis.getDataSource()); Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery()); Assert.assertEquals( Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), @@ -147,6 +151,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource()); Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), 
analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); @@ -165,6 +170,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(queryDataSource, analysis.getDataSource()); Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.of(queryDataSource.getQuery()), analysis.getBaseQuery()); Assert.assertEquals( Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), @@ -185,6 +191,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(INLINE, analysis.getDataSource()); Assert.assertEquals(INLINE, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); @@ -223,6 +230,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals( @@ -273,6 +281,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals( @@ -302,6 +311,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals( ImmutableList.of( new PreJoinableClause("1.", subquery(TABLE_FOO), JoinType.INNER, joinClause("1.")) ), @@ -329,6 +339,7 @@ public class DataSourceAnalysisTest Assert.assertFalse(analysis.isQuery()); Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource()); Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); @@ -364,6 +375,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(queryDataSource, analysis.getDataSource()); Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals( Optional.of( subquery( @@ -408,6 +420,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(),
analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals( @@ -437,6 +450,7 @@ public class DataSourceAnalysisTest Assert.assertEquals(joinDataSource, analysis.getDataSource()); Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuery()); Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); Assert.assertEquals( diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index debd72131c2..6d668839d7a 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -307,12 +307,12 @@ public class BrokerServerView implements TimelineServerView @Override public Optional> getTimeline(final DataSourceAnalysis analysis) { - final TableDataSource tableDataSource = + final TableDataSource table = analysis.getBaseTableDataSource() .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); synchronized (lock) { - return Optional.ofNullable(timelines.get(tableDataSource.getName())); + return Optional.ofNullable(timelines.get(table.getName())); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 887ad6d2b2a..c9b31d50b4b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -51,7
+51,6 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SinkQueryRunners; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -147,12 +146,11 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker { // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - final Optional baseTableDataSource = analysis.getBaseTableDataSource(); - if (!baseTableDataSource.isPresent() || !dataSource.equals(baseTableDataSource.get().getName())) { - // Report error, since we somehow got a query for a datasource we can't handle. - throw new ISE("Cannot handle datasource: %s", analysis.getDataSource()); - } + // Sanity check: make sure the query is based on the table we're meant to handle. 
+ analysis.getBaseTableDataSource() + .filter(ds -> dataSource.equals(ds.getName())) + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java index 65da4afc79c..d71fbf48472 100644 --- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -136,7 +137,11 @@ public class SimpleServerView implements TimelineServerView @Override public Optional> getTimeline(DataSourceAnalysis analysis) { - return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName())); + final TableDataSource table = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + + return Optional.ofNullable(timelines.get(table.getName())); } @Override diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 377a59fe8f7..1582378b00d 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -445,6 +445,39 @@ public class ClientQuerySegmentWalkerTest Assert.assertEquals(2, 
scheduler.getTotalReleased().get()); } + @Test + public void testGroupByOnUnionOfOneTable() + { + final GroupByQuery query = + (GroupByQuery) GroupByQuery.builder() + .setDataSource( + new UnionDataSource(ImmutableList.of(new TableDataSource(FOO))) + ) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s")) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); + + testQuery( + query, + ImmutableList.of( + ExpectedQuery.cluster(query.withDataSource(new TableDataSource(FOO))) + ), + ImmutableList.of( + new Object[]{"x", 2L}, + new Object[]{"y", 1L}, + new Object[]{"z", 1L} + ) + ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); + } + @Test public void testJoinOnGroupByOnTable() { @@ -505,6 +538,95 @@ public class ClientQuerySegmentWalkerTest Assert.assertEquals(2, scheduler.getTotalReleased().get()); } + @Test + public void testJoinOnGroupByOnUnionOfTables() + { + final UnionDataSource unionDataSource = new UnionDataSource( + ImmutableList.of( + new TableDataSource(FOO), + new TableDataSource(BAR) + ) + ); + + final GroupByQuery subquery = + GroupByQuery.builder() + .setDataSource(unionDataSource) + .setGranularity(Granularities.ALL) + .setInterval(Collections.singletonList(INTERVAL)) + .setDimensions(DefaultDimensionSpec.of("s")) + .setDimFilter(new SelectorDimFilter("s", "y", null)) + .build(); + + final GroupByQuery query = + (GroupByQuery) GroupByQuery.builder() + .setDataSource( + JoinDataSource.create( + unionDataSource, + new QueryDataSource(subquery), + "j.", + "\"j.s\" == \"s\"", + JoinType.INNER, + ExprMacroTable.nil() + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + 
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s")) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); + + testQuery( + query, + ImmutableList.of( + ExpectedQuery.cluster( + subquery.withDataSource( + subquery.getDataSource().getChildren().get(0) + ) + ), + ExpectedQuery.cluster( + subquery.withDataSource( + subquery.getDataSource().getChildren().get(1) + ) + ), + ExpectedQuery.cluster( + query.withDataSource( + query.getDataSource().withChildren( + ImmutableList.of( + unionDataSource.getChildren().get(0), + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{"y"}), + RowSignature.builder().add("s", ValueType.STRING).build() + ) + ) + ) + ) + ), + ExpectedQuery.cluster( + query.withDataSource( + query.getDataSource().withChildren( + ImmutableList.of( + unionDataSource.getChildren().get(1), + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{"y"}), + RowSignature.builder().add("s", ValueType.STRING).build() + ) + ) + ) + ) + ) + ), + ImmutableList.of(new Object[]{"y", "y", 1L}) + ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(4, scheduler.getTotalRun().get()); + Assert.assertEquals(4, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(4, scheduler.getTotalAcquired().get()); + Assert.assertEquals(4, scheduler.getTotalReleased().get()); + } + @Test public void testGroupByOnScanMultiValue() {