Add join-related DataSource types, and analysis functionality. (#9235)

* Add join-related DataSource types, and analysis functionality.

Builds on #9111 and implements the datasource analysis mentioned in #8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.

Other notes:

1) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
2) The addition of "isCacheable" should work around #8713, since UnionDataSource now
   returns false for cacheability.

* Remove javadoc comment.

* Updates reflecting code review.

* Add comments.

* Add more comments.
This commit is contained in:
Gian Merlino 2020-01-22 14:54:47 -08:00 committed by GitHub
parent d541cbe436
commit d886463253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 3252 additions and 269 deletions

View File

@ -522,7 +522,7 @@ public class CachingClusteredClientBenchmark
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
final String table = Iterables.getOnlyElement(dataSource.getNames());
final String table = Iterables.getOnlyElement(dataSource.getTableNames());
return timelines.get(table);
}

View File

@ -328,7 +328,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames());
if (runningItem != null) {
final Task task = runningItem.getTask();

View File

@ -266,18 +266,20 @@ public abstract class BaseQuery<T> implements Query<T>
return false;
}
BaseQuery<?> baseQuery = (BaseQuery<?>) o;
// Must use getDuration() instead of "duration" because duration is lazily computed.
return descending == baseQuery.descending &&
Objects.equals(dataSource, baseQuery.dataSource) &&
Objects.equals(context, baseQuery.context) &&
Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) &&
Objects.equals(duration, baseQuery.duration) &&
Objects.equals(getDuration(), baseQuery.getDuration()) &&
Objects.equals(granularity, baseQuery.granularity);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity);
// Must use getDuration() instead of "duration" because duration is lazily computed.
return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity);
}
}

View File

