From d886463253e1f537e32b8aedac60c8ee741f6cc7 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 22 Jan 2020 14:54:47 -0800 Subject: [PATCH] Add join-related DataSource types, and analysis functionality. (#9235) * Add join-related DataSource types, and analysis functionality. Builds on #9111 and implements the datasource analysis mentioned in #8728. Still can't handle join datasources, but we're a step closer. Join-related DataSource types: 1) Add "join", "lookup", and "inline" datasources. 2) Add "getChildren" and "withChildren" methods to DataSource, which will be used in the future for query rewriting (e.g. inlining of subqueries). DataSource analysis functionality: 1) Add DataSourceAnalysis class, which breaks down datasources into three components: outer queries, a base datasource (left-most of the highest level left-leaning join tree), and other joined-in leaf datasources (the right-hand branches of the left-leaning join tree). 2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to support analysis. Other notes: 1) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer. Also, made it a Set, so implementations don't need to worry about duplicates. 2) The addition of "isCacheable" should work around #8713, since UnionDataSource now returns false for cacheability. * Remove javadoc comment. * Updates reflecting code review. * Add comments. * Add more comments. --- .../CachingClusteredClientBenchmark.java | 2 +- .../overlord/SingleTaskBackgroundRunner.java | 2 +- .../org/apache/druid/query/BaseQuery.java | 8 +- .../org/apache/druid/query/DataSource.java | 63 ++- .../apache/druid/query/DataSourceUtil.java | 31 -- .../druid/query/DefaultQueryMetrics.java | 20 +- .../apache/druid/query/InlineDataSource.java | 241 +++++++++ .../apache/druid/query/JoinDataSource.java | 212 ++++++++ .../apache/druid/query/LookupDataSource.java | 125 +++++ .../java/org/apache/druid/query/Queries.java | 1 + .../apache/druid/query/QueryDataSource.java | 45 +- .../apache/druid/query/TableDataSource.java | 48 +- .../apache/druid/query/UnionDataSource.java | 54 +- .../query/planning/DataSourceAnalysis.java | 282 ++++++++++ .../query/planning/PreJoinableClause.java | 107 ++++ .../druid/segment/join/JoinableClause.java | 5 +- .../apache/druid/segment/join/Joinables.java | 55 ++ .../apache/druid/query/DataSourceTest.java | 10 +- .../druid/query/InlineDataSourceTest.java | 295 +++++++++++ .../druid/query/JoinDataSourceTest.java | 160 ++++++ .../druid/query/LookupDataSourceTest.java | 102 ++++ .../druid/query/QueryDataSourceTest.java | 156 ++++++ .../druid/query/TableDataSourceTest.java | 115 +++++ .../druid/query/UnionDataSourceTest.java | 144 ++++++ .../druid/query/UnionQueryRunnerTest.java | 2 +- .../metadata/SegmentMetadataQueryTest.java | 4 +- .../planning/DataSourceAnalysisTest.java | 482 ++++++++++++++++++ .../query/planning/PreJoinableClauseTest.java | 71 +++ .../druid/segment/join/JoinablesTest.java | 77 +++ .../apache/druid/client/BrokerServerView.java | 3 +- .../org/apache/druid/client/CacheUtil.java | 154 ++++-- .../druid/client/CachingClusteredClient.java | 4 +- .../druid/client/CachingQueryRunner.java | 9 +- .../druid/client/CoordinatorServerView.java | 2 +- .../druid/client/ResultLevelCacheUtil.java | 90 ---- .../query/ResultLevelCachingQueryRunner.java | 17 +- .../appenderator/SinkQuerySegmentWalker.java | 7 + .../server/ClientQuerySegmentWalker.java | 29 +- .../apache/druid/server/QueryLifecycle.java | 4 +- 
.../org/apache/druid/server/QueryManager.java | 3 +- .../apache/druid/server/QueryResource.java | 2 +- .../server/coordination/ServerManager.java | 20 +- .../server/log/LoggingRequestLogger.java | 31 +- .../router/TieredBrokerHostSelector.java | 2 +- .../apache/druid/client/CacheUtilTest.java | 192 +++++++ .../sql/calcite/rel/DruidOuterQueryRel.java | 5 +- .../druid/sql/calcite/rel/DruidQueryRel.java | 6 +- .../druid/sql/calcite/rel/DruidRel.java | 6 +- .../druid/sql/calcite/rel/DruidSemiJoin.java | 10 +- .../druid/sql/calcite/rel/DruidUnionRel.java | 6 +- 50 files changed, 3252 insertions(+), 269 deletions(-) delete mode 100644 processing/src/main/java/org/apache/druid/query/DataSourceUtil.java create mode 100644 processing/src/main/java/org/apache/druid/query/InlineDataSource.java create mode 100644 processing/src/main/java/org/apache/druid/query/JoinDataSource.java create mode 100644 processing/src/main/java/org/apache/druid/query/LookupDataSource.java create mode 100644 processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java create mode 100644 processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java create mode 100644 processing/src/main/java/org/apache/druid/segment/join/Joinables.java create mode 100644 processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java create mode 100644 processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java delete mode 100644 server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java create mode 100644 server/src/test/java/org/apache/druid/client/CacheUtilTest.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 2d4d0f5a650..d7b8b63bedf 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -522,7 +522,7 @@ public class CachingClusteredClientBenchmark @Override public TimelineLookup getTimeline(DataSource dataSource) { - final String table = Iterables.getOnlyElement(dataSource.getNames()); + final String table = Iterables.getOnlyElement(dataSource.getTableNames()); return timelines.get(table); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index fab5a4f1391..ecb5d9ab03d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -328,7 +328,7 @@ public class 
SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke private QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames()); if (runningItem != null) { final Task task = runningItem.getTask(); diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index e967d54c601..cc2cd6df6b4 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -266,18 +266,20 @@ public abstract class BaseQuery implements Query return false; } BaseQuery baseQuery = (BaseQuery) o; + + // Must use getDuration() instead of "duration" because duration is lazily computed. return descending == baseQuery.descending && Objects.equals(dataSource, baseQuery.dataSource) && Objects.equals(context, baseQuery.context) && Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) && - Objects.equals(duration, baseQuery.duration) && + Objects.equals(getDuration(), baseQuery.getDuration()) && Objects.equals(granularity, baseQuery.granularity); } @Override public int hashCode() { - - return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity); + // Must use getDuration() instead of "duration" because duration is lazily computed. + return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity); } } diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 13a3cf5cc4b..549b06b0b3e 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -23,17 +23,62 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.util.List; +import java.util.Set; -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, - property = "type", - defaultImpl = LegacyDataSource.class) +/** + * Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSource.class) @JsonSubTypes({ - @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), - @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), - @JsonSubTypes.Type(value = UnionDataSource.class, name = "union") - }) + @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), + @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), + @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"), + @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), + @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"), + @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline") +}) public interface DataSource { - List getNames(); + /** + * Returns the names of all table datasources involved in this query. Does not include names for non-tables, like + * lookups or inline datasources. + */ + Set getTableNames(); + + /** + * Returns datasources that this datasource depends on. Will be empty for leaf datasources like 'table'. 
+ */ + List getChildren(); + + /** + * Return a new DataSource, identical to this one, with different children. The number of children must be equal + * to the number of children that this datasource already has. + */ + DataSource withChildren(List children); + + /** + * Returns true if queries on this dataSource are cacheable at both the result level and per-segment level. + * Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join'). + * Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based + * on segment identifiers. + * + * Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking + * the code necessary to compute cache keys properly. + */ + boolean isCacheable(); + + /** + * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or + * for queries of those. + */ + boolean isGlobal(); + + /** + * Returns true if this datasource represents concrete data that can be scanned via a + * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'. + * + * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this + * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this + */ + boolean isConcrete(); } diff --git a/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java b/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java deleted file mode 100644 index fdc7747f2d9..00000000000 --- a/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query; - -import java.util.List; - -public class DataSourceUtil -{ - public static String getMetricName(DataSource dataSource) - { - final List names = dataSource.getNames(); - return names.size() == 1 ? 
names.get(0) : names.toString(); - } -} diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 145f8dac352..fcdebd22bc8 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -30,7 +31,9 @@ import org.joda.time.Interval; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the @@ -42,9 +45,22 @@ public class DefaultQueryMetrics> implements QueryMet protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); - /** Non final to give subclasses ability to reassign it. */ + /** + * Non final to give subclasses ability to reassign it. + */ protected Thread ownerThread = Thread.currentThread(); + private static String getTableNamesAsString(DataSource dataSource) + { + final Set names = dataSource.getTableNames(); + + if (names.size() == 1) { + return Iterables.getOnlyElement(names); + } else { + return names.stream().sorted().collect(Collectors.toList()).toString(); + } + } + protected void checkModifiedFromOwnerThread() { if (Thread.currentThread() != ownerThread) { @@ -77,7 +93,7 @@ public class DefaultQueryMetrics> implements QueryMet @Override public void dataSource(QueryType query) { - setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())); + setDimension(DruidMetrics.DATASOURCE, getTableNamesAsString(query.getDataSource())); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java new file mode 100644 index 00000000000..890fbc00ba4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +/** + * Represents an inline datasource, where the rows are embedded within the DataSource object itself. + * + * The rows are backed by an Iterable, which can be lazy or not. Lazy datasources will only be iterated if someone calls + * {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}. + */ +public class InlineDataSource implements DataSource +{ + private final List columnNames; + private final List columnTypes; + private final Iterable rows; + + private InlineDataSource( + final List columnNames, + final List columnTypes, + final Iterable rows + ) + { + this.columnNames = Preconditions.checkNotNull(columnNames, "'columnNames' must be nonnull"); + this.columnTypes = Preconditions.checkNotNull(columnTypes, "'columnTypes' must be nonnull"); + this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull"); + + if (columnNames.size() != columnTypes.size()) { + throw new IAE("columnNames and columnTypes must be the same length"); + } + } + + /** + * Factory method for Jackson. Used for inline datasources that were originally encoded as JSON. Private because + * non-Jackson callers should use {@link #fromIterable}. + */ + @JsonCreator + private static InlineDataSource fromJson( + @JsonProperty("columnNames") List columnNames, + @JsonProperty("columnTypes") List columnTypes, + @JsonProperty("rows") List rows + ) + { + return new InlineDataSource(columnNames, columnTypes, rows); + } + + /** + * Creates an inline datasource from an Iterable. The Iterable will not be iterated until someone calls + * {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}. + * + * @param columnNames names of each column in the rows + * @param columnTypes types of each column in the rows + * @param rows rows, each of the same length as columnNames and columnTypes + */ + public static InlineDataSource fromIterable( + final List columnNames, + final List columnTypes, + final Iterable rows + ) + { + return new InlineDataSource(columnNames, columnTypes, rows); + } + + @Override + public Set getTableNames() + { + return Collections.emptySet(); + } + + @JsonProperty + public List getColumnNames() + { + return columnNames; + } + + @JsonProperty + public List getColumnTypes() + { + return columnTypes; + } + + /** + * Returns rows as a list. If the original Iterable behind this datasource was a List, this method will return it + * as-is, without copying it. Otherwise, this method will walk the iterable and copy it into a List before returning. + */ + @JsonProperty("rows") + public List getRowsAsList() + { + return rows instanceof List ? ((List) rows) : Lists.newArrayList(rows); + } + + /** + * Returns rows as an Iterable. 
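
// A runnable sketch (not part of the committed diff) of the laziness contract
// described in the InlineDataSource javadoc above. The column names, types, and
// rows are invented; only getRowsAsList(), or iterating getRows(), touches the
// underlying Iterable.
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.ValueType;

import java.util.List;

class InlineLazinessSketch
{
  public static void main(String[] args)
  {
    final Iterable<Object[]> lazyRows = () -> {
      System.out.println("iterated");  // side effect, to observe when iteration happens
      return ImmutableList.<Object[]>of(new Object[]{0L, "US"}).iterator();
    };

    final InlineDataSource dataSource = InlineDataSource.fromIterable(
        ImmutableList.of("__time", "country"),
        ImmutableList.of(ValueType.LONG, ValueType.STRING),
        lazyRows
    );

    dataSource.getRows();                                    // nothing printed yet: still lazy
    final List<Object[]> copy = dataSource.getRowsAsList();  // prints "iterated", copies into a List
    System.out.println(copy.size());                         // 1; calling getRowsAsList() again iterates again
  }
}
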
+ */ + @JsonIgnore + public Iterable getRows() + { + return rows; + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public boolean isConcrete() + { + return false; + } + + public Map getRowSignature() + { + final ImmutableMap.Builder retVal = ImmutableMap.builder(); + + for (int i = 0; i < columnNames.size(); i++) { + retVal.put(columnNames.get(i), columnTypes.get(i)); + } + + return retVal.build(); + } + + public RowAdapter rowAdapter() + { + return new RowAdapter() + { + @Override + public ToLongFunction timestampFunction() + { + final int columnNumber = columnNames.indexOf(ColumnHolder.TIME_COLUMN_NAME); + + if (columnNumber >= 0) { + return row -> (long) row[columnNumber]; + } else { + return row -> 0L; + } + } + + @Override + public Function columnFunction(String columnName) + { + final int columnNumber = columnNames.indexOf(columnName); + + if (columnNumber >= 0) { + return row -> row[columnNumber]; + } else { + return row -> null; + } + } + }; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InlineDataSource that = (InlineDataSource) o; + return Objects.equals(columnNames, that.columnNames) && + Objects.equals(columnTypes, that.columnTypes) && + Objects.equals(rows, that.rows); + } + + @Override + public int hashCode() + { + return Objects.hash(columnNames, columnTypes, rows); + } + + @Override + public String toString() + { + // Don't include 'rows' in stringification, because it might be long and/or lazy. + return "InlineDataSource{" + + "columnNames=" + columnNames + + ", columnTypes=" + columnTypes + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java new file mode 100644 index 00000000000..087a666c871 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinables; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Represents a join of two datasources. + * + * Logically, this datasource contains the result of: + * + * (1) prefixing all right-side columns with "rightPrefix" + * (2) then, joining the left and (prefixed) right sides using the provided type and condition + * + * Any columns from the left-hand side that start with "rightPrefix", and are at least one character longer than + * the prefix, will be shadowed. It is up to the caller to ensure that no important columns are shadowed by the + * chosen prefix. + * + * When analyzed by {@link org.apache.druid.query.planning.DataSourceAnalysis}, the right-hand side of this datasource + * will become a {@link org.apache.druid.query.planning.PreJoinableClause} object. + */ +public class JoinDataSource implements DataSource +{ + private final DataSource left; + private final DataSource right; + private final String rightPrefix; + private final JoinConditionAnalysis conditionAnalysis; + private final JoinType joinType; + + private JoinDataSource( + DataSource left, + DataSource right, + String rightPrefix, + JoinConditionAnalysis conditionAnalysis, + JoinType joinType + ) + { + this.left = Preconditions.checkNotNull(left, "left"); + this.right = Preconditions.checkNotNull(right, "right"); + this.rightPrefix = Joinables.validatePrefix(rightPrefix); + this.conditionAnalysis = Preconditions.checkNotNull(conditionAnalysis, "conditionAnalysis"); + this.joinType = Preconditions.checkNotNull(joinType, "joinType"); + } + + @JsonCreator + public static JoinDataSource create( + @JsonProperty("left") DataSource left, + @JsonProperty("right") DataSource right, + @JsonProperty("rightPrefix") String rightPrefix, + @JsonProperty("condition") String condition, + @JsonProperty("joinType") JoinType joinType, + @JacksonInject ExprMacroTable macroTable + ) + { + return new JoinDataSource( + left, + right, + StringUtils.nullToEmptyNonDruidDataString(rightPrefix), + JoinConditionAnalysis.forExpression( + Preconditions.checkNotNull(condition, "condition"), + StringUtils.nullToEmptyNonDruidDataString(rightPrefix), + macroTable + ), + joinType + ); + } + + @Override + public Set getTableNames() + { + final Set names = new HashSet<>(); + names.addAll(left.getTableNames()); + names.addAll(right.getTableNames()); + return names; + } + + @JsonProperty + public DataSource getLeft() + { + return left; + } + + @JsonProperty + public DataSource getRight() + { + return right; + } + + @JsonProperty + public String getRightPrefix() + { + return rightPrefix; + } + + @JsonProperty + public String getCondition() + { + return conditionAnalysis.getOriginalExpression(); + } + + public JoinConditionAnalysis getConditionAnalysis() + { + return conditionAnalysis; + } + + @JsonProperty + public JoinType getJoinType() + { + return joinType; + } + + @Override + public List getChildren() + 
{ + return ImmutableList.of(left, right); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != 2) { + throw new IAE("Expected [2] children, got [%d]", children.size()); + } + + return new JoinDataSource(children.get(0), children.get(1), rightPrefix, conditionAnalysis, joinType); + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return left.isGlobal() && right.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JoinDataSource that = (JoinDataSource) o; + return Objects.equals(left, that.left) && + Objects.equals(right, that.right) && + Objects.equals(rightPrefix, that.rightPrefix) && + Objects.equals(conditionAnalysis, that.conditionAnalysis) && + joinType == that.joinType; + } + + @Override + public int hashCode() + { + return Objects.hash(left, right, rightPrefix, conditionAnalysis, joinType); + } + + @Override + public String toString() + { + return "JoinDataSource{" + + "left=" + left + + ", right=" + right + + ", rightPrefix='" + rightPrefix + '\'' + + ", condition=" + conditionAnalysis + + ", joinType=" + joinType + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java new file mode 100644 index 00000000000..a2c99f7d1fd --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Represents a lookup. + * + * Currently, this datasource is not actually queryable, and attempts to do so will lead to errors. It is here as a + * placeholder for a future time in which it will become queryable. + * + * The "lookupName" referred to here should be provided by a + * {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider}. 
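
// A sketch of the wire format for the new "lookup" datasource type, following the
// JSON round-trip pattern used by DataSourceTest later in this patch. The lookup
// name "main_lookup" is invented, and the exact JSON field order may vary.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;

class LookupDataSourceSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    final DataSource dataSource = mapper.readValue(
        "{\"type\":\"lookup\", \"lookup\":\"main_lookup\"}",
        DataSource.class
    );

    System.out.println(dataSource.equals(new LookupDataSource("main_lookup")));  // true
    System.out.println(dataSource.isGlobal());     // true: lookups are broadcast to all servers
    System.out.println(dataSource.isCacheable());  // false
    System.out.println(mapper.writeValueAsString(dataSource));  // {"type":"lookup","lookup":"main_lookup"}
  }
}
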
+ */ +public class LookupDataSource implements DataSource +{ + private final String lookupName; + + @JsonCreator + public LookupDataSource( + @JsonProperty("lookup") String lookupName + ) + { + this.lookupName = Preconditions.checkNotNull(lookupName, "lookup"); + } + + @Override + public Set getTableNames() + { + return Collections.emptySet(); + } + + @JsonProperty("lookup") + public String getLookupName() + { + return lookupName; + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public boolean isConcrete() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupDataSource that = (LookupDataSource) o; + return Objects.equals(lookupName, that.lookupName); + } + + @Override + public int hashCode() + { + return Objects.hash(lookupName); + } + + @Override + public String toString() + { + return "LookupDataSource{" + + "lookupName='" + lookupName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java index 1fbe33587d0..37408a4aea3 100644 --- a/processing/src/main/java/org/apache/druid/query/Queries.java +++ b/processing/src/main/java/org/apache/druid/query/Queries.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; /** + * */ @PublicApi public class Queries diff --git a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java index 5e5201711d8..94d47d511f4 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java @@ -22,8 +22,13 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.IAE; +import java.util.Collections; import java.util.List; +import java.util.Set; @JsonTypeName("query") public class QueryDataSource implements DataSource @@ -34,13 +39,13 @@ public class QueryDataSource implements DataSource @JsonCreator public QueryDataSource(@JsonProperty("query") Query query) { - this.query = query; + this.query = Preconditions.checkNotNull(query, "'query' must be nonnull"); } @Override - public List getNames() + public Set getTableNames() { - return query.getDataSource().getNames(); + return query.getDataSource().getTableNames(); } @JsonProperty @@ -49,6 +54,40 @@ public class QueryDataSource implements DataSource return query; } + @Override + public List getChildren() + { + return Collections.singletonList(query.getDataSource()); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != 1) { + throw new IAE("Must have exactly one child"); + } + + return new QueryDataSource(query.withDataSource(Iterables.getOnlyElement(children))); + } + + @Override + public boolean isCacheable() + { + return false; + } + + 
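
// A minimal sketch (not part of the committed diff) of the rewriting pattern that
// the new getChildren()/withChildren() methods are meant to enable, e.g. the
// inlining of subqueries mentioned in the commit message. The helper is invented.
import org.apache.druid.query.DataSource;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class DataSourceTransforms
{
  /** Rebuilds a datasource tree bottom-up, applying "fn" at every node. */
  static DataSource transform(DataSource dataSource, Function<DataSource, DataSource> fn)
  {
    final List<DataSource> children = dataSource.getChildren();
    if (children.isEmpty()) {
      return fn.apply(dataSource);
    }
    final List<DataSource> newChildren =
        children.stream().map(child -> transform(child, fn)).collect(Collectors.toList());
    return fn.apply(dataSource.withChildren(newChildren));
  }
}
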
@Override + public boolean isGlobal() + { + return query.getDataSource().isGlobal(); + } + + @Override + public boolean isConcrete() + { + return false; + } + @Override public String toString() { diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index f26aa53b93a..4c371cf8451 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -22,20 +22,22 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; import java.util.Collections; import java.util.List; +import java.util.Set; @JsonTypeName("table") public class TableDataSource implements DataSource { - @JsonProperty private final String name; @JsonCreator public TableDataSource(@JsonProperty("name") String name) { - this.name = (name == null ? null : name); + this.name = Preconditions.checkNotNull(name, "'name' must be nonnull"); } @JsonProperty @@ -45,9 +47,43 @@ public class TableDataSource implements DataSource } @Override - public List getNames() + public Set getTableNames() { - return Collections.singletonList(name); + return Collections.singleton(name); + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return true; + } + + @Override + public boolean isGlobal() + { + return false; + } + + @Override + public boolean isConcrete() + { + return true; } @Override @@ -57,7 +93,7 @@ public class TableDataSource implements DataSource } @Override - public boolean equals(Object o) + public final boolean equals(Object o) { if (this == o) { return true; @@ -76,7 +112,7 @@ public class TableDataSource implements DataSource } @Override - public int hashCode() + public final int hashCode() { return name.hashCode(); } diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 290ac538b0a..3bd25b017f1 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -23,9 +23,12 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.IAE; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class UnionDataSource implements DataSource @@ -41,9 +44,11 @@ public class UnionDataSource implements DataSource } @Override - public List getNames() + public Set getTableNames() { - return dataSources.stream().map(input -> Iterables.getOnlyElement(input.getNames())).collect(Collectors.toList()); + return dataSources.stream() + .map(input -> Iterables.getOnlyElement(input.getTableNames())) + .collect(Collectors.toSet()); } @JsonProperty @@ -52,6 +57,51 @@ public class UnionDataSource implements 
DataSource return dataSources; } + @Override + public List getChildren() + { + return ImmutableList.copyOf(dataSources); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != dataSources.size()) { + throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size()); + } + + if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) { + throw new IAE("All children must be tables"); + } + + return new UnionDataSource( + children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList()) + ); + } + + @Override + public boolean isCacheable() + { + // Disables result-level caching for 'union' datasources, which doesn't work currently. + // See https://github.com/apache/druid/issues/8713 for reference. + // + // Note that per-segment caching is still effective, since at the time the per-segment cache evaluates a query + // for cacheability, it would have already been rewritten to a query on a single table. + return false; + } + + @Override + public boolean isGlobal() + { + return dataSources.stream().allMatch(DataSource::isGlobal); + } + + @Override + public boolean isConcrete() + { + return dataSources.stream().allMatch(DataSource::isConcrete); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java new file mode 100644 index 00000000000..4237e50dc47 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.spec.QuerySegmentSpec; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Analysis of a datasource for purposes of deciding how to execute a particular query. + * + * The analysis breaks a datasource down in the following way: + * + *
+ *
+ *                             Q  <-- Possible outer query datasource(s) [may be multiple stacked]
+ *                             |
+ *                             J  <-- Possible join tree, expected to be left-leaning
+ *                            / \
+ *                           J  Dj <--  Other leaf datasources
+ *   Base datasource        / \         which will be joined
+ *  (bottom-leftmost) -->  Db Dj  <---- into the base datasource
+ *
+ * 
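 *   (Illustration: for query(join(join(table, lookup1), lookup2)), Q is the outer
 *   query, the table is the base Db, and the two lookups are the Dj leaves, which
 *   the analysis turns into PreJoinableClause objects. A fuller worked example
 *   appears after PreJoinableClause below.)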
+ * + * The base datasource (Db) is returned by {@link #getBaseDataSource()}. The other leaf datasources are returned by + * {@link #getPreJoinableClauses()}. The outer query datasources are available as part of {@link #getDataSource()}, + * which just returns the original datasource that was provided for analysis. + * + * The base datasource (Db) will never be a join, but it can be any other type of datasource (table, query, etc). + * Note that join trees are only flattened if they occur at the top of the overall tree (or underneath an outer query), + * and that join trees are only flattened to the degree that they are left-leaning. Due to these facts, it is possible + * for the base or leaf datasources to include additional joins. + * + * The base datasource is the one that will be considered by the core Druid query stack for scanning via + * {@link org.apache.druid.segment.Segment} and {@link org.apache.druid.segment.StorageAdapter}. The other leaf + * datasources must be joinable onto the base data. + * + * The idea here is to keep things simple and dumb. So we focus only on identifying left-leaning join trees, which map + * neatly onto a series of hash table lookups at query time. The user/system generating the queries, e.g. the druid-sql + * layer (or the end user in the case of native queries), is responsible for containing the smarts to structure the + * tree in a way that will lead to optimal execution. + */ +public class DataSourceAnalysis +{ + private final DataSource dataSource; + private final DataSource baseDataSource; + @Nullable + private final QuerySegmentSpec baseQuerySegmentSpec; + private final List preJoinableClauses; + + private DataSourceAnalysis( + DataSource dataSource, + DataSource baseDataSource, + @Nullable QuerySegmentSpec baseQuerySegmentSpec, + List preJoinableClauses + ) + { + if (baseDataSource instanceof JoinDataSource) { + // The base cannot be a join (this is a class invariant). + // If it happens, it's a bug in the datasource analyzer. + throw new IAE("Base dataSource cannot be a join! Original dataSource was: %s", dataSource); + } + + this.dataSource = dataSource; + this.baseDataSource = baseDataSource; + this.baseQuerySegmentSpec = baseQuerySegmentSpec; + this.preJoinableClauses = preJoinableClauses; + } + + public static DataSourceAnalysis forDataSource(final DataSource dataSource) + { + // Strip outer queries, retaining querySegmentSpecs as we go down (lowest will become the 'baseQuerySegmentSpec'). + QuerySegmentSpec baseQuerySegmentSpec = null; + DataSource current = dataSource; + + while (current instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) current).getQuery(); + + if (!(subQuery instanceof BaseQuery)) { + // All builtin query types are BaseQuery, so we only expect this with funky extension queries. + throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName()); + } + + baseQuerySegmentSpec = ((BaseQuery) subQuery).getQuerySegmentSpec(); + current = subQuery.getDataSource(); + } + + if (current instanceof JoinDataSource) { + final Pair> flattened = flattenJoin((JoinDataSource) current); + return new DataSourceAnalysis(dataSource, flattened.lhs, baseQuerySegmentSpec, flattened.rhs); + } else { + return new DataSourceAnalysis(dataSource, current, baseQuerySegmentSpec, Collections.emptyList()); + } + } + + /** + * Flatten a datasource into two parts: the left-hand side datasource (the 'base' datasource), and a list of join + * clauses, if any. 
+   *
+   * @throws IllegalArgumentException if dataSource cannot be fully flattened.
+   */
+  private static Pair<DataSource, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
+  {
+    DataSource current = dataSource;
+    final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
+
+    while (current instanceof JoinDataSource) {
+      final JoinDataSource joinDataSource = (JoinDataSource) current;
+      current = joinDataSource.getLeft();
+      preJoinableClauses.add(
+          new PreJoinableClause(
+              joinDataSource.getRightPrefix(),
+              joinDataSource.getRight(),
+              joinDataSource.getJoinType(),
+              joinDataSource.getConditionAnalysis()
+          )
+      );
+    }
+
+    // Join clauses were added in the order we saw them while traversing down, but we need to apply them in the
+    // going-up order. So reverse them.
+    Collections.reverse(preJoinableClauses);
+
+    return Pair.of(current, preJoinableClauses);
+  }
+
+  /**
+   * Returns the topmost datasource: the original one passed to {@link #forDataSource(DataSource)}.
+   */
+  public DataSource getDataSource()
+  {
+    return dataSource;
+  }
+
+  /**
+   * Returns the base (bottom-leftmost) datasource.
+   */
+  public DataSource getBaseDataSource()
+  {
+    return baseDataSource;
+  }
+
+  /**
+   * Returns the same datasource as {@link #getBaseDataSource()}, but only if it is a table. Useful on data servers,
+   * since they generally can only handle queries where the base datasource is a table.
+   */
+  public Optional<TableDataSource> getBaseTableDataSource()
+  {
+    if (baseDataSource instanceof TableDataSource) {
+      return Optional.of((TableDataSource) baseDataSource);
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Returns the {@link QuerySegmentSpec} that is associated with the base datasource, if any. This only happens
+   * when there is an outer query datasource. In this case, the base querySegmentSpec is the one associated with the
+   * innermost subquery.
+   */
+  public Optional<QuerySegmentSpec> getBaseQuerySegmentSpec()
+  {
+    return Optional.ofNullable(baseQuerySegmentSpec);
+  }
+
+  /**
+   * Returns join clauses corresponding to joinable leaf datasources (every leaf except the bottom-leftmost).
+   */
+  public List<PreJoinableClause> getPreJoinableClauses()
+  {
+    return preJoinableClauses;
+  }
+
+  /**
+   * Returns true if all servers have the ability to compute this datasource. These datasources depend only on
+   * globally broadcast data, like lookups or inline data.
+   */
+  public boolean isGlobal()
+  {
+    return dataSource.isGlobal();
+  }
+
+  /**
+   * Returns true if this datasource can be computed by the core Druid query stack via a scan of a concrete base
+   * datasource. All other datasources involved, if any, must be global.
+   */
+  public boolean isConcreteBased()
+  {
+    return baseDataSource.isConcrete() && preJoinableClauses.stream()
+                                                            .allMatch(clause -> clause.getDataSource().isGlobal());
+  }
+
+  /**
+   * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}), and the base datasource is a
+   * 'table' or union of them. This is an important property because it corresponds to datasources that can be handled
+   * by Druid data servers, like Historicals.
+   */
+  public boolean isConcreteTableBased()
+  {
+    // At the time of writing this comment, UnionDataSource children are required to be tables, so the instanceof
+    // check is redundant. But in the future, we will likely want to support unions of things other than tables,
+    // so check anyway for future-proofing.
+ return isConcreteBased() && (baseDataSource instanceof TableDataSource + || (baseDataSource instanceof UnionDataSource && + baseDataSource.getChildren() + .stream() + .allMatch(ds -> ds instanceof TableDataSource))); + } + + /** + * Returns true if this datasource represents a subquery. + */ + public boolean isQuery() + { + return dataSource instanceof QueryDataSource; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataSourceAnalysis that = (DataSourceAnalysis) o; + return Objects.equals(dataSource, that.dataSource) && + Objects.equals(baseDataSource, that.baseDataSource) && + Objects.equals(baseQuerySegmentSpec, that.baseQuerySegmentSpec) && + Objects.equals(preJoinableClauses, that.preJoinableClauses); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSource, baseDataSource, baseQuerySegmentSpec, preJoinableClauses); + } + + @Override + public String toString() + { + return "DataSourceAnalysis{" + + "dataSource=" + dataSource + + ", baseDataSource=" + baseDataSource + + ", baseQuerySegmentSpec=" + baseQuerySegmentSpec + + ", joinClauses=" + preJoinableClauses + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java new file mode 100644 index 00000000000..5ed7f71d561 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import com.google.common.base.Preconditions; +import org.apache.druid.query.DataSource; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinables; + +import java.util.Objects; + +/** + * Like {@link org.apache.druid.segment.join.JoinableClause}, but contains a {@link DataSource} instead of a + * {@link org.apache.druid.segment.join.Joinable}. This is useful because when analyzing joins, we don't want to + * actually create Joinables, since that can be an expensive operation. 
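
// A worked example (not part of the committed diff) tying DataSourceAnalysis and
// PreJoinableClause together: analyzing a left-leaning join of an invented
// "events" table with two lookups. The join conditions use Druid expression
// syntax and reference right-side columns through their prefixes.
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.join.JoinType;

class DataSourceAnalysisSketch
{
  public static void main(String[] args)
  {
    final DataSource join = JoinDataSource.create(
        JoinDataSource.create(
            new TableDataSource("events"),
            new LookupDataSource("countries"),
            "c.",
            "country == \"c.k\"",
            JoinType.LEFT,
            ExprMacroTable.nil()
        ),
        new LookupDataSource("devices"),
        "d.",
        "device == \"d.k\"",
        JoinType.INNER,
        ExprMacroTable.nil()
    );

    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(join);

    // The base is the bottom-leftmost leaf: the "events" table.
    System.out.println(analysis.getBaseDataSource().equals(new TableDataSource("events")));  // true

    // The right-hand branches become clauses in bottom-up order: prints "c." then "d.".
    for (PreJoinableClause clause : analysis.getPreJoinableClauses()) {
      System.out.println(clause.getPrefix());
    }

    // Concrete table base joined only against global (lookup) leaves:
    System.out.println(analysis.isConcreteTableBased());  // true
  }
}
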
+ */ +public class PreJoinableClause +{ + private final String prefix; + private final DataSource dataSource; + private final JoinType joinType; + private final JoinConditionAnalysis condition; + + PreJoinableClause( + final String prefix, + final DataSource dataSource, + final JoinType joinType, + final JoinConditionAnalysis condition + ) + { + this.prefix = Joinables.validatePrefix(prefix); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.joinType = Preconditions.checkNotNull(joinType, "joinType"); + this.condition = Preconditions.checkNotNull(condition, "condition"); + } + + public String getPrefix() + { + return prefix; + } + + public DataSource getDataSource() + { + return dataSource; + } + + public JoinType getJoinType() + { + return joinType; + } + + public JoinConditionAnalysis getCondition() + { + return condition; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PreJoinableClause that = (PreJoinableClause) o; + return Objects.equals(prefix, that.prefix) && + Objects.equals(dataSource, that.dataSource) && + joinType == that.joinType && + Objects.equals(condition, that.condition); + } + + @Override + public int hashCode() + { + return Objects.hash(prefix, dataSource, joinType, condition); + } + + @Override + public String toString() + { + return "JoinClause{" + + "prefix='" + prefix + '\'' + + ", dataSource=" + dataSource + + ", joinType=" + joinType + + ", condition=" + condition + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java index 4f985dbf3f5..5e9bfc39bdd 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java @@ -22,7 +22,6 @@ package org.apache.druid.segment.join; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; -import javax.annotation.Nullable; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -39,9 +38,9 @@ public class JoinableClause private final JoinType joinType; private final JoinConditionAnalysis condition; - public JoinableClause(@Nullable String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition) + public JoinableClause(String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition) { - this.prefix = prefix != null ? prefix : ""; + this.prefix = Joinables.validatePrefix(prefix); this.joinable = Preconditions.checkNotNull(joinable, "joinable"); this.joinType = Preconditions.checkNotNull(joinType, "joinType"); this.condition = Preconditions.checkNotNull(condition, "condition"); diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java new file mode 100644 index 00000000000..6bb95a1a502 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.column.ColumnHolder; + +import javax.annotation.Nullable; + +/** + * Utility methods for working with {@link Joinable} related classes. + */ +public class Joinables +{ + /** + * Checks that "prefix" is a valid prefix for a join clause (see {@link JoinableClause#getPrefix()}) and, if so, + * returns it. Otherwise, throws an exception. + */ + public static String validatePrefix(@Nullable final String prefix) + { + if (prefix == null || prefix.isEmpty()) { + throw new IAE("Join clause cannot have null or empty prefix"); + } else if (isPrefixedBy(ColumnHolder.TIME_COLUMN_NAME, prefix) || ColumnHolder.TIME_COLUMN_NAME.equals(prefix)) { + throw new IAE( + "Join clause cannot have prefix[%s], since it would shadow %s", + prefix, + ColumnHolder.TIME_COLUMN_NAME + ); + } else { + return prefix; + } + } + + public static boolean isPrefixedBy(final String columnName, final String prefix) + { + return columnName.startsWith(prefix) && columnName.length() > prefix.length(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 537650881b1..090570db7ac 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -53,7 +54,10 @@ public class DataSourceTest @Test public void testTableDataSource() throws IOException { - DataSource dataSource = JSON_MAPPER.readValue("{\"type\":\"table\", \"name\":\"somedatasource\"}", DataSource.class); + DataSource dataSource = JSON_MAPPER.readValue( + "{\"type\":\"table\", \"name\":\"somedatasource\"}", + DataSource.class + ); Assert.assertEquals(new TableDataSource("somedatasource"), dataSource); } @@ -88,8 +92,8 @@ public class DataSourceTest Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) ); Assert.assertEquals( - Lists.newArrayList("ds1", "ds2"), - Lists.newArrayList(dataSource.getNames()) + ImmutableSet.of("ds1", "ds2"), + dataSource.getTableNames() ); final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class); diff --git a/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java new file mode 100644 index 00000000000..c533ec3f9e1 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public class InlineDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final AtomicLong iterationCounter = new AtomicLong(); + + private final List rows = ImmutableList.of( + new Object[]{DateTimes.of("2000").getMillis(), "foo", 0d, ImmutableMap.of("n", "0")}, + new Object[]{DateTimes.of("2000").getMillis(), "bar", 1d, ImmutableMap.of("n", "1")}, + new Object[]{DateTimes.of("2000").getMillis(), "baz", 2d, ImmutableMap.of("n", "2")} + ); + + private final Iterable rowsIterable = () -> { + iterationCounter.incrementAndGet(); + return rows.iterator(); + }; + + private final List expectedColumnNames = ImmutableList.of( + ColumnHolder.TIME_COLUMN_NAME, + "str", + "double", + "complex" + ); + + private final List expectedColumnTypes = ImmutableList.of( + ValueType.LONG, + ValueType.STRING, + ValueType.DOUBLE, + ValueType.COMPLEX + ); + + private final InlineDataSource listDataSource = InlineDataSource.fromIterable( + expectedColumnNames, + expectedColumnTypes, + rows + ); + + private final InlineDataSource iterableDataSource = InlineDataSource.fromIterable( + expectedColumnNames, + expectedColumnTypes, + rowsIterable + ); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.emptySet(), listDataSource.getTableNames()); + Assert.assertEquals(Collections.emptySet(), iterableDataSource.getTableNames()); + } + + @Test + public void test_getColumnNames() + { + Assert.assertEquals(expectedColumnNames, listDataSource.getColumnNames()); + Assert.assertEquals(expectedColumnNames, iterableDataSource.getColumnNames()); + } + + @Test + public void test_getColumnTypes() + { + Assert.assertEquals(expectedColumnTypes, listDataSource.getColumnTypes()); + Assert.assertEquals(expectedColumnTypes, iterableDataSource.getColumnTypes()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), listDataSource.getChildren()); + Assert.assertEquals(Collections.emptyList(), 
iterableDataSource.getChildren()); + } + + @Test + public void test_getRowSignature() + { + Assert.assertEquals( + ImmutableMap.of( + ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG, + "str", ValueType.STRING, + "double", ValueType.DOUBLE, + "complex", ValueType.COMPLEX + ), + listDataSource.getRowSignature() + ); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(listDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertTrue(listDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertFalse(listDataSource.isConcrete()); + } + + @Test + public void test_rowAdapter() + { + final RowAdapter adapter = listDataSource.rowAdapter(); + final Object[] row = rows.get(1); + + Assert.assertEquals(DateTimes.of("2000").getMillis(), adapter.timestampFunction().applyAsLong(row)); + Assert.assertEquals("bar", adapter.columnFunction("str").apply(row)); + Assert.assertEquals(1d, adapter.columnFunction("double").apply(row)); + Assert.assertEquals(ImmutableMap.of("n", "1"), adapter.columnFunction("complex").apply(row)); + } + + @Test + public void test_getRows_list() + { + Assert.assertSame(this.rows, listDataSource.getRowsAsList()); + } + + @Test + public void test_getRows_iterable() + { + final Iterable iterable = iterableDataSource.getRows(); + Assert.assertNotSame(this.rows, iterable); + + // No iteration yet. + Assert.assertEquals(0, iterationCounter.get()); + + assertRowsEqual(this.rows, ImmutableList.copyOf(iterable)); + + // OK, now we've iterated. + Assert.assertEquals(1, iterationCounter.get()); + + // Read again, we should iterate again. + //noinspection MismatchedQueryAndUpdateOfCollection + final List ignored = Lists.newArrayList(iterable); + Assert.assertEquals(2, iterationCounter.get()); + } + + @Test + public void test_getRowsAsList_list() + { + Assert.assertSame(this.rows, listDataSource.getRowsAsList()); + } + + @Test + public void test_getRowsAsList_iterable() + { + final List list = iterableDataSource.getRowsAsList(); + + Assert.assertEquals(1, iterationCounter.get()); + assertRowsEqual(this.rows, list); + + // Read again, we should *not* iterate again (in contrast to "test_getRows_iterable"). + //noinspection MismatchedQueryAndUpdateOfCollection + final List ignored = Lists.newArrayList(list); + Assert.assertEquals(1, iterationCounter.get()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(listDataSource, listDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + // Workaround so "withChildren" isn't flagged as unused in the DataSource interface. + ((DataSource) listDataSource).withChildren(ImmutableList.of(new TableDataSource("foo"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(InlineDataSource.class) + .usingGetClass() + .withNonnullFields("columnNames", "columnTypes", "rows") + .verify(); + } + + @Test + public void test_toString_iterable() + { + // Verify that toString does not iterate the rows. 
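+ // (The iterable-backed instance bumps iterationCounter whenever its rows are iterated, so a zero count proves toString is lazy.)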
+ final String ignored = iterableDataSource.toString(); + Assert.assertEquals(0, iterationCounter.get()); + } + + @Test + public void test_serde_list() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(listDataSource), + DataSource.class + ); + + Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames()); + Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes()); + assertRowsEqual(listDataSource.getRows(), deserialized.getRows()); + } + + @Test + public void test_serde_iterable() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(iterableDataSource), + DataSource.class + ); + + // Lazy iterables turn into Lists upon serialization. + Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames()); + Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes()); + assertRowsEqual(listDataSource.getRows(), deserialized.getRows()); + + // Should have iterated once. + Assert.assertEquals(1, iterationCounter.get()); + } + + /** + * This method exists because "equals" on two equivalent Object[] won't return true, so we need to check + * for equality deeply. + */ + private static void assertRowsEqual(final Iterable expectedRows, final Iterable actualRows) + { + if (expectedRows instanceof List && actualRows instanceof List) { + // Only check equality deeply when both expectedRows and actualRows are Lists, i.e., non-lazy. + final List expectedRowsList = (List) expectedRows; + final List actualRowsList = (List) actualRows; + + final int sz = expectedRowsList.size(); + Assert.assertEquals("number of rows", sz, actualRowsList.size()); + + // Super slow for LinkedLists, but we don't expect those to be used here. + // (They're generally forbidden in Druid except for special cases.) + for (int i = 0; i < sz; i++) { + Assert.assertArrayEquals("row #" + i, expectedRowsList.get(i), actualRowsList.get(i)); + } + } else { + // If they're not both Lists, we don't want to iterate them during equality checks, so do a non-deep check. + // This might still return true if whatever class they are has another way of checking equality. But usually we + // expect this to return false. + Assert.assertEquals("rows", expectedRows, actualRows); + } + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java new file mode 100644 index 00000000000..e0a990e753a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.join.JoinType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class JoinDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TableDataSource fooTable = new TableDataSource("foo"); + private final TableDataSource barTable = new TableDataSource("bar"); + private final LookupDataSource lookylooLookup = new LookupDataSource("lookyloo"); + + private final JoinDataSource joinTableToLookup = JoinDataSource.create( + fooTable, + lookylooLookup, + "j.", + "x == \"j.x\"", + JoinType.LEFT, + ExprMacroTable.nil() + ); + + private final JoinDataSource joinTableToTable = JoinDataSource.create( + fooTable, + barTable, + "j.", + "x == \"j.x\"", + JoinType.LEFT, + ExprMacroTable.nil() + ); + + @Test + public void test_getTableNames_tableToTable() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), joinTableToTable.getTableNames()); + } + + @Test + public void test_getTableNames_tableToLookup() + { + Assert.assertEquals(Collections.singleton("foo"), joinTableToLookup.getTableNames()); + } + + @Test + public void test_getChildren_tableToTable() + { + Assert.assertEquals(ImmutableList.of(fooTable, barTable), joinTableToTable.getChildren()); + } + + @Test + public void test_getChildren_tableToLookup() + { + Assert.assertEquals(ImmutableList.of(fooTable, lookylooLookup), joinTableToLookup.getChildren()); + } + + @Test + public void test_isCacheable_tableToTable() + { + Assert.assertFalse(joinTableToTable.isCacheable()); + } + + @Test + public void test_isCacheable_lookup() + { + Assert.assertFalse(joinTableToLookup.isCacheable()); + } + + @Test + public void test_isConcrete_tableToTable() + { + Assert.assertFalse(joinTableToTable.isConcrete()); + } + + @Test + public void test_isConcrete_tableToLookup() + { + Assert.assertFalse(joinTableToLookup.isConcrete()); + } + + @Test + public void test_isGlobal_tableToTable() + { + Assert.assertFalse(joinTableToTable.isGlobal()); + } + + @Test + public void test_isGlobal_tableToLookup() + { + Assert.assertFalse(joinTableToLookup.isGlobal()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Expected [2] children, got [0]"); + + final DataSource ignored = joinTableToTable.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_two() + { + final DataSource transformed = joinTableToTable.withChildren(ImmutableList.of(fooTable, lookylooLookup)); + + Assert.assertEquals(joinTableToLookup, transformed); + } + + @Test + public void test_equals() + { + 
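// EqualsVerifier exercises the full equals/hashCode contract here; usingGetClass() matches equals implementations that compare getClass() rather than using instanceof. +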
EqualsVerifier.forClass(JoinDataSource.class) + .usingGetClass() + .withNonnullFields("left", "right", "rightPrefix", "conditionAnalysis", "joinType") + .verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final JoinDataSource deserialized = (JoinDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(joinTableToLookup), + DataSource.class + ); + + Assert.assertEquals(joinTableToLookup, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java new file mode 100644 index 00000000000..c68579ff60a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class LookupDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final LookupDataSource lookylooDataSource = new LookupDataSource("lookyloo"); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.emptySet(), lookylooDataSource.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), lookylooDataSource.getChildren()); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(lookylooDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertTrue(lookylooDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertFalse(lookylooDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(lookylooDataSource, lookylooDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + lookylooDataSource.withChildren(ImmutableList.of(new LookupDataSource("bar"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(LookupDataSource.class).usingGetClass().withNonnullFields("lookupName").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final LookupDataSource 
deserialized = (LookupDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(lookylooDataSource), + DataSource.class + ); + + Assert.assertEquals(lookylooDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java new file mode 100644 index 00000000000..df8c1f6b564 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class QueryDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TimeseriesQuery queryOnTable = + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + private final TimeseriesQuery queryOnLookup = + Druids.newTimeseriesQueryBuilder() + .dataSource(new LookupDataSource("lookyloo")) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + private final QueryDataSource queryOnTableDataSource = new QueryDataSource(queryOnTable); + private final QueryDataSource queryOnLookupDataSource = new QueryDataSource(queryOnLookup); + + @Test + public void test_getTableNames_table() + { + Assert.assertEquals(Collections.singleton("foo"), queryOnTableDataSource.getTableNames()); + } + + @Test + public void test_getTableNames_lookup() + { + Assert.assertEquals(Collections.emptySet(), queryOnLookupDataSource.getTableNames()); + } + + @Test + public void test_getChildren_table() + { + Assert.assertEquals(Collections.singletonList(new TableDataSource("foo")), queryOnTableDataSource.getChildren()); + } + + @Test + public void test_getChildren_lookup() + { + Assert.assertEquals( + Collections.singletonList(new LookupDataSource("lookyloo")), + queryOnLookupDataSource.getChildren() + ); + } + + @Test + public void test_isCacheable_table() + { + Assert.assertFalse(queryOnTableDataSource.isCacheable()); + } + + @Test + public void test_isCacheable_lookup() + { + Assert.assertFalse(queryOnLookupDataSource.isCacheable()); + } + + @Test + public void test_isConcrete_table() + { + Assert.assertFalse(queryOnTableDataSource.isConcrete()); + } + + @Test + public void 
test_isConcrete_lookup() + { + Assert.assertFalse(queryOnLookupDataSource.isConcrete()); + } + + @Test + public void test_isGlobal_table() + { + Assert.assertFalse(queryOnTableDataSource.isGlobal()); + } + + @Test + public void test_isGlobal_lookup() + { + Assert.assertTrue(queryOnLookupDataSource.isGlobal()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Must have exactly one child"); + + final DataSource ignored = queryOnLookupDataSource.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_single() + { + final TableDataSource barTable = new TableDataSource("bar"); + + final QueryDataSource transformed = + (QueryDataSource) queryOnLookupDataSource.withChildren(Collections.singletonList(barTable)); + + Assert.assertEquals(barTable, transformed.getQuery().getDataSource()); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(QueryDataSource.class).usingGetClass().withNonnullFields("query").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final QueryDataSource deserialized = (QueryDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(queryOnTableDataSource), + DataSource.class + ); + + Assert.assertEquals(queryOnTableDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java new file mode 100644 index 00000000000..ef50f3e45d9 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class TableDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TableDataSource fooDataSource = new TableDataSource("foo"); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.singleton("foo"), fooDataSource.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), fooDataSource.getChildren()); + } + + @Test + public void test_isCacheable() + { + Assert.assertTrue(fooDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertFalse(fooDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertTrue(fooDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(fooDataSource, fooDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + fooDataSource.withChildren(ImmutableList.of(new TableDataSource("bar"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify(); + } + + @Test + public void test_equals_legacy() + { + final LegacyDataSource legacyFoo = new LegacyDataSource("foo"); + final LegacyDataSource legacyBar = new LegacyDataSource("bar"); + + Assert.assertEquals(legacyFoo, fooDataSource); + Assert.assertEquals(fooDataSource, legacyFoo); + + Assert.assertNotEquals(legacyBar, fooDataSource); + Assert.assertNotEquals(fooDataSource, legacyBar); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final TableDataSource deserialized = (TableDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(fooDataSource), + DataSource.class + ); + + Assert.assertEquals(fooDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java new file mode 100644 index 00000000000..117225d890b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; + +public class UnionDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final UnionDataSource unionDataSource = new UnionDataSource( + ImmutableList.of( + new TableDataSource("foo"), + new TableDataSource("bar") + ) + ); + + private final UnionDataSource unionDataSourceWithDuplicates = new UnionDataSource( + ImmutableList.of( + new TableDataSource("bar"), + new TableDataSource("foo"), + new TableDataSource("bar") + ) + ); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSource.getTableNames()); + } + + @Test + public void test_getTableNames_withDuplicates() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSourceWithDuplicates.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals( + ImmutableList.of(new TableDataSource("foo"), new TableDataSource("bar")), + unionDataSource.getChildren() + ); + } + + @Test + public void test_getChildren_withDuplicates() + { + Assert.assertEquals( + ImmutableList.of(new TableDataSource("bar"), new TableDataSource("foo"), new TableDataSource("bar")), + unionDataSourceWithDuplicates.getChildren() + ); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(unionDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertFalse(unionDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertTrue(unionDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Expected [2] children, got [0]"); + + unionDataSource.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_sameNumber() + { + final List newDataSources = ImmutableList.of( + new TableDataSource("baz"), + new TableDataSource("qux") + ); + + //noinspection unchecked + Assert.assertEquals( + new UnionDataSource(newDataSources), + unionDataSource.withChildren((List) newDataSources) + ); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(UnionDataSource.class).usingGetClass().withNonnullFields("dataSources").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final UnionDataSource deserialized = (UnionDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(unionDataSource), + DataSource.class + ); + + Assert.assertEquals(unionDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java index 3b9e5e8971f..a9ce7a9471b 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java @@ -44,7 +44,7 @@ public class UnionQueryRunnerTest { // verify that table datasource is passed to baseQueryRunner 
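// (UnionQueryRunner runs the base runner once per table in the union, rewriting the datasource each time, so the delegate should only ever see a TableDataSource.)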
Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource); - String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames()); + String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getTableNames()); if ("ds1".equals(dsName)) { ds1.compareAndSet(false, true); return Sequences.simple(Arrays.asList(1, 2, 3)); diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java index 5b254ea18eb..5f044ec339f 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java @@ -917,7 +917,7 @@ public class SegmentMetadataQueryTest Query query = MAPPER.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); Assert.assertEquals( Intervals.of("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0) @@ -937,7 +937,7 @@ public class SegmentMetadataQueryTest + "}"; Query query = MAPPER.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0)); Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval()); diff --git a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java new file mode 100644 index 00000000000..1b24c5e3898 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.planning; + +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class DataSourceAnalysisTest +{ + private static final List MILLENIUM_INTERVALS = ImmutableList.of(Intervals.of("2000/3000")); + private static final TableDataSource TABLE_FOO = new TableDataSource("foo"); + private static final TableDataSource TABLE_BAR = new TableDataSource("bar"); + private static final LookupDataSource LOOKUP_LOOKYLOO = new LookupDataSource("lookyloo"); + private static final InlineDataSource INLINE = InlineDataSource.fromIterable( + ImmutableList.of("column"), + ImmutableList.of(ValueType.STRING), + ImmutableList.of(new Object[0]) + ); + + @Test + public void testTable() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(TABLE_FOO); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(TABLE_FOO, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testUnion() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(unionDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(unionDataSource, analysis.getDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnTable() + { + final QueryDataSource queryDataSource = subquery(TABLE_FOO); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + 
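// The analysis unwraps the outer query: isQuery() reports true, while the base datasource and segment spec come from the wrapped table query. +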
Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnUnion() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final QueryDataSource queryDataSource = subquery(unionDataSource); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testLookup() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnLookup() + { + final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testInline() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(INLINE, analysis.getDataSource()); + Assert.assertEquals(INLINE, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), 
analysis.getPreJoinableClauses()); + } + + @Test + public void testJoinSimpleLeftLeaning() + { + // Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a left-leaning + // structure (no right children are joins themselves). + + final JoinDataSource joinDataSource = + join( + join( + join( + TABLE_FOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ), + INLINE, + "2.", + JoinType.LEFT + ), + subquery(LOOKUP_LOOKYLOO), + "3.", + JoinType.FULL + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")), + new PreJoinableClause("2.", INLINE, JoinType.LEFT, joinClause("2.")), + new PreJoinableClause("3.", subquery(LOOKUP_LOOKYLOO), JoinType.FULL, joinClause("3.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinSimpleRightLeaning() + { + // Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a right-leaning + // structure (no left children are joins themselves). + // + // Note that unlike the left-leaning stack, which is fully flattened, this one will not get flattened at all. + + final JoinDataSource rightLeaningJoinStack = + join( + LOOKUP_LOOKYLOO, + join( + INLINE, + subquery(LOOKUP_LOOKYLOO), + "1.", + JoinType.LEFT + ), + "2.", + JoinType.FULL + ); + + final JoinDataSource joinDataSource = + join( + TABLE_FOO, + rightLeaningJoinStack, + "3.", + JoinType.RIGHT + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("3.", rightLeaningJoinStack, JoinType.RIGHT, joinClause("3.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinOverTableSubquery() + { + final JoinDataSource joinDataSource = join( + TABLE_FOO, + subquery(TABLE_FOO), + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", subquery(TABLE_FOO), JoinType.INNER, 
joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinTableUnionToLookup() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final JoinDataSource joinDataSource = join( + unionDataSource, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinUnderTopLevelSubqueries() + { + final QueryDataSource queryDataSource = + subquery( + subquery( + join( + TABLE_FOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ) + ) + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinLookupToLookup() + { + final JoinDataSource joinDataSource = join( + LOOKUP_LOOKYLOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinLookupToTable() + { + final JoinDataSource joinDataSource = join( + LOOKUP_LOOKYLOO, + TABLE_FOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, 
analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", TABLE_FOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(DataSourceAnalysis.class) + .usingGetClass() + .withNonnullFields("dataSource", "baseDataSource", "preJoinableClauses") + .verify(); + } + + /** + * Generate a datasource that joins on a column named "x" on both sides. + */ + private static JoinDataSource join( + final DataSource left, + final DataSource right, + final String rightPrefix, + final JoinType joinType + ) + { + return JoinDataSource.create( + left, + right, + rightPrefix, + joinClause(rightPrefix).getOriginalExpression(), + joinType, + ExprMacroTable.nil() + ); + } + + /** + * Generate a join clause that joins on a column named "x" on both sides. + */ + private static JoinConditionAnalysis joinClause( + final String rightPrefix + ) + { + return JoinConditionAnalysis.forExpression( + StringUtils.format("x == \"%sx\"", rightPrefix), + rightPrefix, + ExprMacroTable.nil() + ); + } + + /** + * Generate a datasource that does a subquery on another datasource. The specific kind of query doesn't matter + * much for the purpose of this test class, so it's always the same. + */ + private static QueryDataSource subquery(final DataSource dataSource) + { + return new QueryDataSource( + GroupByQuery.builder() + .setDataSource(dataSource) + .setInterval(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)) + .setGranularity(Granularities.ALL) + .build() + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java b/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java new file mode 100644 index 00000000000..80762a758c8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.planning; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.junit.Assert; +import org.junit.Test; + +public class PreJoinableClauseTest +{ + private final PreJoinableClause clause = new PreJoinableClause( + "j.", + new TableDataSource("foo"), + JoinType.LEFT, + JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil()) + ); + + @Test + public void test_getPrefix() + { + Assert.assertEquals("j.", clause.getPrefix()); + } + + @Test + public void test_getJoinType() + { + Assert.assertEquals(JoinType.LEFT, clause.getJoinType()); + } + + @Test + public void test_getCondition() + { + Assert.assertEquals("x == \"j.x\"", clause.getCondition().getOriginalExpression()); + } + + @Test + public void test_getDataSource() + { + Assert.assertEquals(new TableDataSource("foo"), clause.getDataSource()); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(PreJoinableClause.class) + .usingGetClass() + .withNonnullFields("prefix", "dataSource", "joinType", "condition") + .verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java new file mode 100644 index 00000000000..ae3f845529e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.join; + +import org.apache.druid.segment.column.ColumnHolder; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class JoinablesTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void test_validatePrefix_null() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have null or empty prefix"); + + Joinables.validatePrefix(null); + } + + @Test + public void test_validatePrefix_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have null or empty prefix"); + + Joinables.validatePrefix(""); + } + + @Test + public void test_validatePrefix_underscore() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have prefix[_]"); + + Joinables.validatePrefix("_"); + } + + @Test + public void test_validatePrefix_timeColumn() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have prefix[__time]"); + + Joinables.validatePrefix(ColumnHolder.TIME_COLUMN_NAME); + } + + @Test + public void test_isPrefixedBy() + { + Assert.assertTrue(Joinables.isPrefixedBy("foo", "")); + Assert.assertTrue(Joinables.isPrefixedBy("foo", "f")); + Assert.assertTrue(Joinables.isPrefixedBy("foo", "fo")); + Assert.assertFalse(Joinables.isPrefixedBy("foo", "foo")); + } +} diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index d3f0a667230..4b365b7ce89 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -59,6 +59,7 @@ import java.util.function.Function; import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class BrokerServerView implements TimelineServerView @@ -293,7 +294,7 @@ public class BrokerServerView implements TimelineServerView @Override public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table = Iterables.getOnlyElement(dataSource.getNames()); + String table = Iterables.getOnlyElement(dataSource.getTableNames()); synchronized (lock) { return timelines.get(table); } diff --git a/server/src/main/java/org/apache/druid/client/CacheUtil.java b/server/src/main/java/org/apache/druid/client/CacheUtil.java index d3d9183e1c1..242bece98c1 100644 --- a/server/src/main/java/org/apache/druid/client/CacheUtil.java +++ b/server/src/main/java/org/apache/druid/client/CacheUtil.java @@ -25,13 +25,52 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.nio.ByteBuffer; public class CacheUtil { + public enum ServerType + { + BROKER { + @Override + boolean willMergeRunners() + { + return false; + } + }, + DATA { + @Override + boolean willMergeRunners() + { + return true; + } + }; + + /** + * Same meaning as the "willMergeRunners" parameter to {@link CacheStrategy#isCacheable}. 
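+ * BROKER returns false and DATA returns true, since data servers merge per-segment runners before caching while brokers do not.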
+ */ + abstract boolean willMergeRunners(); + } + + public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier) + { + return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier)); + } + + public static void populateResultCache( + Cache cache, + Cache.NamedKey key, + byte[] resultBytes + ) + { + cache.put(key, resultBytes); + } + public static Cache.NamedKey computeSegmentCacheKey( String segmentId, SegmentDescriptor descriptor, @@ -54,64 +93,105 @@ public class CacheUtil ); } - public static <T> boolean useCacheOnBrokers( + /** + * Returns whether the segment-level cache should be checked for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static <T> boolean isUseSegmentCache( Query<T> query, - CacheStrategy<T, Object, Query<T>> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isUseCache(query) + && cacheConfig.isUseCache(); } - public static <T> boolean populateCacheOnBrokers( + /** + * Returns whether the segment-level cache should be populated for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static <T> boolean isPopulateSegmentCache( Query<T> query, - CacheStrategy<T, Object, Query<T>> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isPopulateCache(query) + && cacheConfig.isPopulateCache(); } - public static <T> boolean useCacheOnDataNodes( + /** + * Returns whether the result-level cache should be checked for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static <T> boolean isUseResultCache( Query<T> query, - CacheStrategy<T, Object, Query<T>> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isUseResultLevelCache(query) + && cacheConfig.isUseResultLevelCache(); } - public static <T> boolean populateCacheOnDataNodes( + /** + * Returns whether the result-level cache should be populated for a particular query. 
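+ * Unlike the segment-level cache, the result-level cache stores the final, fully merged result of the whole query.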
+ * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static boolean isPopulateResultCache( Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isPopulateResultLevelCache(query) + && cacheConfig.isPopulateResultLevelCache(); } - private static boolean useCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + /** + * Returns whether a particular query is cacheable. Does not check whether we are actually configured to use or + * populate the cache; that should be done separately. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + static boolean isQueryCacheable( + final Query query, + @Nullable final CacheStrategy> cacheStrategy, + final CacheConfig cacheConfig, + final ServerType serverType ) { - return QueryContexts.isUseCache(query) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); + return cacheStrategy != null + && cacheStrategy.isCacheable(query, serverType.willMergeRunners()) + && cacheConfig.isQueryCacheable(query) + && query.getDataSource().isCacheable(); } - - private static boolean populateCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isPopulateCache(query) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); - } - } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 414c0541b64..9fbc354109c 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -241,8 +241,8 @@ public class CachingClusteredClient implements QuerySegmentWalker this.toolChest = warehouse.getToolChest(query); this.strategy = toolChest.getCacheStrategy(query); - this.useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); - this.populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); + this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); + this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); this.isBySegment = QueryContexts.isBySegment(query); // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/apache/druid/issues/2108 diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java index 13011396367..f69c413fdf0 100644 --- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java @@ -77,8 +77,13 @@ public class CachingQueryRunner implements QueryRunner { Query query = 
queryPlus.getQuery(); final CacheStrategy strategy = toolChest.getCacheStrategy(query); - final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); - final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); + final boolean populateCache = CacheUtil.isPopulateSegmentCache( + query, + strategy, + cacheConfig, + CacheUtil.ServerType.DATA + ); + final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA); final Cache.NamedKey key; if (strategy != null && (useCache || populateCache)) { diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 1f1d801d9e3..2517a8f0e9b 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -194,7 +194,7 @@ public class CoordinatorServerView implements InventoryView public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table = Iterables.getOnlyElement(dataSource.getNames()); + String table = Iterables.getOnlyElement(dataSource.getTableNames()); synchronized (lock) { return timelines.get(table); } diff --git a/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java b/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java deleted file mode 100644 index 7ca9dc19917..00000000000 --- a/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.client; - -import org.apache.druid.client.cache.Cache; -import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.CacheStrategy; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryContexts; - -public class ResultLevelCacheUtil -{ - private static final Logger log = new Logger(ResultLevelCacheUtil.class); - - public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier) - { - return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier)); - } - - public static void populate( - Cache cache, - Cache.NamedKey key, - byte[] resultBytes - ) - { - log.debug("Populating results into cache"); - cache.put(key, resultBytes); - } - - public static boolean useResultLevelCacheOnBrokers( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); - } - - public static boolean populateResultLevelCacheOnBrokers( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); - } - - private static boolean useResultLevelCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isUseResultLevelCache(query) - && strategy != null - && cacheConfig.isUseResultLevelCache() - && cacheConfig.isQueryCacheable(query); - } - - private static boolean populateResultLevelCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isPopulateResultLevelCache(query) - && strategy != null - && cacheConfig.isPopulateResultLevelCache() - && cacheConfig.isQueryCacheable(query); - } -} diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 0a7407fa52f..93a7d13f41a 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import org.apache.druid.client.ResultLevelCacheUtil; +import org.apache.druid.client.CacheUtil; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.RE; @@ -71,8 +71,13 @@ public class ResultLevelCachingQueryRunner implements QueryRunner this.cacheConfig = cacheConfig; this.query = query; this.strategy = queryToolChest.getCacheStrategy(query); - this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig); - this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig); + this.populateResultCache = CacheUtil.isPopulateResultCache( + query, + strategy, + cacheConfig, + CacheUtil.ServerType.BROKER + ); + this.useResultCache = CacheUtil.isUseResultCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); } @Override @@ -162,7 +167,7 @@ public class ResultLevelCachingQueryRunner implements QueryRunner ) { if (useResultCache && 
queryCacheKey != null) { - return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey)); + return cache.get(CacheUtil.computeResultLevelCacheKey(queryCacheKey)); } return null; } @@ -216,7 +221,7 @@ public class ResultLevelCachingQueryRunner implements QueryRunner ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator( cache, objectMapper, - ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr), + CacheUtil.computeResultLevelCacheKey(cacheKeyStr), cacheConfig, true ); @@ -292,7 +297,7 @@ public class ResultLevelCachingQueryRunner implements QueryRunner public void populateResults() { - ResultLevelCacheUtil.populate( + CacheUtil.populateResultCache( cache, key, Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 6c528792018..451e079b26b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -53,6 +53,7 @@ import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SinkQueryRunners; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.Segment; @@ -168,6 +169,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker return new NoopQueryRunner<>(); } + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. 
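+ // DataSourceAnalysis decomposes the datasource tree; any joins in it surface as pre-joinable clauses, + // so an empty clause list means the query is join-free.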
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { throw new ISE("Unknown query type[%s].", query.getClass()); diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 29a173ccf98..ddc15aedef0 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.PostProcessingOperator; @@ -37,10 +38,12 @@ import org.apache.druid.query.ResultLevelCachingQueryRunner; import org.apache.druid.query.RetryQueryRunner; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.initialization.ServerConfig; import org.joda.time.Interval; /** + * */ public class ClientQuerySegmentWalker implements QuerySegmentWalker { @@ -79,12 +82,24 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); } @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); } @@ -93,12 +108,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker QueryToolChest> toolChest = warehouse.getToolChest(query); // This does not adhere to the fluent workflow. 
See https://github.com/apache/druid/issues/5517 - return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest), - toolChest, - query, - objectMapper, - cache, - cacheConfig); + return new ResultLevelCachingQueryRunner<>( + makeRunner(query, baseClientRunner, toolChest), + toolChest, + query, + objectMapper, + cache, + cacheConfig + ); } private QueryRunner makeRunner( diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index 87508107d19..6e876c4d092 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -189,7 +189,7 @@ public class QueryLifecycle AuthorizationUtils.authorizeAllResourceActions( authenticationResult, Iterables.transform( - baseQuery.getDataSource().getNames(), + baseQuery.getDataSource().getTableNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper @@ -213,7 +213,7 @@ public class QueryLifecycle AuthorizationUtils.authorizeAllResourceActions( req, Iterables.transform( - baseQuery.getDataSource().getNames(), + baseQuery.getDataSource().getTableNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper diff --git a/server/src/main/java/org/apache/druid/server/QueryManager.java b/server/src/main/java/org/apache/druid/server/QueryManager.java index a90bf4b077d..0fd1807d86e 100644 --- a/server/src/main/java/org/apache/druid/server/QueryManager.java +++ b/server/src/main/java/org/apache/druid/server/QueryManager.java @@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.query.Query; import org.apache.druid.query.QueryWatcher; -import java.util.List; import java.util.Set; public class QueryManager implements QueryWatcher @@ -61,7 +60,7 @@ public class QueryManager implements QueryWatcher public void registerQuery(Query query, final ListenableFuture future) { final String id = query.getId(); - final List datasources = query.getDataSource().getNames(); + final Set datasources = query.getDataSource().getTableNames(); queries.put(id, future); queryDatasources.putAll(id, datasources); future.addListener( diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 78100aa153a..e2fb263def4 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -190,7 +190,7 @@ public class QueryResource implements QueryCountStatsProvider "%s[%s_%s_%s]", currThreadName, query.getType(), - query.getDataSource().getNames(), + query.getDataSource().getTableNames(), queryId ); diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 98e6f3e814a..41c71160e5e 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -53,6 +53,7 @@ import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import 
org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ReferenceCountingSegment; @@ -71,8 +72,6 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; -/** - */ public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); @@ -128,6 +127,12 @@ public class ServerManager implements QuerySegmentWalker throw new ISE("Unknown query type[%s].", query.getClass()); } + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + final QueryToolChest> toolChest = factory.getToolchest(); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); @@ -162,8 +167,7 @@ public class ServerManager implements QuerySegmentWalker { @Override public Iterable> apply( - @Nullable - final TimelineObjectHolder holder + @Nullable final TimelineObjectHolder holder ) { if (holder == null) { @@ -210,7 +214,7 @@ public class ServerManager implements QuerySegmentWalker private String getDataSourceName(DataSource dataSource) { - return Iterables.getOnlyElement(dataSource.getNames()); + return Iterables.getOnlyElement(dataSource.getTableNames()); } @Override @@ -224,6 +228,12 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner(); } + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + final QueryToolChest> toolChest = factory.getToolchest(); String dataSourceName = getDataSourceName(query.getDataSource()); diff --git a/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java b/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java index a0af4cb2fa1..bcb628a195b 100644 --- a/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java +++ b/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java @@ -20,21 +20,16 @@ package org.apache.druid.server.log; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.UnionDataSource; import org.apache.druid.server.RequestLogLine; import org.slf4j.MDC; import java.io.IOException; import java.util.Map; -import java.util.stream.Collectors; public class LoggingRequestLogger implements RequestLogger { @@ -66,7 +61,7 @@ public class LoggingRequestLogger implements RequestLogger final Query query = requestLogLine.getQuery(); MDC.put("queryId", query.getId()); MDC.put("sqlQueryId", StringUtils.nullToEmptyNonDruidDataString(query.getSqlQueryId())); - MDC.put("dataSource", findInnerDatasource(query).toString()); + MDC.put("dataSource", String.join(",", query.getDataSource().getTableNames())); MDC.put("queryType", query.getType()); MDC.put("isNested", 
String.valueOf(!(query.getDataSource() instanceof TableDataSource))); MDC.put("hasFilters", Boolean.toString(query.hasFilters())); @@ -119,30 +114,6 @@ public class LoggingRequestLogger implements RequestLogger return setContextMDC; } - private Object findInnerDatasource(Query query) - { - DataSource _ds = query.getDataSource(); - if (_ds instanceof TableDataSource) { - return ((TableDataSource) _ds).getName(); - } - if (_ds instanceof QueryDataSource) { - return findInnerDatasource(((QueryDataSource) _ds).getQuery()); - } - if (_ds instanceof UnionDataSource) { - return Joiner.on(",") - .join( - ((UnionDataSource) _ds) - .getDataSources() - .stream() - .map(TableDataSource::getName) - .collect(Collectors.toList()) - ); - } else { - // should not come here - return query.getDataSource(); - } - } - @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 96473e1614e..08ce78ff0fb 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -201,7 +201,7 @@ public class TieredBrokerHostSelector if (brokerServiceName == null) { // For Union Queries tier will be selected on the rules for first dataSource. - List rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null)); + List rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getTableNames(), null)); // find the rule that can apply to the entire set of intervals DateTime now = DateTimes.nowUtc(); diff --git a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java new file mode 100644 index 00000000000..e7a512046fa --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Druids; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class CacheUtilTest +{ + private final TimeseriesQuery timeseriesQuery = + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + @Test + public void test_isQueryCacheable_cacheableOnBroker() + { + Assert.assertTrue( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_cacheableOnDataServer() + { + Assert.assertTrue( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.DATA + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableOnBroker() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(false, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableOnDataServer() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, false), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.DATA + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableType() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, false), + makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableDataSource() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")), + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_nullCacheStrategy() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + null, + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + private static CacheConfig makeCacheConfig(final Map properties) + { + return TestHelper.makeJsonMapper().convertValue(properties, CacheConfig.class); + } + + private static class DummyCacheStrategy> + implements CacheStrategy + { + private final boolean cacheableOnBrokers; + private final boolean cacheableOnDataServers; + + public DummyCacheStrategy(boolean cacheableOnBrokers, boolean cacheableOnDataServers) + { + this.cacheableOnBrokers = cacheableOnBrokers; + this.cacheableOnDataServers = cacheableOnDataServers; + } + + @Override + public boolean isCacheable(QueryType query, boolean willMergeRunners) + { + return willMergeRunners ? 
cacheableOnDataServers : cacheableOnBrokers; + } + + @Override + public byte[] computeCacheKey(QueryType query) + { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] computeResultLevelCacheKey(QueryType query) + { + throw new UnsupportedOperationException(); + } + + @Override + public TypeReference getCacheObjectClazz() + { + throw new UnsupportedOperationException(); + } + + @Override + public Function prepareForCache(boolean isResultLevelCache) + { + throw new UnsupportedOperationException(); + } + + @Override + public Function pullFromCache(boolean isResultLevelCache) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java index a1b1910a2ef..934361a8b40 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java @@ -41,6 +41,7 @@ import org.apache.druid.sql.calcite.table.RowSignature; import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** * DruidRel that uses a "query" dataSource. @@ -199,9 +200,9 @@ public class DruidOuterQueryRel extends DruidRel } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { - return ((DruidRel) sourceRel).getDataSourceNames(); + return ((DruidRel) sourceRel).getDataSourceNames(); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java index 3deb8e41553..fed6d886a6f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java @@ -35,7 +35,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.sql.calcite.table.DruidTable; import javax.annotation.Nonnull; -import java.util.List; +import java.util.Set; /** * DruidRel that uses a "table" dataSource. 
@@ -123,9 +123,9 @@ public class DruidQueryRel extends DruidRel } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { - return druidTable.getDataSource().getNames(); + return druidTable.getDataSource().getTableNames(); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java index c5549ddb1a1..9f56e2f5814 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java @@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; -import java.util.List; +import java.util.Set; public abstract class DruidRel extends AbstractRelNode { @@ -110,7 +110,7 @@ public abstract class DruidRel extends AbstractRelNode public abstract T asDruidConvention(); /** - * Get a list of names of datasources read by this DruidRel + * Get the set of names of table datasources read by this DruidRel */ - public abstract List getDataSourceNames(); + public abstract Set getDataSourceNames(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java index 6248a360dfd..e31437b6962 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java @@ -171,13 +171,13 @@ public class DruidSemiJoin extends DruidRel } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { final DruidRel druidRight = (DruidRel) this.right; - Set datasourceNames = new LinkedHashSet<>(); - datasourceNames.addAll(left.getDataSourceNames()); - datasourceNames.addAll(druidRight.getDataSourceNames()); - return new ArrayList<>(datasourceNames); + Set dataSourceNames = new LinkedHashSet<>(); + dataSourceNames.addAll(left.getDataSourceNames()); + dataSourceNames.addAll(druidRight.getDataSourceNames()); + return dataSourceNames; } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java index 41f08098b76..2fe3d2ca027 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class DruidUnionRel extends DruidRel @@ -166,12 +167,11 @@ public class DruidUnionRel extends DruidRel } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { return rels.stream() .flatMap(rel -> ((DruidRel) rel).getDataSourceNames().stream()) - .distinct() - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } @Override