@ -23,17 +23,62 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.List;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
/**
* Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union")
})
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
})
public interface DataSource
{
List<String> getNames();
/**
* Returns the names of all table datasources involved in this query. Does not include names for non-tables, like
* lookups or inline datasources.
*/
Set<String> getTableNames();
/**
* Returns datasources that this datasource depends on. Will be empty for leaf datasources like 'table'.
*/
List<DataSource> getChildren();
/**
* Return a new DataSource, identical to this one, with different children. The number of children must be equal
* to the number of children that this datasource already has.
*/
DataSource withChildren(List<DataSource> children);
/**
* Returns true if queries on this dataSource are cacheable at both the result level and per-segment level.
* Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join').
* Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based
* on segment identifiers.
*
* Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking
* the code necessary to compute cache keys properly.
*/
boolean isCacheable();
/**
* Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
* for queries of those.
*/
boolean isGlobal();
/**
* Returns true if this datasource represents concrete data that can be scanned via a
* {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
*
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
*/
boolean isConcrete();
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import java.util.List;
public class DataSourceUtil
{
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();
return names.size() == 1 ? names.get(0) : names.toString();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -30,7 +31,9 @@ import org.joda.time.Interval;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
@ -42,9 +45,22 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();
/** Non final to give subclasses ability to reassign it. */
/**
* Non final to give subclasses ability to reassign it.
*/
protected Thread ownerThread = Thread.currentThread();
private static String getTableNamesAsString(DataSource dataSource)
{
final Set<String> names = dataSource.getTableNames();
if (names.size() == 1) {
return Iterables.getOnlyElement(names);
} else {
return names.stream().sorted().collect(Collectors.toList()).toString();
}
}
protected void checkModifiedFromOwnerThread()
{
if (Thread.currentThread() != ownerThread) {
@ -77,7 +93,7 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
@Override
public void dataSource(QueryType query)
{
setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
setDimension(DruidMetrics.DATASOURCE, getTableNamesAsString(query.getDataSource()));
}
@Override

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.ToLongFunction;
/**
* Represents an inline datasource, where the rows are embedded within the DataSource object itself.
*
* The rows are backed by an Iterable, which can be lazy or not. Lazy datasources will only be iterated if someone calls
* {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}.
*/
public class InlineDataSource implements DataSource
{
private final List<String> columnNames;
private final List<ValueType> columnTypes;
private final Iterable<Object[]> rows;
private InlineDataSource(
final List<String> columnNames,
final List<ValueType> columnTypes,
final Iterable<Object[]> rows
)
{
this.columnNames = Preconditions.checkNotNull(columnNames, "'columnNames' must be nonnull");
this.columnTypes = Preconditions.checkNotNull(columnTypes, "'columnTypes' must be nonnull");
this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull");
if (columnNames.size() != columnTypes.size()) {
throw new IAE("columnNames and columnTypes must be the same length");
}
}
/**
* Factory method for Jackson. Used for inline datasources that were originally encoded as JSON. Private because
* non-Jackson callers should use {@link #fromIterable}.
*/
@JsonCreator
private static InlineDataSource fromJson(
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<ValueType> columnTypes,
@JsonProperty("rows") List<Object[]> rows
)
{
return new InlineDataSource(columnNames, columnTypes, rows);
}
/**
* Creates an inline datasource from an Iterable. The Iterable will not be iterated until someone calls
* {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}.
*
* @param columnNames names of each column in the rows
* @param columnTypes types of each column in the rows
* @param rows rows, each of the same length as columnNames and columnTypes
*/
public static InlineDataSource fromIterable(
final List<String> columnNames,
final List<ValueType> columnTypes,
final Iterable<Object[]> rows
)
{
return new InlineDataSource(columnNames, columnTypes, rows);
}
@Override
public Set<String> getTableNames()
{
return Collections.emptySet();
}
@JsonProperty
public List<String> getColumnNames()
{
return columnNames;
}
@JsonProperty
public List<ValueType> getColumnTypes()
{
return columnTypes;
}
/**
* Returns rows as a list. If the original Iterable behind this datasource was a List, this method will return it
* as-is, without copying it. Otherwise, this method will walk the iterable and copy it into a List before returning.
*/
@JsonProperty("rows")
public List<Object[]> getRowsAsList()
{
return rows instanceof List ? ((List<Object[]>) rows) : Lists.newArrayList(rows);
}
/**
* Returns rows as an Iterable.
*/
@JsonIgnore
public Iterable<Object[]> getRows()
{
return rows;
}
@Override
public List<DataSource> getChildren()
{
return Collections.emptyList();
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (!children.isEmpty()) {
throw new IAE("Cannot accept children");
}
return this;
}
@Override
public boolean isCacheable()
{
return false;
}
@Override
public boolean isGlobal()
{
return true;
}
@Override
public boolean isConcrete()
{
return false;
}
public Map<String, ValueType> getRowSignature()
{
final ImmutableMap.Builder<String, ValueType> retVal = ImmutableMap.builder();
for (int i = 0; i < columnNames.size(); i++) {
retVal.put(columnNames.get(i), columnTypes.get(i));
}
return retVal.build();
}
public RowAdapter<Object[]> rowAdapter()
{
return new RowAdapter<Object[]>()
{
@Override
public ToLongFunction<Object[]> timestampFunction()
{
final int columnNumber = columnNames.indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (columnNumber >= 0) {
return row -> (long) row[columnNumber];
} else {
return row -> 0L;
}
}
@Override
public Function<Object[], Object> columnFunction(String columnName)
{
final int columnNumber = columnNames.indexOf(columnName);
if (columnNumber >= 0) {
return row -> row[columnNumber];
} else {
return row -> null;
}
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InlineDataSource that = (InlineDataSource) o;
return Objects.equals(columnNames, that.columnNames) &&
Objects.equals(columnTypes, that.columnTypes) &&
Objects.equals(rows, that.rows);
}
@Override
public int hashCode()
{
return Objects.hash(columnNames, columnTypes, rows);
}
@Override
public String toString()
{
// Don't include 'rows' in stringification, because it might be long and/or lazy.
return "InlineDataSource{" +
"columnNames=" + columnNames +
", columnTypes=" + columnTypes +
'}';
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.Joinables;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Represents a join of two datasources.
*
* Logically, this datasource contains the result of:
*
* (1) prefixing all right-side columns with "rightPrefix"
* (2) then, joining the left and (prefixed) right sides using the provided type and condition
*
* Any columns from the left-hand side that start with "rightPrefix", and are at least one character longer than
* the prefix, will be shadowed. It is up to the caller to ensure that no important columns are shadowed by the
* chosen prefix.
*
* When analyzed by {@link org.apache.druid.query.planning.DataSourceAnalysis}, the right-hand side of this datasource
* will become a {@link org.apache.druid.query.planning.PreJoinableClause} object.
*/
public class JoinDataSource implements DataSource
{
private final DataSource left;
private final DataSource right;
private final String rightPrefix;
private final JoinConditionAnalysis conditionAnalysis;
private final JoinType joinType;
private JoinDataSource(
DataSource left,
DataSource right,
String rightPrefix,
JoinConditionAnalysis conditionAnalysis,
JoinType joinType
)
{
this.left = Preconditions.checkNotNull(left, "left");
this.right = Preconditions.checkNotNull(right, "right");
this.rightPrefix = Joinables.validatePrefix(rightPrefix);
this.conditionAnalysis = Preconditions.checkNotNull(conditionAnalysis, "conditionAnalysis");
this.joinType = Preconditions.checkNotNull(joinType, "joinType");
}
@JsonCreator
public static JoinDataSource create(
@JsonProperty("left") DataSource left,
@JsonProperty("right") DataSource right,
@JsonProperty("rightPrefix") String rightPrefix,
@JsonProperty("condition") String condition,
@JsonProperty("joinType") JoinType joinType,
@JacksonInject ExprMacroTable macroTable
)
{
return new JoinDataSource(
left,
right,
StringUtils.nullToEmptyNonDruidDataString(rightPrefix),
JoinConditionAnalysis.forExpression(
Preconditions.checkNotNull(condition, "condition"),
StringUtils.nullToEmptyNonDruidDataString(rightPrefix),
macroTable
),
joinType
);
}
@Override
public Set<String> getTableNames()
{
final Set<String> names = new HashSet<>();
names.addAll(left.getTableNames());
names.addAll(right.getTableNames());
return names;
}
@JsonProperty
public DataSource getLeft()
{
return left;
}
@JsonProperty
public DataSource getRight()
{
return right;
}
@JsonProperty
public String getRightPrefix()
{
return rightPrefix;
}
@JsonProperty
public String getCondition()
{
return conditionAnalysis.getOriginalExpression();
}
public JoinConditionAnalysis getConditionAnalysis()
{
return conditionAnalysis;
}
@JsonProperty
public JoinType getJoinType()
{
return joinType;
}
@Override
public List<DataSource> getChildren()
{
return ImmutableList.of(left, right);
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (children.size() != 2) {
throw new IAE("Expected [2] children, got [%d]", children.size());
}
return new JoinDataSource(children.get(0), children.get(1), rightPrefix, conditionAnalysis, joinType);
}
@Override
public boolean isCacheable()
{
return false;
}
@Override
public boolean isGlobal()
{
return left.isGlobal() && right.isGlobal();
}
@Override
public boolean isConcrete()
{
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JoinDataSource that = (JoinDataSource) o;
return Objects.equals(left, that.left) &&
Objects.equals(right, that.right) &&
Objects.equals(rightPrefix, that.rightPrefix) &&
Objects.equals(conditionAnalysis, that.conditionAnalysis) &&
joinType == that.joinType;
}
@Override
public int hashCode()
{
return Objects.hash(left, right, rightPrefix, conditionAnalysis, joinType);
}
@Override
public String toString()
{
return "JoinDataSource{" +
"left=" + left +
", right=" + right +
", rightPrefix='" + rightPrefix + '\'' +
", condition=" + conditionAnalysis +
", joinType=" + joinType +
'}';
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Represents a lookup.
*
* Currently, this datasource is not actually queryable, and attempts to do so will lead to errors. It is here as a
* placeholder for a future time in which it will become queryable.
*
* The "lookupName" referred to here should be provided by a
* {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider}.
*/
public class LookupDataSource implements DataSource
{
private final String lookupName;
@JsonCreator
public LookupDataSource(
@JsonProperty("lookup") String lookupName
)
{
this.lookupName = Preconditions.checkNotNull(lookupName, "lookup");
}
@Override
public Set<String> getTableNames()
{
return Collections.emptySet();
}
@JsonProperty("lookup")
public String getLookupName()
{
return lookupName;
}
@Override
public List<DataSource> getChildren()
{
return Collections.emptyList();
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (!children.isEmpty()) {
throw new IAE("Cannot accept children");
}
return this;
}
@Override
public boolean isCacheable()
{
return false;
}
@Override
public boolean isGlobal()
{
return true;
}
@Override
public boolean isConcrete()
{
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LookupDataSource that = (LookupDataSource) o;
return Objects.equals(lookupName, that.lookupName);
}
@Override
public int hashCode()
{
return Objects.hash(lookupName);
}
@Override
public String toString()
{
return "LookupDataSource{" +
"lookupName='" + lookupName + '\'' +
'}';
}
}

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set;
/**
*
*/
@PublicApi
public class Queries

View File

@ -22,8 +22,13 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@JsonTypeName("query")
public class QueryDataSource implements DataSource
@ -34,13 +39,13 @@ public class QueryDataSource implements DataSource
@JsonCreator
public QueryDataSource(@JsonProperty("query") Query query)
{
this.query = query;
this.query = Preconditions.checkNotNull(query, "'query' must be nonnull");
}
@Override
public List<String> getNames()
public Set<String> getTableNames()
{
return query.getDataSource().getNames();
return query.getDataSource().getTableNames();
}
@JsonProperty
@ -49,6 +54,40 @@ public class QueryDataSource implements DataSource
return query;
}
@Override
public List<DataSource> getChildren()
{
return Collections.singletonList(query.getDataSource());
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (children.size() != 1) {
throw new IAE("Must have exactly one child");
}
return new QueryDataSource(query.withDataSource(Iterables.getOnlyElement(children)));
}
@Override
public boolean isCacheable()
{
return false;
}
@Override
public boolean isGlobal()
{
return query.getDataSource().isGlobal();
}
@Override
public boolean isConcrete()
{
return false;
}
@Override
public String toString()
{

View File

@ -22,20 +22,22 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@JsonTypeName("table")
public class TableDataSource implements DataSource
{
@JsonProperty
private final String name;
@JsonCreator
public TableDataSource(@JsonProperty("name") String name)
{
this.name = (name == null ? null : name);
this.name = Preconditions.checkNotNull(name, "'name' must be nonnull");
}
@JsonProperty
@ -45,9 +47,43 @@ public class TableDataSource implements DataSource
}
@Override
public List<String> getNames()
public Set<String> getTableNames()
{
return Collections.singletonList(name);
return Collections.singleton(name);
}
@Override
public List<DataSource> getChildren()
{
return Collections.emptyList();
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (!children.isEmpty()) {
throw new IAE("Cannot accept children");
}
return this;
}
@Override
public boolean isCacheable()
{
return true;
}
@Override
public boolean isGlobal()
{
return false;
}
@Override
public boolean isConcrete()
{
return true;
}
@Override
@ -57,7 +93,7 @@ public class TableDataSource implements DataSource
}
@Override
public boolean equals(Object o)
public final boolean equals(Object o)
{
if (this == o) {
return true;
@ -76,7 +112,7 @@ public class TableDataSource implements DataSource
}
@Override
public int hashCode()
public final int hashCode()
{
return name.hashCode();
}

View File

@ -23,9 +23,12 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class UnionDataSource implements DataSource
@ -41,9 +44,11 @@ public class UnionDataSource implements DataSource
}
@Override
public List<String> getNames()
public Set<String> getTableNames()
{
return dataSources.stream().map(input -> Iterables.getOnlyElement(input.getNames())).collect(Collectors.toList());
return dataSources.stream()
.map(input -> Iterables.getOnlyElement(input.getTableNames()))
.collect(Collectors.toSet());
}
@JsonProperty
@ -52,6 +57,51 @@ public class UnionDataSource implements DataSource
return dataSources;
}
@Override
public List<DataSource> getChildren()
{
return ImmutableList.copyOf(dataSources);
}
@Override
public DataSource withChildren(List<DataSource> children)
{
if (children.size() != dataSources.size()) {
throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size());
}
if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) {
throw new IAE("All children must be tables");
}
return new UnionDataSource(
children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList())
);
}
@Override
public boolean isCacheable()
{
// Disables result-level caching for 'union' datasources, which doesn't work currently.
// See https://github.com/apache/druid/issues/8713 for reference.
//
// Note that per-segment caching is still effective, since at the time the per-segment cache evaluates a query
// for cacheability, it would have already been rewritten to a query on a single table.
return false;
}
@Override
public boolean isGlobal()
{
return dataSources.stream().allMatch(DataSource::isGlobal);
}
@Override
public boolean isConcrete()
{
return dataSources.stream().allMatch(DataSource::isConcrete);
}
@Override
public boolean equals(Object o)
{

View File

@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.planning;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Analysis of a datasource for purposes of deciding how to execute a particular query.
*
* The analysis breaks a datasource down in the following way:
*
* <pre>
*
* Q <-- Possible outer query datasource(s) [may be multiple stacked]
* |
* J <-- Possible join tree, expected to be left-leaning
* / \
* J Dj <-- Other leaf datasources
* Base datasource / \ which will be joined
* (bottom-leftmost) --> Db Dj <---- into the base datasource
*
* </pre>
*
* The base datasource (Db) is returned by {@link #getBaseDataSource()}. The other leaf datasources are returned by
* {@link #getPreJoinableClauses()}. The outer query datasources are available as part of {@link #getDataSource()},
* which just returns the original datasource that was provided for analysis.
*
* The base datasource (Db) will never be a join, but it can be any other type of datasource (table, query, etc).
* Note that join trees are only flattened if they occur at the top of the overall tree (or underneath an outer query),
* and that join trees are only flattened to the degree that they are left-leaning. Due to these facts, it is possible
* for the base or leaf datasources to include additional joins.
*
* The base datasource is the one that will be considered by the core Druid query stack for scanning via
* {@link org.apache.druid.segment.Segment} and {@link org.apache.druid.segment.StorageAdapter}. The other leaf
* datasources must be joinable onto the base data.
*
* The idea here is to keep things simple and dumb. So we focus only on identifying left-leaning join trees, which map
* neatly onto a series of hash table lookups at query time. The user/system generating the queries, e.g. the druid-sql
* layer (or the end user in the case of native queries), is responsible for containing the smarts to structure the
* tree in a way that will lead to optimal execution.
*/
public class DataSourceAnalysis
{
private final DataSource dataSource;
private final DataSource baseDataSource;
@Nullable
private final QuerySegmentSpec baseQuerySegmentSpec;
private final List<PreJoinableClause> preJoinableClauses;
private DataSourceAnalysis(
DataSource dataSource,
DataSource baseDataSource,
@Nullable QuerySegmentSpec baseQuerySegmentSpec,
List<PreJoinableClause> preJoinableClauses
)
{
if (baseDataSource instanceof JoinDataSource) {
// The base cannot be a join (this is a class invariant).
// If it happens, it's a bug in the datasource analyzer.
throw new IAE("Base dataSource cannot be a join! Original dataSource was: %s", dataSource);
}
this.dataSource = dataSource;
this.baseDataSource = baseDataSource;
this.baseQuerySegmentSpec = baseQuerySegmentSpec;
this.preJoinableClauses = preJoinableClauses;
}
public static DataSourceAnalysis forDataSource(final DataSource dataSource)
{
// Strip outer queries, retaining querySegmentSpecs as we go down (lowest will become the 'baseQuerySegmentSpec').
QuerySegmentSpec baseQuerySegmentSpec = null;
DataSource current = dataSource;
while (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();
if (!(subQuery instanceof BaseQuery)) {
// All builtin query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}
baseQuerySegmentSpec = ((BaseQuery<?>) subQuery).getQuerySegmentSpec();
current = subQuery.getDataSource();
}
if (current instanceof JoinDataSource) {
final Pair<DataSource, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.lhs, baseQuerySegmentSpec, flattened.rhs);
} else {
return new DataSourceAnalysis(dataSource, current, baseQuerySegmentSpec, Collections.emptyList());
}
}
/**
* Flatten a datasource into two parts: the left-hand side datasource (the 'base' datasource), and a list of join
* clauses, if any.
*
* @throws IllegalArgumentException if dataSource cannot be fully flattened.
*/
private static Pair<DataSource, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
{
DataSource current = dataSource;
final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
while (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
joinDataSource.getRight(),
joinDataSource.getJoinType(),
joinDataSource.getConditionAnalysis()
)
);
}
// Join clauses were added in the order we saw them while traversing down, but we need to apply them in the
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);
return Pair.of(current, preJoinableClauses);
}
/**
* Returns the topmost datasource: the original one passed to {@link #forDataSource(DataSource)}.
*/
public DataSource getDataSource()
{
return dataSource;
}
/**
* Returns the base (bottom-leftmost) datasource.
*/
public DataSource getBaseDataSource()
{
return baseDataSource;
}
/**
* Returns the same datasource as {@link #getBaseDataSource()}, but only if it is a table. Useful on data servers,
* since they generally can only handle queries where the base datasource is a table.
*/
public Optional<TableDataSource> getBaseTableDataSource()
{
if (baseDataSource instanceof TableDataSource) {
return Optional.of((TableDataSource) baseDataSource);
} else {
return Optional.empty();
}
}
/**
* Returns the {@link QuerySegmentSpec} that is associated with the base datasource, if any. This only happens
* when there is an outer query datasource. In this case, the base querySegmentSpec is the one associated with the
* innermost subquery.
*/
public Optional<QuerySegmentSpec> getBaseQuerySegmentSpec()
{
return Optional.ofNullable(baseQuerySegmentSpec);
}
/**
* Returns join clauses corresponding to joinable leaf datasources (every leaf except the bottom-leftmost).
*/
public List<PreJoinableClause> getPreJoinableClauses()
{
return preJoinableClauses;
}
/**
* Returns true if all servers have the ability to compute this datasource. These datasources depend only on
* globally broadcast data, like lookups or inline data.
*/
public boolean isGlobal()
{
return dataSource.isGlobal();
}
/**
* Returns true if this datasource can be computed by the core Druid query stack via a scan of a concrete base
* datasource. All other datasources involved, if any, must be global.
*/
public boolean isConcreteBased()
{
return baseDataSource.isConcrete() && preJoinableClauses.stream()
.allMatch(clause -> clause.getDataSource().isGlobal());
}
/**
* Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a
* 'table' or union of them. This is an important property because it corresponds to datasources that can be handled
* by Druid data servers, like Historicals.
*/
public boolean isConcreteTableBased()
{
// At the time of writing this comment, UnionDataSource children are required to be tables, so the instanceof
// check is redundant. But in the future, we will likely want to support unions of things other than tables,
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
}
/**
* Returns true if this datasource represents a subquery.
*/
public boolean isQuery()
{
return dataSource instanceof QueryDataSource;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DataSourceAnalysis that = (DataSourceAnalysis) o;
return Objects.equals(dataSource, that.dataSource) &&
Objects.equals(baseDataSource, that.baseDataSource) &&
Objects.equals(baseQuerySegmentSpec, that.baseQuerySegmentSpec) &&
Objects.equals(preJoinableClauses, that.preJoinableClauses);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, baseDataSource, baseQuerySegmentSpec, preJoinableClauses);
}
@Override
public String toString()
{
return "DataSourceAnalysis{" +
"dataSource=" + dataSource +
", baseDataSource=" + baseDataSource +
", baseQuerySegmentSpec=" + baseQuerySegmentSpec +
", joinClauses=" + preJoinableClauses +
'}';
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.planning;
import com.google.common.base.Preconditions;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.Joinables;
import java.util.Objects;
/**
* Like {@link org.apache.druid.segment.join.JoinableClause}, but contains a {@link DataSource} instead of a
* {@link org.apache.druid.segment.join.Joinable}. This is useful because when analyzing joins, we don't want to
* actually create Joinables, since that can be an expensive operation.
*/
public class PreJoinableClause
{
private final String prefix;
private final DataSource dataSource;
private final JoinType joinType;
private final JoinConditionAnalysis condition;
PreJoinableClause(
final String prefix,
final DataSource dataSource,
final JoinType joinType,
final JoinConditionAnalysis condition
)
{
this.prefix = Joinables.validatePrefix(prefix);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.joinType = Preconditions.checkNotNull(joinType, "joinType");
this.condition = Preconditions.checkNotNull(condition, "condition");
}
public String getPrefix()
{
return prefix;
}
public DataSource getDataSource()
{
return dataSource;
}
public JoinType getJoinType()
{
return joinType;
}
public JoinConditionAnalysis getCondition()
{
return condition;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PreJoinableClause that = (PreJoinableClause) o;
return Objects.equals(prefix, that.prefix) &&
Objects.equals(dataSource, that.dataSource) &&
joinType == that.joinType &&
Objects.equals(condition, that.condition);
}
@Override
public int hashCode()
{
return Objects.hash(prefix, dataSource, joinType, condition);
}
@Override
public String toString()
{
return "JoinClause{" +
"prefix='" + prefix + '\'' +
", dataSource=" + dataSource +
", joinType=" + joinType +
", condition=" + condition +
'}';
}
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.segment.join;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@ -39,9 +38,9 @@ public class JoinableClause
private final JoinType joinType;
private final JoinConditionAnalysis condition;
public JoinableClause(@Nullable String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition)
public JoinableClause(String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition)
{
this.prefix = prefix != null ? prefix : "";
this.prefix = Joinables.validatePrefix(prefix);
this.joinable = Preconditions.checkNotNull(joinable, "joinable");
this.joinType = Preconditions.checkNotNull(joinType, "joinType");
this.condition = Preconditions.checkNotNull(condition, "condition");

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
/**
* Utility methods for working with {@link Joinable} related classes.
*/
public class Joinables
{
/**
* Checks that "prefix" is a valid prefix for a join clause (see {@link JoinableClause#getPrefix()}) and, if so,
* returns it. Otherwise, throws an exception.
*/
public static String validatePrefix(@Nullable final String prefix)
{
if (prefix == null || prefix.isEmpty()) {
throw new IAE("Join clause cannot have null or empty prefix");
} else if (isPrefixedBy(ColumnHolder.TIME_COLUMN_NAME, prefix) || ColumnHolder.TIME_COLUMN_NAME.equals(prefix)) {
throw new IAE(
"Join clause cannot have prefix[%s], since it would shadow %s",
prefix,
ColumnHolder.TIME_COLUMN_NAME
);
} else {
return prefix;
}
}
public static boolean isPrefixedBy(final String columnName, final String prefix)
{
return columnName.startsWith(prefix) && columnName.length() > prefix.length();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -53,7 +54,10 @@ public class DataSourceTest
@Test
public void testTableDataSource() throws IOException
{
DataSource dataSource = JSON_MAPPER.readValue("{\"type\":\"table\", \"name\":\"somedatasource\"}", DataSource.class);
DataSource dataSource = JSON_MAPPER.readValue(
"{\"type\":\"table\", \"name\":\"somedatasource\"}",
DataSource.class
);
Assert.assertEquals(new TableDataSource("somedatasource"), dataSource);
}
@ -88,8 +92,8 @@ public class DataSourceTest
Lists.newArrayList(((UnionDataSource) dataSource).getDataSources())
);
Assert.assertEquals(
Lists.newArrayList("ds1", "ds2"),
Lists.newArrayList(dataSource.getNames())
ImmutableSet.of("ds1", "ds2"),
dataSource.getTableNames()
);
final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class);

View File

@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class InlineDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final AtomicLong iterationCounter = new AtomicLong();
private final List<Object[]> rows = ImmutableList.of(
new Object[]{DateTimes.of("2000").getMillis(), "foo", 0d, ImmutableMap.of("n", "0")},
new Object[]{DateTimes.of("2000").getMillis(), "bar", 1d, ImmutableMap.of("n", "1")},
new Object[]{DateTimes.of("2000").getMillis(), "baz", 2d, ImmutableMap.of("n", "2")}
);
private final Iterable<Object[]> rowsIterable = () -> {
iterationCounter.incrementAndGet();
return rows.iterator();
};
private final List<String> expectedColumnNames = ImmutableList.of(
ColumnHolder.TIME_COLUMN_NAME,
"str",
"double",
"complex"
);
private final List<ValueType> expectedColumnTypes = ImmutableList.of(
ValueType.LONG,
ValueType.STRING,
ValueType.DOUBLE,
ValueType.COMPLEX
);
private final InlineDataSource listDataSource = InlineDataSource.fromIterable(
expectedColumnNames,
expectedColumnTypes,
rows
);
private final InlineDataSource iterableDataSource = InlineDataSource.fromIterable(
expectedColumnNames,
expectedColumnTypes,
rowsIterable
);
@Test
public void test_getTableNames()
{
Assert.assertEquals(Collections.emptySet(), listDataSource.getTableNames());
Assert.assertEquals(Collections.emptySet(), iterableDataSource.getTableNames());
}
@Test
public void test_getColumnNames()
{
Assert.assertEquals(expectedColumnNames, listDataSource.getColumnNames());
Assert.assertEquals(expectedColumnNames, iterableDataSource.getColumnNames());
}
@Test
public void test_getColumnTypes()
{
Assert.assertEquals(expectedColumnTypes, listDataSource.getColumnTypes());
Assert.assertEquals(expectedColumnTypes, iterableDataSource.getColumnTypes());
}
@Test
public void test_getChildren()
{
Assert.assertEquals(Collections.emptyList(), listDataSource.getChildren());
Assert.assertEquals(Collections.emptyList(), iterableDataSource.getChildren());
}
@Test
public void test_getRowSignature()
{
Assert.assertEquals(
ImmutableMap.of(
ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG,
"str", ValueType.STRING,
"double", ValueType.DOUBLE,
"complex", ValueType.COMPLEX
),
listDataSource.getRowSignature()
);
}
@Test
public void test_isCacheable()
{
Assert.assertFalse(listDataSource.isCacheable());
}
@Test
public void test_isGlobal()
{
Assert.assertTrue(listDataSource.isGlobal());
}
@Test
public void test_isConcrete()
{
Assert.assertFalse(listDataSource.isConcrete());
}
@Test
public void test_rowAdapter()
{
final RowAdapter<Object[]> adapter = listDataSource.rowAdapter();
final Object[] row = rows.get(1);
Assert.assertEquals(DateTimes.of("2000").getMillis(), adapter.timestampFunction().applyAsLong(row));
Assert.assertEquals("bar", adapter.columnFunction("str").apply(row));
Assert.assertEquals(1d, adapter.columnFunction("double").apply(row));
Assert.assertEquals(ImmutableMap.of("n", "1"), adapter.columnFunction("complex").apply(row));
}
@Test
public void test_getRows_list()
{
Assert.assertSame(this.rows, listDataSource.getRowsAsList());
}
@Test
public void test_getRows_iterable()
{
final Iterable<Object[]> iterable = iterableDataSource.getRows();
Assert.assertNotSame(this.rows, iterable);
// No iteration yet.
Assert.assertEquals(0, iterationCounter.get());
assertRowsEqual(this.rows, ImmutableList.copyOf(iterable));
// OK, now we've iterated.
Assert.assertEquals(1, iterationCounter.get());
// Read again, we should iterate again.
//noinspection MismatchedQueryAndUpdateOfCollection
final List<Object[]> ignored = Lists.newArrayList(iterable);
Assert.assertEquals(2, iterationCounter.get());
}
@Test
public void test_getRowsAsList_list()
{
Assert.assertSame(this.rows, listDataSource.getRowsAsList());
}
@Test
public void test_getRowsAsList_iterable()
{
final List<Object[]> list = iterableDataSource.getRowsAsList();
Assert.assertEquals(1, iterationCounter.get());
assertRowsEqual(this.rows, list);
// Read again, we should *not* iterate again (in contrast to "test_getRows_iterable").
//noinspection MismatchedQueryAndUpdateOfCollection
final List<Object[]> ignored = Lists.newArrayList(list);
Assert.assertEquals(1, iterationCounter.get());
}
@Test
public void test_withChildren_empty()
{
Assert.assertSame(listDataSource, listDataSource.withChildren(Collections.emptyList()));
}
@Test
public void test_withChildren_nonEmpty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot accept children");
// Workaround so "withChildren" isn't flagged as unused in the DataSource interface.
((DataSource) listDataSource).withChildren(ImmutableList.of(new TableDataSource("foo")));
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(InlineDataSource.class)
.usingGetClass()
.withNonnullFields("columnNames", "columnTypes", "rows")
.verify();
}
@Test
public void test_toString_iterable()
{
// Verify that toString does not iterate the rows.
final String ignored = iterableDataSource.toString();
Assert.assertEquals(0, iterationCounter.get());
}
@Test
public void test_serde_list() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(listDataSource),
DataSource.class
);
Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames());
Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes());
assertRowsEqual(listDataSource.getRows(), deserialized.getRows());
}
@Test
public void test_serde_iterable() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(iterableDataSource),
DataSource.class
);
// Lazy iterables turn into Lists upon serialization.
Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames());
Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes());
assertRowsEqual(listDataSource.getRows(), deserialized.getRows());
// Should have iterated once.
Assert.assertEquals(1, iterationCounter.get());
}
/**
* This method exists because "equals" on two equivalent Object[] won't return true, so we need to check
* for equality deeply.
*/
private static void assertRowsEqual(final Iterable<Object[]> expectedRows, final Iterable<Object[]> actualRows)
{
if (expectedRows instanceof List && actualRows instanceof List) {
// Only check equality deeply when both rows1 and rows2 are Lists, i.e., non-lazy.
final List<Object[]> expectedRowsList = (List<Object[]>) expectedRows;
final List<Object[]> actualRowsList = (List<Object[]>) actualRows;
final int sz = expectedRowsList.size();
Assert.assertEquals("number of rows", sz, actualRowsList.size());
// Super slow for LinkedLists, but we don't expect those to be used here.
// (They're generally forbidden in Druid except for special cases.)
for (int i = 0; i < sz; i++) {
Assert.assertArrayEquals("row #" + i, expectedRowsList.get(i), actualRowsList.get(i));
}
} else {
// If they're not both Lists, we don't want to iterate them during equality checks, so do a non-deep check.
// This might still return true if whatever class they are has another way of checking equality. But, usually we
// expect this to return false.
Assert.assertEquals("rows", expectedRows, actualRows);
}
}
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
public class JoinDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final TableDataSource fooTable = new TableDataSource("foo");
private final TableDataSource barTable = new TableDataSource("bar");
private final LookupDataSource lookylooLookup = new LookupDataSource("lookyloo");
private final JoinDataSource joinTableToLookup = JoinDataSource.create(
fooTable,
lookylooLookup,
"j.",
"x == \"j.x\"",
JoinType.LEFT,
ExprMacroTable.nil()
);
private final JoinDataSource joinTableToTable = JoinDataSource.create(
fooTable,
barTable,
"j.",
"x == \"j.x\"",
JoinType.LEFT,
ExprMacroTable.nil()
);
@Test
public void test_getTableNames_tableToTable()
{
Assert.assertEquals(ImmutableSet.of("foo", "bar"), joinTableToTable.getTableNames());
}
@Test
public void test_getTableNames_tableToLookup()
{
Assert.assertEquals(Collections.singleton("foo"), joinTableToLookup.getTableNames());
}
@Test
public void test_getChildren_tableToTable()
{
Assert.assertEquals(ImmutableList.of(fooTable, barTable), joinTableToTable.getChildren());
}
@Test
public void test_getChildren_tableToLookup()
{
Assert.assertEquals(ImmutableList.of(fooTable, lookylooLookup), joinTableToLookup.getChildren());
}
@Test
public void test_isCacheable_tableToTable()
{
Assert.assertFalse(joinTableToTable.isCacheable());
}
@Test
public void test_isCacheable_lookup()
{
Assert.assertFalse(joinTableToLookup.isCacheable());
}
@Test
public void test_isConcrete_tableToTable()
{
Assert.assertFalse(joinTableToTable.isConcrete());
}
@Test
public void test_isConcrete_tableToLookup()
{
Assert.assertFalse(joinTableToLookup.isConcrete());
}
@Test
public void test_isGlobal_tableToTable()
{
Assert.assertFalse(joinTableToTable.isGlobal());
}
@Test
public void test_isGlobal_tableToLookup()
{
Assert.assertFalse(joinTableToLookup.isGlobal());
}
@Test
public void test_withChildren_empty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Expected [2] children, got [0]");
final DataSource ignored = joinTableToTable.withChildren(Collections.emptyList());
}
@Test
public void test_withChildren_two()
{
final DataSource transformed = joinTableToTable.withChildren(ImmutableList.of(fooTable, lookylooLookup));
Assert.assertEquals(joinTableToLookup, transformed);
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(JoinDataSource.class)
.usingGetClass()
.withNonnullFields("left", "right", "rightPrefix", "conditionAnalysis", "joinType")
.verify();
}
@Test
public void test_serde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final JoinDataSource deserialized = (JoinDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(joinTableToLookup),
DataSource.class
);
Assert.assertEquals(joinTableToLookup, deserialized);
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
public class LookupDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final LookupDataSource lookylooDataSource = new LookupDataSource("lookyloo");
@Test
public void test_getTableNames()
{
Assert.assertEquals(Collections.emptySet(), lookylooDataSource.getTableNames());
}
@Test
public void test_getChildren()
{
Assert.assertEquals(Collections.emptyList(), lookylooDataSource.getChildren());
}
@Test
public void test_isCacheable()
{
Assert.assertFalse(lookylooDataSource.isCacheable());
}
@Test
public void test_isGlobal()
{
Assert.assertTrue(lookylooDataSource.isGlobal());
}
@Test
public void test_isConcrete()
{
Assert.assertFalse(lookylooDataSource.isConcrete());
}
@Test
public void test_withChildren_empty()
{
Assert.assertSame(lookylooDataSource, lookylooDataSource.withChildren(Collections.emptyList()));
}
@Test
public void test_withChildren_nonEmpty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot accept children");
lookylooDataSource.withChildren(ImmutableList.of(new LookupDataSource("bar")));
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(LookupDataSource.class).usingGetClass().withNonnullFields("lookupName").verify();
}
@Test
public void test_serde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final LookupDataSource deserialized = (LookupDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(lookylooDataSource),
DataSource.class
);
Assert.assertEquals(lookylooDataSource, deserialized);
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
public class QueryDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final TimeseriesQuery queryOnTable =
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build();
private final TimeseriesQuery queryOnLookup =
Druids.newTimeseriesQueryBuilder()
.dataSource(new LookupDataSource("lookyloo"))
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build();
private final QueryDataSource queryOnTableDataSource = new QueryDataSource(queryOnTable);
private final QueryDataSource queryOnLookupDataSource = new QueryDataSource(queryOnLookup);
@Test
public void test_getTableNames_table()
{
Assert.assertEquals(Collections.singleton("foo"), queryOnTableDataSource.getTableNames());
}
@Test
public void test_getTableNames_lookup()
{
Assert.assertEquals(Collections.emptySet(), queryOnLookupDataSource.getTableNames());
}
@Test
public void test_getChildren_table()
{
Assert.assertEquals(Collections.singletonList(new TableDataSource("foo")), queryOnTableDataSource.getChildren());
}
@Test
public void test_getChildren_lookup()
{
Assert.assertEquals(
Collections.singletonList(new LookupDataSource("lookyloo")),
queryOnLookupDataSource.getChildren()
);
}
@Test
public void test_isCacheable_table()
{
Assert.assertFalse(queryOnTableDataSource.isCacheable());
}
@Test
public void test_isCacheable_lookup()
{
Assert.assertFalse(queryOnLookupDataSource.isCacheable());
}
@Test
public void test_isConcrete_table()
{
Assert.assertFalse(queryOnTableDataSource.isConcrete());
}
@Test
public void test_isConcrete_lookup()
{
Assert.assertFalse(queryOnLookupDataSource.isConcrete());
}
@Test
public void test_isGlobal_table()
{
Assert.assertFalse(queryOnTableDataSource.isGlobal());
}
@Test
public void test_isGlobal_lookup()
{
Assert.assertTrue(queryOnLookupDataSource.isGlobal());
}
@Test
public void test_withChildren_empty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Must have exactly one child");
final DataSource ignored = queryOnLookupDataSource.withChildren(Collections.emptyList());
}
@Test
public void test_withChildren_single()
{
final TableDataSource barTable = new TableDataSource("bar");
final QueryDataSource transformed =
(QueryDataSource) queryOnLookupDataSource.withChildren(Collections.singletonList(barTable));
Assert.assertEquals(barTable, transformed.getQuery().getDataSource());
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(QueryDataSource.class).usingGetClass().withNonnullFields("query").verify();
}
@Test
public void test_serde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final QueryDataSource deserialized = (QueryDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(queryOnTableDataSource),
DataSource.class
);
Assert.assertEquals(queryOnTableDataSource, deserialized);
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
public class TableDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final TableDataSource fooDataSource = new TableDataSource("foo");
@Test
public void test_getTableNames()
{
Assert.assertEquals(Collections.singleton("foo"), fooDataSource.getTableNames());
}
@Test
public void test_getChildren()
{
Assert.assertEquals(Collections.emptyList(), fooDataSource.getChildren());
}
@Test
public void test_isCacheable()
{
Assert.assertTrue(fooDataSource.isCacheable());
}
@Test
public void test_isGlobal()
{
Assert.assertFalse(fooDataSource.isGlobal());
}
@Test
public void test_isConcrete()
{
Assert.assertTrue(fooDataSource.isConcrete());
}
@Test
public void test_withChildren_empty()
{
Assert.assertSame(fooDataSource, fooDataSource.withChildren(Collections.emptyList()));
}
@Test
public void test_withChildren_nonEmpty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot accept children");
fooDataSource.withChildren(ImmutableList.of(new TableDataSource("bar")));
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify();
}
@Test
public void test_equals_legacy()
{
final LegacyDataSource legacyFoo = new LegacyDataSource("foo");
final LegacyDataSource legacyBar = new LegacyDataSource("bar");
Assert.assertEquals(legacyFoo, fooDataSource);
Assert.assertEquals(fooDataSource, legacyFoo);
Assert.assertNotEquals(legacyBar, fooDataSource);
Assert.assertNotEquals(fooDataSource, legacyBar);
}
@Test
public void test_serde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final TableDataSource deserialized = (TableDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(fooDataSource),
DataSource.class
);
Assert.assertEquals(fooDataSource, deserialized);
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.List;
public class UnionDataSourceTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final UnionDataSource unionDataSource = new UnionDataSource(
ImmutableList.of(
new TableDataSource("foo"),
new TableDataSource("bar")
)
);
private final UnionDataSource unionDataSourceWithDuplicates = new UnionDataSource(
ImmutableList.of(
new TableDataSource("bar"),
new TableDataSource("foo"),
new TableDataSource("bar")
)
);
@Test
public void test_getTableNames()
{
Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSource.getTableNames());
}
@Test
public void test_getTableNames_withDuplicates()
{
Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSourceWithDuplicates.getTableNames());
}
@Test
public void test_getChildren()
{
Assert.assertEquals(
ImmutableList.of(new TableDataSource("foo"), new TableDataSource("bar")),
unionDataSource.getChildren()
);
}
@Test
public void test_getChildren_withDuplicates()
{
Assert.assertEquals(
ImmutableList.of(new TableDataSource("bar"), new TableDataSource("foo"), new TableDataSource("bar")),
unionDataSourceWithDuplicates.getChildren()
);
}
@Test
public void test_isCacheable()
{
Assert.assertFalse(unionDataSource.isCacheable());
}
@Test
public void test_isGlobal()
{
Assert.assertFalse(unionDataSource.isGlobal());
}
@Test
public void test_isConcrete()
{
Assert.assertTrue(unionDataSource.isConcrete());
}
@Test
public void test_withChildren_empty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Expected [2] children, got [0]");
unionDataSource.withChildren(Collections.emptyList());
}
@Test
public void test_withChildren_sameNumber()
{
final List<TableDataSource> newDataSources = ImmutableList.of(
new TableDataSource("baz"),
new TableDataSource("qux")
);
//noinspection unchecked
Assert.assertEquals(
new UnionDataSource(newDataSources),
unionDataSource.withChildren((List) newDataSources)
);
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(UnionDataSource.class).usingGetClass().withNonnullFields("dataSources").verify();
}
@Test
public void test_serde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final UnionDataSource deserialized = (UnionDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(unionDataSource),
DataSource.class
);
Assert.assertEquals(unionDataSource, deserialized);
}
}

View File

@ -44,7 +44,7 @@ public class UnionQueryRunnerTest
{
// verify that table datasource is passed to baseQueryRunner
Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource);
String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames());
String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getTableNames());
if ("ds1".equals(dsName)) {
ds1.compareAndSet(false, true);
return Sequences.simple(Arrays.asList(1, 2, 3));

View File

@ -917,7 +917,7 @@ public class SegmentMetadataQueryTest
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames()));
Assert.assertEquals(
Intervals.of("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"),
query.getIntervals().get(0)
@ -937,7 +937,7 @@ public class SegmentMetadataQueryTest
+ "}";
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames()));
Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0));
Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval());

View File

@ -0,0 +1,482 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.planning;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class DataSourceAnalysisTest
{
private static final List<Interval> MILLENIUM_INTERVALS = ImmutableList.of(Intervals.of("2000/3000"));
private static final TableDataSource TABLE_FOO = new TableDataSource("foo");
private static final TableDataSource TABLE_BAR = new TableDataSource("bar");
private static final LookupDataSource LOOKUP_LOOKYLOO = new LookupDataSource("lookyloo");
private static final InlineDataSource INLINE = InlineDataSource.fromIterable(
ImmutableList.of("column"),
ImmutableList.of(ValueType.STRING),
ImmutableList.of(new Object[0])
);
@Test
public void testTable()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(TABLE_FOO);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(TABLE_FOO, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testUnion()
{
final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(unionDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(unionDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testQueryOnTable()
{
final QueryDataSource queryDataSource = subquery(TABLE_FOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
analysis.getBaseQuerySegmentSpec()
);
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testQueryOnUnion()
{
final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final QueryDataSource queryDataSource = subquery(unionDataSource);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
analysis.getBaseQuerySegmentSpec()
);
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testLookup()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testQueryOnLookup()
{
final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
analysis.getBaseQuerySegmentSpec()
);
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testInline()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(INLINE, analysis.getDataSource());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses());
}
@Test
public void testJoinSimpleLeftLeaning()
{
// Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a left-leaning
// structure (no right children are joins themselves).
final JoinDataSource joinDataSource =
join(
join(
join(
TABLE_FOO,
LOOKUP_LOOKYLOO,
"1.",
JoinType.INNER
),
INLINE,
"2.",
JoinType.LEFT
),
subquery(LOOKUP_LOOKYLOO),
"3.",
JoinType.FULL
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")),
new PreJoinableClause("2.", INLINE, JoinType.LEFT, joinClause("2.")),
new PreJoinableClause("3.", subquery(LOOKUP_LOOKYLOO), JoinType.FULL, joinClause("3."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinSimpleRightLeaning()
{
// Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a right-leaning
// structure (no left children are joins themselves).
//
// Note that unlike the left-leaning stack, which is fully flattened, this one will not get flattened at all.
final JoinDataSource rightLeaningJoinStack =
join(
LOOKUP_LOOKYLOO,
join(
INLINE,
subquery(LOOKUP_LOOKYLOO),
"1.",
JoinType.LEFT
),
"2.",
JoinType.FULL
);
final JoinDataSource joinDataSource =
join(
TABLE_FOO,
rightLeaningJoinStack,
"3.",
JoinType.RIGHT
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("3.", rightLeaningJoinStack, JoinType.RIGHT, joinClause("3."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinOverTableSubquery()
{
final JoinDataSource joinDataSource = join(
TABLE_FOO,
subquery(TABLE_FOO),
"1.",
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", subquery(TABLE_FOO), JoinType.INNER, joinClause("1."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinTableUnionToLookup()
{
final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final JoinDataSource joinDataSource = join(
unionDataSource,
LOOKUP_LOOKYLOO,
"1.",
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinUnderTopLevelSubqueries()
{
final QueryDataSource queryDataSource =
subquery(
subquery(
join(
TABLE_FOO,
LOOKUP_LOOKYLOO,
"1.",
JoinType.INNER
)
)
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(
Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)),
analysis.getBaseQuerySegmentSpec()
);
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinLookupToLookup()
{
final JoinDataSource joinDataSource = join(
LOOKUP_LOOKYLOO,
LOOKUP_LOOKYLOO,
"1.",
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testJoinLookupToTable()
{
final JoinDataSource joinDataSource = join(
LOOKUP_LOOKYLOO,
TABLE_FOO,
"1.",
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
Assert.assertEquals(
ImmutableList.of(
new PreJoinableClause("1.", TABLE_FOO, JoinType.INNER, joinClause("1."))
),
analysis.getPreJoinableClauses()
);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(DataSourceAnalysis.class)
.usingGetClass()
.withNonnullFields("dataSource", "baseDataSource", "preJoinableClauses")
.verify();
}
/**
* Generate a datasource that joins on a column named "x" on both sides.
*/
private static JoinDataSource join(
final DataSource left,
final DataSource right,
final String rightPrefix,
final JoinType joinType
)
{
return JoinDataSource.create(
left,
right,
rightPrefix,
joinClause(rightPrefix).getOriginalExpression(),
joinType,
ExprMacroTable.nil()
);
}
/**
* Generate a join clause that joins on a column named "x" on both sides.
*/
private static JoinConditionAnalysis joinClause(
final String rightPrefix
)
{
return JoinConditionAnalysis.forExpression(
StringUtils.format("x == \"%sx\"", rightPrefix),
rightPrefix,
ExprMacroTable.nil()
);
}
/**
* Generate a datasource that does a subquery on another datasource. The specific kind of query doesn't matter
* much for the purpose of this test class, so it's always the same.
*/
private static QueryDataSource subquery(final DataSource dataSource)
{
return new QueryDataSource(
GroupByQuery.builder()
.setDataSource(dataSource)
.setInterval(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS))
.setGranularity(Granularities.ALL)
.build()
);
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.planning;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.junit.Assert;
import org.junit.Test;
public class PreJoinableClauseTest
{
private final PreJoinableClause clause = new PreJoinableClause(
"j.",
new TableDataSource("foo"),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil())
);
@Test
public void test_getPrefix()
{
Assert.assertEquals("j.", clause.getPrefix());
}
@Test
public void test_getJoinType()
{
Assert.assertEquals(JoinType.LEFT, clause.getJoinType());
}
@Test
public void test_getCondition()
{
Assert.assertEquals("x == \"j.x\"", clause.getCondition().getOriginalExpression());
}
@Test
public void test_getDataSource()
{
Assert.assertEquals(new TableDataSource("foo"), clause.getDataSource());
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(PreJoinableClause.class)
.usingGetClass()
.withNonnullFields("prefix", "dataSource", "joinType", "condition")
.verify();
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import org.apache.druid.segment.column.ColumnHolder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class JoinablesTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void test_validatePrefix_null()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Join clause cannot have null or empty prefix");
Joinables.validatePrefix(null);
}
@Test
public void test_validatePrefix_empty()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Join clause cannot have null or empty prefix");
Joinables.validatePrefix("");
}
@Test
public void test_validatePrefix_underscore()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Join clause cannot have prefix[_]");
Joinables.validatePrefix("_");
}
@Test
public void test_validatePrefix_timeColumn()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Join clause cannot have prefix[__time]");
Joinables.validatePrefix(ColumnHolder.TIME_COLUMN_NAME);
}
@Test
public void test_isPrefixedBy()
{
Assert.assertTrue(Joinables.isPrefixedBy("foo", ""));
Assert.assertTrue(Joinables.isPrefixedBy("foo", "f"));
Assert.assertTrue(Joinables.isPrefixedBy("foo", "fo"));
Assert.assertFalse(Joinables.isPrefixedBy("foo", "foo"));
}
}

View File

@ -59,6 +59,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
/**
*
*/
@ManageLifecycle
public class BrokerServerView implements TimelineServerView
@ -293,7 +294,7 @@ public class BrokerServerView implements TimelineServerView
@Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{
String table = Iterables.getOnlyElement(dataSource.getNames());
String table = Iterables.getOnlyElement(dataSource.getTableNames());
synchronized (lock) {
return timelines.get(table);
}

View File

@ -25,13 +25,52 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class CacheUtil
{
public enum ServerType
{
BROKER {
@Override
boolean willMergeRunners()
{
return false;
}
},
DATA {
@Override
boolean willMergeRunners()
{
return true;
}
};
/**
* Same meaning as the "willMergeRunners" parameter to {@link CacheStrategy#isCacheable}.
*/
abstract boolean willMergeRunners();
}
public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier)
{
return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier));
}
public static void populateResultCache(
Cache cache,
Cache.NamedKey key,
byte[] resultBytes
)
{
cache.put(key, resultBytes);
}
public static Cache.NamedKey computeSegmentCacheKey(
String segmentId,
SegmentDescriptor descriptor,
@ -54,64 +93,105 @@ public class CacheUtil
);
}
public static <T> boolean useCacheOnBrokers(
/**
* Returns whether the segment-level cache should be checked for a particular query.
*
* @param query the query to check
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
*/
public static <T> boolean isUseSegmentCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
@Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
CacheConfig cacheConfig,
ServerType serverType
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
&& QueryContexts.isUseCache(query)
&& cacheConfig.isUseCache();
}
public static <T> boolean populateCacheOnBrokers(
/**
* Returns whether the result-level cache should be populated for a particular query.
*
* @param query the query to check
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
*/
public static <T> boolean isPopulateSegmentCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
@Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
CacheConfig cacheConfig,
ServerType serverType
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
&& QueryContexts.isPopulateCache(query)
&& cacheConfig.isPopulateCache();
}
public static <T> boolean useCacheOnDataNodes(
/**
* Returns whether the result-level cache should be checked for a particular query.
*
* @param query the query to check
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
*/
public static <T> boolean isUseResultCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
@Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
CacheConfig cacheConfig,
ServerType serverType
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
&& QueryContexts.isUseResultLevelCache(query)
&& cacheConfig.isUseResultLevelCache();
}
public static <T> boolean populateCacheOnDataNodes(
/**
* Returns whether the result-level cache should be populated for a particular query.
*
* @param query the query to check
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
*/
public static <T> boolean isPopulateResultCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
@Nullable CacheStrategy<T, Object, Query<T>> cacheStrategy,
CacheConfig cacheConfig,
ServerType serverType
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
&& QueryContexts.isPopulateResultLevelCache(query)
&& cacheConfig.isPopulateResultLevelCache();
}
private static <T> boolean useCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
/**
* Returns whether a particular query is cacheable. Does not check whether we are actually configured to use or
* populate the cache; that should be done separately.
*
* @param query the query to check
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
*/
static <T> boolean isQueryCacheable(
final Query<T> query,
@Nullable final CacheStrategy<T, Object, Query<T>> cacheStrategy,
final CacheConfig cacheConfig,
final ServerType serverType
)
{
return QueryContexts.isUseCache(query)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
return cacheStrategy != null
&& cacheStrategy.isCacheable(query, serverType.willMergeRunners())
&& cacheConfig.isQueryCacheable(query)
&& query.getDataSource().isCacheable();
}
private static <T> boolean populateCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return QueryContexts.isPopulateCache(query)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
}
}

View File

@ -241,8 +241,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.toolChest = warehouse.getToolChest(query);
this.strategy = toolChest.getCacheStrategy(query);
this.useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
this.populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
this.isBySegment = QueryContexts.isBySegment(query);
// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/apache/druid/issues/2108

View File

@ -77,8 +77,13 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
{
Query<T> query = queryPlus.getQuery();
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig);
final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig);
final boolean populateCache = CacheUtil.isPopulateSegmentCache(
query,
strategy,
cacheConfig,
CacheUtil.ServerType.DATA
);
final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA);
final Cache.NamedKey key;
if (strategy != null && (useCache || populateCache)) {

View File

@ -194,7 +194,7 @@ public class CoordinatorServerView implements InventoryView
public VersionedIntervalTimeline<String, SegmentLoadInfo> getTimeline(DataSource dataSource)
{
String table = Iterables.getOnlyElement(dataSource.getNames());
String table = Iterables.getOnlyElement(dataSource.getTableNames());
synchronized (lock) {
return timelines.get(table);
}

View File

@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
public class ResultLevelCacheUtil
{
private static final Logger log = new Logger(ResultLevelCacheUtil.class);
public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier)
{
return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier));
}
public static void populate(
Cache cache,
Cache.NamedKey key,
byte[] resultBytes
)
{
log.debug("Populating results into cache");
cache.put(key, resultBytes);
}
public static <T> boolean useResultLevelCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
public static <T> boolean populateResultLevelCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
private static <T> boolean useResultLevelCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return QueryContexts.isUseResultLevelCache(query)
&& strategy != null
&& cacheConfig.isUseResultLevelCache()
&& cacheConfig.isQueryCacheable(query);
}
private static <T> boolean populateResultLevelCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return QueryContexts.isPopulateResultLevelCache(query)
&& strategy != null
&& cacheConfig.isPopulateResultLevelCache()
&& cacheConfig.isQueryCacheable(query);
}
}

View File

@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.ResultLevelCacheUtil;
import org.apache.druid.client.CacheUtil;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.RE;
@ -71,8 +71,13 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
this.cacheConfig = cacheConfig;
this.query = query;
this.strategy = queryToolChest.getCacheStrategy(query);
this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig);
this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig);
this.populateResultCache = CacheUtil.isPopulateResultCache(
query,
strategy,
cacheConfig,
CacheUtil.ServerType.BROKER
);
this.useResultCache = CacheUtil.isUseResultCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
}
@Override
@ -162,7 +167,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
)
{
if (useResultCache && queryCacheKey != null) {
return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey));
return cache.get(CacheUtil.computeResultLevelCacheKey(queryCacheKey));
}
return null;
}
@ -216,7 +221,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
cache,
objectMapper,
ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
CacheUtil.computeResultLevelCacheKey(cacheKeyStr),
cacheConfig,
true
);
@ -292,7 +297,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
public void populateResults()
{
ResultLevelCacheUtil.populate(
CacheUtil.populateResultCache(
cache,
key,
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()

View File

@ -53,6 +53,7 @@ import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
@ -168,6 +169,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
return new NoopQueryRunner<>();
}
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());

View File

@ -25,6 +25,7 @@ import com.google.inject.Inject;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.PostProcessingOperator;
@ -37,10 +38,12 @@ import org.apache.druid.query.ResultLevelCachingQueryRunner;
import org.apache.druid.query.RetryQueryRunner;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
/**
*
*/
public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
@ -79,12 +82,24 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals));
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs));
}
@ -93,12 +108,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// This does not adhere to the fluent workflow. See https://github.com/apache/druid/issues/5517
return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest),
return new ResultLevelCachingQueryRunner<>(
makeRunner(query, baseClientRunner, toolChest),
toolChest,
query,
objectMapper,
cache,
cacheConfig);
cacheConfig
);
}
private <T> QueryRunner<T> makeRunner(

View File

@ -189,7 +189,7 @@ public class QueryLifecycle
AuthorizationUtils.authorizeAllResourceActions(
authenticationResult,
Iterables.transform(
baseQuery.getDataSource().getNames(),
baseQuery.getDataSource().getTableNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
@ -213,7 +213,7 @@ public class QueryLifecycle
AuthorizationUtils.authorizeAllResourceActions(
req,
Iterables.transform(
baseQuery.getDataSource().getNames(),
baseQuery.getDataSource().getTableNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper

View File

@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryWatcher;
import java.util.List;
import java.util.Set;
public class QueryManager implements QueryWatcher
@ -61,7 +60,7 @@ public class QueryManager implements QueryWatcher
public void registerQuery(Query query, final ListenableFuture future)
{
final String id = query.getId();
final List<String> datasources = query.getDataSource().getNames();
final Set<String> datasources = query.getDataSource().getTableNames();
queries.put(id, future);
queryDatasources.putAll(id, datasources);
future.addListener(

View File

@ -190,7 +190,7 @@ public class QueryResource implements QueryCountStatsProvider
"%s[%s_%s_%s]",
currThreadName,
query.getType(),
query.getDataSource().getNames(),
query.getDataSource().getTableNames(),
queryId
);

View File

@ -53,6 +53,7 @@ import org.apache.druid.query.ReferenceCountingSegmentQueryRunner;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
@ -71,8 +72,6 @@ import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class ServerManager implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
@ -128,6 +127,12 @@ public class ServerManager implements QuerySegmentWalker
throw new ISE("Unknown query type[%s].", query.getClass());
}
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
@ -162,8 +167,7 @@ public class ServerManager implements QuerySegmentWalker
{
@Override
public Iterable<QueryRunner<T>> apply(
@Nullable
final TimelineObjectHolder<String, ReferenceCountingSegment> holder
@Nullable final TimelineObjectHolder<String, ReferenceCountingSegment> holder
)
{
if (holder == null) {
@ -210,7 +214,7 @@ public class ServerManager implements QuerySegmentWalker
private String getDataSourceName(DataSource dataSource)
{
return Iterables.getOnlyElement(dataSource.getNames());
return Iterables.getOnlyElement(dataSource.getTableNames());
}
@Override
@ -224,6 +228,12 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>();
}
// Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.getPreJoinableClauses().isEmpty()) {
throw new ISE("Cannot handle join dataSource");
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
String dataSourceName = getDataSourceName(query.getDataSource());

View File

@ -20,21 +20,16 @@
package org.apache.druid.server.log;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.server.RequestLogLine;
import org.slf4j.MDC;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
public class LoggingRequestLogger implements RequestLogger
{
@ -66,7 +61,7 @@ public class LoggingRequestLogger implements RequestLogger
final Query query = requestLogLine.getQuery();
MDC.put("queryId", query.getId());
MDC.put("sqlQueryId", StringUtils.nullToEmptyNonDruidDataString(query.getSqlQueryId()));
MDC.put("dataSource", findInnerDatasource(query).toString());
MDC.put("dataSource", String.join(",", query.getDataSource().getTableNames()));
MDC.put("queryType", query.getType());
MDC.put("isNested", String.valueOf(!(query.getDataSource() instanceof TableDataSource)));
MDC.put("hasFilters", Boolean.toString(query.hasFilters()));
@ -119,30 +114,6 @@ public class LoggingRequestLogger implements RequestLogger
return setContextMDC;
}
private Object findInnerDatasource(Query query)
{
DataSource _ds = query.getDataSource();
if (_ds instanceof TableDataSource) {
return ((TableDataSource) _ds).getName();
}
if (_ds instanceof QueryDataSource) {
return findInnerDatasource(((QueryDataSource) _ds).getQuery());
}
if (_ds instanceof UnionDataSource) {
return Joiner.on(",")
.join(
((UnionDataSource) _ds)
.getDataSources()
.stream()
.map(TableDataSource::getName)
.collect(Collectors.toList())
);
} else {
// should not come here
return query.getDataSource();
}
}
@Override
public String toString()
{

View File

@ -201,7 +201,7 @@ public class TieredBrokerHostSelector<T>
if (brokerServiceName == null) {
// For Union Queries tier will be selected on the rules for first dataSource.
List<Rule> rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null));
List<Rule> rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getTableNames(), null));
// find the rule that can apply to the entire set of intervals
DateTime now = DateTimes.nowUtc();

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Druids;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class CacheUtilTest
{
private final TimeseriesQuery timeseriesQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals("2000/3000")
.granularity(Granularities.ALL)
.build();
@Test
public void test_isQueryCacheable_cacheableOnBroker()
{
Assert.assertTrue(
CacheUtil.isQueryCacheable(
timeseriesQuery,
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
)
);
}
@Test
public void test_isQueryCacheable_cacheableOnDataServer()
{
Assert.assertTrue(
CacheUtil.isQueryCacheable(
timeseriesQuery,
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.DATA
)
);
}
@Test
public void test_isQueryCacheable_unCacheableOnBroker()
{
Assert.assertFalse(
CacheUtil.isQueryCacheable(
timeseriesQuery,
new DummyCacheStrategy<>(false, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
)
);
}
@Test
public void test_isQueryCacheable_unCacheableOnDataServer()
{
Assert.assertFalse(
CacheUtil.isQueryCacheable(
timeseriesQuery,
new DummyCacheStrategy<>(true, false),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.DATA
)
);
}
@Test
public void test_isQueryCacheable_unCacheableType()
{
Assert.assertFalse(
CacheUtil.isQueryCacheable(
timeseriesQuery,
new DummyCacheStrategy<>(true, false),
makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))),
CacheUtil.ServerType.BROKER
)
);
}
@Test
public void test_isQueryCacheable_unCacheableDataSource()
{
Assert.assertFalse(
CacheUtil.isQueryCacheable(
timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")),
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
)
);
}
@Test
public void test_isQueryCacheable_nullCacheStrategy()
{
Assert.assertFalse(
CacheUtil.isQueryCacheable(
timeseriesQuery,
null,
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
)
);
}
private static CacheConfig makeCacheConfig(final Map<String, Object> properties)
{
return TestHelper.makeJsonMapper().convertValue(properties, CacheConfig.class);
}
private static class DummyCacheStrategy<T, CacheType, QueryType extends Query<T>>
implements CacheStrategy<T, CacheType, QueryType>
{
private final boolean cacheableOnBrokers;
private final boolean cacheableOnDataServers;
public DummyCacheStrategy(boolean cacheableOnBrokers, boolean cacheableOnDataServers)
{
this.cacheableOnBrokers = cacheableOnBrokers;
this.cacheableOnDataServers = cacheableOnDataServers;
}
@Override
public boolean isCacheable(QueryType query, boolean willMergeRunners)
{
return willMergeRunners ? cacheableOnDataServers : cacheableOnBrokers;
}
@Override
public byte[] computeCacheKey(QueryType query)
{
throw new UnsupportedOperationException();
}
@Override
public byte[] computeResultLevelCacheKey(QueryType query)
{
throw new UnsupportedOperationException();
}
@Override
public TypeReference<CacheType> getCacheObjectClazz()
{
throw new UnsupportedOperationException();
}
@Override
public Function<T, CacheType> prepareForCache(boolean isResultLevelCache)
{
throw new UnsupportedOperationException();
}
@Override
public Function<CacheType, T> pullFromCache(boolean isResultLevelCache)
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
/**
* DruidRel that uses a "query" dataSource.
@ -199,9 +200,9 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
}
@Override
public List<String> getDataSourceNames()
public Set<String> getDataSourceNames()
{
return ((DruidRel) sourceRel).getDataSourceNames();
return ((DruidRel<?>) sourceRel).getDataSourceNames();
}
@Override

View File

@ -35,7 +35,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.sql.calcite.table.DruidTable;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Set;
/**
* DruidRel that uses a "table" dataSource.
@ -123,9 +123,9 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
}
@Override
public List<String> getDataSourceNames()
public Set<String> getDataSourceNames()
{
return druidTable.getDataSource().getNames();
return druidTable.getDataSource().getTableNames();
}
@Override

View File

@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
{
@ -110,7 +110,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
public abstract T asDruidConvention();
/**
* Get a list of names of datasources read by this DruidRel
* Get the set of names of table datasources read by this DruidRel
*/
public abstract List<String> getDataSourceNames();
public abstract Set<String> getDataSourceNames();
}

View File

@ -171,13 +171,13 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
}
@Override
public List<String> getDataSourceNames()
public Set<String> getDataSourceNames()
{
final DruidRel<?> druidRight = (DruidRel) this.right;
Set<String> datasourceNames = new LinkedHashSet<>();
datasourceNames.addAll(left.getDataSourceNames());
datasourceNames.addAll(druidRight.getDataSourceNames());
return new ArrayList<>(datasourceNames);
Set<String> dataSourceNames = new LinkedHashSet<>();
dataSourceNames.addAll(left.getDataSourceNames());
dataSourceNames.addAll(druidRight.getDataSourceNames());
return dataSourceNames;
}
@Override

View File

@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class DruidUnionRel extends DruidRel<DruidUnionRel>
@ -166,12 +167,11 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
}
@Override
public List<String> getDataSourceNames()
public Set<String> getDataSourceNames()
{
return rels.stream()
.flatMap(rel -> ((DruidRel<?>) rel).getDataSourceNames().stream())
.distinct()
.collect(Collectors.toList());
.collect(Collectors.toSet());
}
@Override