Analysis refactor (#13501)

Refactor DataSource to have a getAnalysis() method

This removes the various places in the code where while loops and instanceof
checks were used to walk through the structure of DataSource objects in order
to build a DataSourceAnalysis. Instead, we simply ask the DataSource for its
analysis and let the call stack rebuild whatever structure existed.
somu-imply 2022-12-12 17:35:44 -08:00 committed by GitHub
parent de5a4bafcb
commit 7682b0b6b1
38 changed files with 299 additions and 261 deletions
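The heart of the change, as a caller sees it: the static entry point on DataSourceAnalysis is replaced by a method on DataSource itself. A minimal before/after sketch (not part of the patch; "query" is an assumed, already-built Query object):

    // Before: a static analyzer walked the datasource tree with while loops
    // and instanceof checks.
    final DataSourceAnalysis before = DataSourceAnalysis.forDataSource(query.getDataSource());

    // After: each DataSource implementation builds its own analysis, and
    // composite datasources recurse into their children via virtual dispatch.
    final DataSourceAnalysis after = query.getDataSource().getAnalysis();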

View File

@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
@ -122,9 +121,10 @@ public class DataSourceOptimizer
List<Interval> remainingQueryIntervals = (List<Interval>) query.getIntervals();
for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
TableDataSource tableDataSource = new TableDataSource(derivativeDataSource.getName());
final List<Interval> derivativeIntervals = remainingQueryIntervals.stream()
.flatMap(interval -> serverView
.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName())))
.getTimeline(tableDataSource.getAnalysis())
.orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName()))
.lookup(interval)
.stream()

View File

@ -289,7 +289,7 @@ public class DataSourcePlan
)
{
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder();
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSource);
final DataSourceAnalysis analysis = dataSource.getAnalysis();
final DataSourcePlan basePlan = forDataSource(
queryKit,

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@ -106,6 +107,12 @@ public class InputNumberDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
@JsonProperty
public int getInputNumber()
{

View File

@ -387,7 +387,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
QueryRunner<T> queryRunner = null;
if (runningItem != null) {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final Task task = runningItem.getTask();
if (analysis.getBaseTableDataSource().isPresent()

View File

@ -30,7 +30,6 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
@ -123,7 +122,8 @@ public abstract class BaseQuery<T> implements Query<T>
@VisibleForTesting
public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?> query)
{
return DataSourceAnalysis.forDataSource(query.getDataSource())
DataSource queryDataSource = query.getDataSource();
return queryDataSource.getAnalysis()
.getBaseQuerySegmentSpec()
.orElseGet(query::getQuerySegmentSpec);
}

View File

@ -124,4 +124,10 @@ public interface DataSource
*/
byte[] getCacheKey();
/**
* Returns the analysis for this data source.
*
* @return the {@link DataSourceAnalysis} object for this data source
*/
DataSourceAnalysis getAnalysis();
}
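Since getAnalysis() is now part of the DataSource interface, every implementation, including extension datasources, must provide it. For leaf datasources the patch uses one recurring pattern; a sketch of what a hypothetical extension leaf class would add (mirroring TableDataSource, LookupDataSource, and the other leaves below):

    @Override
    public DataSourceAnalysis getAnalysis()
    {
      // A leaf is its own base: no base query, no join filter, no join clauses.
      return new DataSourceAnalysis(this, null, null, Collections.emptyList());
    }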

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
@ -261,6 +262,12 @@ public class InlineDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
/**
* Returns the row signature (map of column name to type) for this inline datasource. Note that types may
* be null, meaning we know we have a column with a certain name, but we don't know what its type is.

View File

@ -33,6 +33,7 @@ import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.Triple;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.cache.CacheKeyBuilder;
@ -56,6 +57,7 @@ import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -119,7 +121,7 @@ public class JoinDataSource implements DataSource
);
this.leftFilter = leftFilter;
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.analysis = DataSourceAnalysis.forDataSource(this);
this.analysis = this.getAnalysisForDataSource();
}
/**
@ -457,7 +459,7 @@ public class JoinDataSource implements DataSource
{
final List<PreJoinableClause> clauses = analysis.getPreJoinableClauses();
if (clauses.isEmpty()) {
throw new IAE("No join clauses to build the cache key for data source [%s]", analysis.getDataSource());
throw new IAE("No join clauses to build the cache key for data source [%s]", this);
}
final CacheKeyBuilder keyBuilder;
@ -479,4 +481,52 @@ public class JoinDataSource implements DataSource
}
return keyBuilder.build();
}
private DataSourceAnalysis getAnalysisForDataSource()
{
final Triple<DataSource, DimFilter, List<PreJoinableClause>> flattened = flattenJoin(this);
return new DataSourceAnalysis(flattened.first, null, flattened.second, flattened.third);
}
@Override
public DataSourceAnalysis getAnalysis()
{
return analysis;
}
/**
* Flatten a datasource into two parts: the left-hand side datasource (the 'base' datasource), and a list of join
* clauses, if any.
*
* @throws IllegalArgumentException if dataSource cannot be fully flattened.
*/
private static Triple<DataSource, DimFilter, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
{
DataSource current = dataSource;
DimFilter currentDimFilter = null;
final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
while (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
if (currentDimFilter != null) {
throw new IAE("Left filters are only allowed when left child is direct table access");
}
currentDimFilter = joinDataSource.getLeftFilter();
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
joinDataSource.getRight(),
joinDataSource.getJoinType(),
joinDataSource.getConditionAnalysis()
)
);
}
// Join clauses were added in the order we saw them while traversing down, but we need to apply them in the
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);
return Triple.of(current, currentDimFilter, preJoinableClauses);
}
}
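Note that JoinDataSource computes its analysis eagerly: flattenJoin() runs once in the constructor and the result is cached in the "analysis" field, so getAnalysis() is a constant-time getter. A sketch of the caller-visible effect ("joinDataSource" is an assumed, already-constructed instance):

    final DataSourceAnalysis first = joinDataSource.getAnalysis();
    final DataSourceAnalysis second = joinDataSource.getAnalysis();
    // Same cached object both times; the join tree is not re-flattened.
    assert first == second;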

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@ -120,6 +121,12 @@ public class LookupDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
@Override
public boolean equals(Object o)
{

View File

@ -166,7 +166,8 @@ public class Queries
}
// Verify preconditions and invariants, just in case.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource());
final DataSource retDataSource = retVal.getDataSource();
final DataSourceAnalysis analysis = retDataSource.getAnalysis();
// Sanity check: query must be based on a single table.
if (!analysis.getBaseTableDataSource().isPresent()) {

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@ -112,6 +113,19 @@ public class QueryDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
final Query<?> subQuery = this.getQuery();
if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make
// "DataSourceAnalysis.getBaseQuerySegmentSpec" work properly.
// All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}
final DataSource current = subQuery.getDataSource();
return current.getAnalysis().maybeWithBaseQuery(subQuery);
}
@Override
public String toString()

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@ -117,6 +118,12 @@ public class TableDataSource implements DataSource
return new byte[0];
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
@Override
public String toString()
{

View File

@ -26,8 +26,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -129,6 +131,12 @@ public class UnionDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
@Override
public boolean equals(Object o)
{

View File

@ -50,7 +50,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
Query<T> query = queryPlus.getQuery();
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
if (analysis.isConcreteTableBased() && analysis.getBaseUnionDataSource().isPresent()) {
// Union of tables.

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegmentReference;
import org.apache.druid.utils.JvmUtils;
@ -187,6 +188,13 @@ public class UnnestDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
final DataSource current = this.getBase();
return current.getAnalysis();
}
@Override
public boolean equals(Object o)
{

View File

@ -20,12 +20,10 @@
package org.apache.druid.query.planning;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Triple;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
@ -33,8 +31,6 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -59,8 +55,7 @@ import java.util.Optional;
* </pre>
*
* The base datasource (Db) is returned by {@link #getBaseDataSource()}. The other leaf datasources are returned by
* {@link #getPreJoinableClauses()}. The outer query datasources are available as part of {@link #getDataSource()},
* which just returns the original datasource that was provided for analysis.
* {@link #getPreJoinableClauses()}.
*
* The base datasource (Db) will never be a join, but it can be any other type of datasource (table, query, etc).
* Note that join trees are only flattened if they occur at the top of the overall tree (or underneath an outer query),
@ -78,7 +73,6 @@ import java.util.Optional;
*/
public class DataSourceAnalysis
{
private final DataSource dataSource;
private final DataSource baseDataSource;
@Nullable
private final Query<?> baseQuery;
@ -86,8 +80,7 @@ public class DataSourceAnalysis
private final DimFilter joinBaseTableFilter;
private final List<PreJoinableClause> preJoinableClauses;
private DataSourceAnalysis(
DataSource dataSource,
public DataSourceAnalysis(
DataSource baseDataSource,
@Nullable Query<?> baseQuery,
@Nullable DimFilter joinBaseTableFilter,
@ -97,99 +90,15 @@ public class DataSourceAnalysis
if (baseDataSource instanceof JoinDataSource) {
// The base cannot be a join (this is a class invariant).
// If it happens, it's a bug in the datasource analyzer.
throw new IAE("Base dataSource cannot be a join! Original dataSource was: %s", dataSource);
throw new IAE("Base dataSource cannot be a join! Original base datasource was: %s", baseDataSource);
}
this.dataSource = dataSource;
this.baseDataSource = baseDataSource;
this.baseQuery = baseQuery;
this.joinBaseTableFilter = joinBaseTableFilter;
this.preJoinableClauses = preJoinableClauses;
}
public static DataSourceAnalysis forDataSource(final DataSource dataSource)
{
// Strip outer queries, retaining querySegmentSpecs as we go down (lowest will become the 'baseQuerySegmentSpec').
Query<?> baseQuery = null;
DataSource current = dataSource;
// This needs to be an OR condition between QueryDataSource and UnnestDataSource,
// as queries can have interleaving query and unnest data sources.
// Ideally, if each data source generated its own analysis object, we could avoid
// the OR here and have cleaner code, especially since these checks will become
// tedious as more data source types are added. Future development should move the
// forDataSource method into each data source.
while (current instanceof QueryDataSource || current instanceof UnnestDataSource) {
if (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();
if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
// work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}
baseQuery = subQuery;
current = subQuery.getDataSource();
} else {
final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
current = unnestDataSource.getBase();
}
}
if (current instanceof JoinDataSource) {
final Triple<DataSource, DimFilter, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.first, baseQuery, flattened.second, flattened.third);
} else {
return new DataSourceAnalysis(dataSource, current, baseQuery, null, Collections.emptyList());
}
}
/**
* Flatten a datasource into two parts: the left-hand side datasource (the 'base' datasource), and a list of join
* clauses, if any.
*
* @throws IllegalArgumentException if dataSource cannot be fully flattened.
*/
private static Triple<DataSource, DimFilter, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
{
DataSource current = dataSource;
DimFilter currentDimFilter = null;
final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
while (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
if (currentDimFilter != null) {
throw new IAE("Left filters are only allowed when left child is direct table access");
}
currentDimFilter = joinDataSource.getLeftFilter();
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
joinDataSource.getRight(),
joinDataSource.getJoinType(),
joinDataSource.getConditionAnalysis()
)
);
}
// Join clauses were added in the order we saw them while traversing down, but we need to apply them in the
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);
return Triple.of(current, currentDimFilter, preJoinableClauses);
}
/**
* Returns the topmost datasource: the original one passed to {@link #forDataSource(DataSource)}.
*/
public DataSource getDataSource()
{
return dataSource;
}
/**
* Returns the base (bottom-leftmost) datasource.
*/
@ -230,7 +139,7 @@ public class DataSourceAnalysis
* the datasource tree. This is the query that will be applied to the base datasource plus any joinables that might
* be present.
*
* @return the query associated with the base datasource if {@link #isQuery()} is true, else empty
* @return the query associated with the base datasource, if one is present, else empty
*/
public Optional<Query<?>> getBaseQuery()
{
@ -253,13 +162,29 @@ public class DataSourceAnalysis
* <p>
* This {@link QuerySegmentSpec} is taken from the query returned by {@link #getBaseQuery()}.
*
* @return the query segment spec associated with the base datasource if {@link #isQuery()} is true, else empty
* @return the query segment spec associated with the base datasource, if one is present, else empty
*/
public Optional<QuerySegmentSpec> getBaseQuerySegmentSpec()
{
return getBaseQuery().map(query -> ((BaseQuery<?>) query).getQuerySegmentSpec());
}
/**
* Returns this analysis, with the given query attached as the base query if none is set.
* If the DataSourceAnalysis already has a non-null baseQuery, no update is required and
* the existing analysis is returned as-is. Otherwise, this method creates a new analysis
* object with the provided query as its base query.
*
* @param query the query to attach to the analysis if its baseQuery is null
* @return the existing analysis if it has a non-null baseQuery, else a new one with the given base query
*/
public DataSourceAnalysis maybeWithBaseQuery(Query<?> query)
{
if (!getBaseQuery().isPresent()) {
return new DataSourceAnalysis(baseDataSource, query, joinBaseTableFilter, preJoinableClauses);
}
return this;
}
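maybeWithBaseQuery() is the hook that lets QueryDataSource.getAnalysis() attach the outermost subquery while preserving the innermost base query, reproducing the old top-down walk. A sketch of the recursion for a nested query(query(table)), with assumed variable names:

    // Innermost level: the table's analysis carries no base query yet,
    // so innerQuery is attached.
    final DataSourceAnalysis inner = table.getAnalysis().maybeWithBaseQuery(innerQuery);

    // Outer level: "inner" already has a base query, so it is returned
    // unchanged and innerQuery remains the base query.
    final DataSourceAnalysis outer = inner.maybeWithBaseQuery(outerQuery);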
/**
* Returns join clauses corresponding to joinable leaf datasources (every leaf except the bottom-leftmost).
*/
@ -268,15 +193,6 @@ public class DataSourceAnalysis
return preJoinableClauses;
}
/**
* Returns true if all servers have the ability to compute this datasource. These datasources depend only on
* globally broadcast data, like lookups or inline data or broadcast segments.
*/
public boolean isGlobal()
{
return dataSource.isGlobal();
}
/**
* Returns true if this datasource can be computed by the core Druid query stack via a scan of a concrete base
* datasource. All other datasources involved, if any, must be global.
@ -309,15 +225,6 @@ public class DataSourceAnalysis
.allMatch(ds -> ds instanceof TableDataSource)));
}
/**
* Returns true if this datasource represents a subquery (that is, whether it is a {@link QueryDataSource}).
*/
public boolean isQuery()
{
return dataSource instanceof QueryDataSource;
}
/**
* Returns true if this datasource is made out of a join operation
*/
@ -336,20 +243,19 @@ public class DataSourceAnalysis
return false;
}
DataSourceAnalysis that = (DataSourceAnalysis) o;
return Objects.equals(dataSource, that.dataSource);
return Objects.equals(baseDataSource, that.baseDataSource);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource);
return Objects.hash(baseDataSource);
}
@Override
public String toString()
{
return "DataSourceAnalysis{" +
"dataSource=" + dataSource +
", baseDataSource=" + baseDataSource +
", baseQuery=" + baseQuery +
", preJoinableClauses=" + preJoinableClauses +

View File

@ -62,13 +62,10 @@ public class DataSourceAnalysisTest
@Test
public void testTable()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(TABLE_FOO);
final DataSourceAnalysis analysis = TABLE_FOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(TABLE_FOO, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -82,13 +79,10 @@ public class DataSourceAnalysisTest
public void testUnion()
{
final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(unionDataSource);
final DataSourceAnalysis analysis = unionDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(unionDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -102,13 +96,10 @@ public class DataSourceAnalysisTest
public void testQueryOnTable()
{
final QueryDataSource queryDataSource = subquery(TABLE_FOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -126,13 +117,10 @@ public class DataSourceAnalysisTest
{
final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final QueryDataSource queryDataSource = subquery(unionDataSource);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -148,13 +136,10 @@ public class DataSourceAnalysisTest
@Test
public void testLookup()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO);
final DataSourceAnalysis analysis = LOOKUP_LOOKYLOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -168,13 +153,10 @@ public class DataSourceAnalysisTest
public void testQueryOnLookup()
{
final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -190,13 +172,10 @@ public class DataSourceAnalysisTest
@Test
public void testInline()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE);
final DataSourceAnalysis analysis = INLINE.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(INLINE, analysis.getDataSource());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -230,13 +209,10 @@ public class DataSourceAnalysisTest
JoinType.FULL
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@ -277,13 +253,10 @@ public class DataSourceAnalysisTest
JoinType.FULL
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
@ -331,13 +304,10 @@ public class DataSourceAnalysisTest
JoinType.RIGHT
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@ -378,13 +348,10 @@ public class DataSourceAnalysisTest
TrueDimFilter.instance()
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
@ -411,13 +378,10 @@ public class DataSourceAnalysisTest
TrueDimFilter.instance()
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
@ -442,13 +406,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -480,13 +441,10 @@ public class DataSourceAnalysisTest
)
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
@ -528,13 +486,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -560,13 +515,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -587,10 +539,10 @@ public class DataSourceAnalysisTest
{
EqualsVerifier.forClass(DataSourceAnalysis.class)
.usingGetClass()
.withNonnullFields("dataSource")
.withNonnullFields("baseDataSource")
// These fields are excluded from equality, which is based on "baseDataSource" alone
.withIgnoredFields("baseDataSource", "baseQuery", "preJoinableClauses", "joinBaseTableFilter")
.withIgnoredFields("baseQuery", "preJoinableClauses", "joinBaseTableFilter")
.verify();
}

View File

@ -21,8 +21,10 @@ package org.apache.druid.segment.join;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -89,4 +91,10 @@ public class NoopDataSource implements DataSource
{
return new byte[]{};
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
}

View File

@ -353,7 +353,7 @@ public class BrokerServerView implements TimelineServerView
{
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
.orElseThrow(() -> new ISE("Cannot handle base datasource: %s", analysis.getBaseDataSource()));
synchronized (lock) {
return Optional.ofNullable(timelines.get(table.getName()));

View File

@ -279,7 +279,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.query = queryPlus.getQuery();
this.toolChest = warehouse.getToolChest(query);
this.strategy = toolChest.getCacheStrategy(query);
this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource());
this.dataSourceAnalysis = query.getDataSource().getAnalysis();
this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);

View File

@ -57,7 +57,7 @@ public class ServerViewUtil
int numCandidates
)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(datasource);
final DataSourceAnalysis analysis = datasource.getAnalysis();
final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(analysis);
if (!maybeTimeline.isPresent()) {
return Collections.emptyList();

View File

@ -74,7 +74,7 @@ public class BroadcastTableJoinableFactory implements JoinableFactory
private Optional<ReferenceCountingIndexedTable> getOnlyIndexedTable(DataSource dataSource)
{
GlobalTableDataSource broadcastDataSource = (GlobalTableDataSource) dataSource;
DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSource);
DataSourceAnalysis analysis = dataSource.getAnalysis();
return segmentManager.getIndexedTables(analysis).flatMap(tables -> {
Iterator<ReferenceCountingIndexedTable> tableIterator = tables.iterator();
if (!tableIterator.hasNext()) {

View File

@ -36,6 +36,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
@ -146,11 +147,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSource dataSourceFromQuery = query.getDataSource();
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
// Sanity check: make sure the query is based on the table we're meant to handle.
if (!analysis.getBaseTableDataSource().filter(ds -> dataSource.equals(ds.getName())).isPresent()) {
throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
@ -163,12 +165,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
if ((dataSourceFromQuery instanceof QueryDataSource) && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource()
final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery
.createSegmentMapFunction(
query,
cpuTimeAccumulator

View File

@ -360,11 +360,11 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
@VisibleForTesting
<T> DatasourceBundle getBundle(final Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", query.getDataSource()));
final DatasourceBundle bundle;

View File

@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.LocatedSegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizationUtils;
@ -156,7 +155,7 @@ public class ClientInfoResource
}
final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline =
timelineServerView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(dataSourceName)));
timelineServerView.getTimeline((new TableDataSource(dataSourceName)).getAnalysis());
final Optional<Iterable<TimelineObjectHolder<String, ServerSelector>>> maybeServersLookup =
maybeTimeline.map(timeline -> timeline.lookup(theInterval));
if (!maybeServersLookup.isPresent() || Iterables.isEmpty(maybeServersLookup.get())) {

View File

@ -239,16 +239,17 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
*/
private <T> boolean canRunQueryUsingLocalWalker(Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSource dataSourceFromQuery = query.getDataSource();
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// 1) Must be based on a concrete datasource that is not a table.
// 2) Must be based on globally available data (so we have a copy here on the Broker).
// 3) If there is an outer query, it must be handleable by the query toolchest (the local walker does not handle
// subqueries on its own).
return analysis.isConcreteBased() && !analysis.isConcreteTableBased() && analysis.isGlobal()
&& (!analysis.isQuery()
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
return analysis.isConcreteBased() && !analysis.isConcreteTableBased() && dataSourceFromQuery.isGlobal()
&& (!(dataSourceFromQuery instanceof QueryDataSource)
|| toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery()));
}
/**
@ -257,15 +258,16 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
*/
private <T> boolean canRunQueryUsingClusterWalker(Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSource dataSourceFromQuery = query.getDataSource();
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// 1) Must be based on a concrete table (the only shape the Druid cluster can handle).
// 2) If there is an outer query, it must be handleable by the query toolchest (the cluster walker does not handle
// subqueries on its own).
return analysis.isConcreteTableBased()
&& (!analysis.isQuery()
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
&& (!(dataSourceFromQuery instanceof QueryDataSource)
|| toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery()));
}
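Because the analysis no longer retains the original (outermost) datasource, checks that used to go through DataSourceAnalysis now read the query's own datasource directly. The two rewrites above amount to the following equivalences (a sketch, reusing the patch's local variable):

    final boolean isQuery = dataSourceFromQuery instanceof QueryDataSource; // was analysis.isQuery()
    final boolean isGlobal = dataSourceFromQuery.isGlobal();                // was analysis.isGlobal()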

View File

@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
@ -47,7 +48,7 @@ import java.util.stream.StreamSupport;
* Processor that computes Druid queries, single-threaded.
*
* The datasource for the query must satisfy {@link DataSourceAnalysis#isConcreteBased()} and
* {@link DataSourceAnalysis#isGlobal()}. Its base datasource must also be handleable by the provided
* {@link DataSource#isGlobal()}. Its base datasource must also be handleable by the provided
* {@link SegmentWrangler}.
*
* Mainly designed to be used by {@link ClientQuerySegmentWalker}.
@ -79,10 +80,11 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSource dataSourceFromQuery = query.getDataSource();
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
if (!analysis.isConcreteBased() || !analysis.isGlobal()) {
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
if (!analysis.isConcreteBased() || !dataSourceFromQuery.isGlobal()) {
throw new IAE("Cannot query dataSource locally: %s", dataSourceFromQuery);
}
// wrap in ReferenceCountingSegment, these aren't currently managed by SegmentManager so reference tracking doesn't
@ -93,10 +95,11 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function<SegmentReference, SegmentReference> segmentMapFn =
analysis
.getDataSource()
.createSegmentMapFunction(query, cpuAccumulator);
final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery
.createSegmentMapFunction(
query,
cpuAccumulator
);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);

View File

@ -238,7 +238,7 @@ public class SegmentManager
private TableDataSource getTableDataSource(DataSourceAnalysis analysis)
{
return analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getBaseDataSource()));
}
public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
@ -123,7 +124,7 @@ public class ServerManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
segmentManager.getTimeline(analysis);
@ -165,19 +166,20 @@ public class ServerManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
final DataSource dataSourceFromQuery = query.getDataSource();
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
final QueryUnsupportedException e = new QueryUnsupportedException(
StringUtils.format("Unknown query type, [%s]", query.getClass())
);
log.makeAlert(e, "Error while executing a query[%s]", query.getId())
.addData("dataSource", query.getDataSource())
.addData("dataSource", dataSourceFromQuery)
.emit();
throw e;
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
@ -185,8 +187,9 @@ public class ServerManager implements QuerySegmentWalker
segmentManager.getTimeline(analysis);
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
if ((dataSourceFromQuery instanceof QueryDataSource)
&& !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
if (maybeTimeline.isPresent()) {
@ -195,11 +198,11 @@ public class ServerManager implements QuerySegmentWalker
return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
}
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource()
dataSourceFromQuery
.createSegmentMapFunction(query, cpuTimeAccumulator);
// We compute the datasource's cache key here itself so it doesn't need to be re-computed for every segment
final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(dataSourceFromQuery.getCacheKey());
final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)

View File

@ -40,7 +40,6 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@ -110,7 +109,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
(new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes = timeline.lookup(intervals);
Assert.assertEquals(1, serverLookupRes.size());
@ -172,7 +171,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
(new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
Arrays.asList(
@ -195,7 +194,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(4);
timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
(new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
Arrays.asList(
@ -274,7 +273,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
(new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
@ -298,7 +297,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(5);
timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
(new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
// expect same set of segments as before
@ -354,7 +353,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
(new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1
@ -414,7 +413,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
(new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1
@ -476,7 +475,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
(new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1

View File

@ -140,7 +140,7 @@ public class SimpleServerView implements TimelineServerView
{
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getBaseDataSource()));
return Optional.ofNullable(timelines.get(table.getName()));
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
@ -386,7 +385,7 @@ public class SegmentManagerTest
{
Assert.assertEquals(
Optional.empty(),
segmentManager.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource("nonExisting")))
segmentManager.getTimeline((new TableDataSource("nonExisting")).getAnalysis())
);
}

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
@ -92,7 +93,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// Strange, but true. Required to get authentic behavior with UnionDataSources. (Although, it would be great if
// this wasn't required.)
return (queryPlus, responseContext) -> {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryPlus.getQuery().getDataSource());
final DataSourceAnalysis analysis = queryPlus.getQuery().getDataSource().getAnalysis();
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", queryPlus.getQuery().getDataSource());
@ -112,15 +113,16 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final DataSource dataSourceFromQuery = query.getDataSource();
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", query.getDataSource());
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
@ -128,12 +130,12 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery()
&& !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
if ((dataSourceFromQuery instanceof QueryDataSource)
&& !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
final Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(
final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery.createSegmentMapFunction(
query,
new AtomicLong()
);

View File

@ -29,8 +29,10 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.LocalCacheProvider;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
@ -50,6 +52,7 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
@ -66,7 +69,6 @@ import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
@ -140,7 +142,7 @@ public class ServerManagerTest
public void setUp()
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
NullHandling.initializeForTests();
queryWaitLatch = new CountDownLatch(1);
queryWaitYieldLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
@ -469,6 +471,25 @@ public class ServerManagerTest
Assert.assertSame(NoopQueryRunner.class, queryRunner.getClass());
}
@Test(expected = ISE.class)
public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegmentsOnQueryDataSource()
{
final Interval interval = Intervals.of("0000-01-01/P1D");
final SearchQuery query = searchQueryWithQueryDataSource("unknown_datasource", interval, Granularities.ALL);
final List<SegmentDescriptor> unknownSegments = Collections.singletonList(
new SegmentDescriptor(interval, "unknown_version", 0)
);
final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
query,
unknownSegments
);
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
Assert.assertTrue(results.isEmpty());
Assert.assertNotNull(responseContext.getMissingSegments());
Assert.assertEquals(unknownSegments, responseContext.getMissingSegments());
}
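A note on this test: the assertions after run() can only execute if no exception is thrown, yet the method is annotated @Test(expected = ISE.class). Presumably the ISE comes from a guard like the canPerformSubquery check shown earlier in this commit (the search toolchest cannot handle the wrapped timeseries subquery), so the annotation is what actually makes the test pass.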
@Test
public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegments()
{
@ -533,7 +554,7 @@ public class ServerManagerTest
final Interval interval = Intervals.of("P1d/2011-04-01");
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline = segmentManager
.getTimeline(DataSourceAnalysis.forDataSource(query.getDataSource()));
.getTimeline(query.getDataSource().getAnalysis());
Assert.assertTrue(maybeTimeline.isPresent());
final List<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = maybeTimeline.get().lookup(interval);
final List<SegmentDescriptor> closedSegments = new ArrayList<>();
@ -638,6 +659,30 @@ public class ServerManagerTest
.build();
}
private SearchQuery searchQueryWithQueryDataSource(String datasource, Interval interval, Granularity granularity)
{
final ImmutableList<SegmentDescriptor> descriptors = ImmutableList.of(
new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0),
new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1)
);
return Druids.newSearchQueryBuilder()
.dataSource(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource(datasource)
.intervals(new MultipleSpecificSegmentSpec(descriptors))
.granularity(Granularities.ALL)
.build()
)
)
.intervals(Collections.singletonList(interval))
.granularity(granularity)
.limit(10000)
.query("wow")
.build();
}
private Future assertQueryable(
Granularity granularity,
String dataSource,

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.RowSignature;
@ -144,6 +145,12 @@ public class ExternalDataSource implements DataSource
return null;
}
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
@Override
public boolean equals(Object o)
{
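Like InputNumberDataSource earlier in this commit, ExternalDataSource is a leaf: its analysis is simply itself, with no base query, no base segment spec, and no pre-joinable clauses. A sketch of the same pattern for a hypothetical custom leaf DataSource (the meaning of the constructor arguments is inferred from this diff, not from a documented API):

// Hypothetical leaf DataSource, mirroring ExternalDataSource/InputNumberDataSource.
// Inferred constructor arguments: base data source, base query, base query
// segment spec, pre-joinable clauses.
@Override
public DataSourceAnalysis getAnalysis()
{
  // This data source is its own base: nothing to unwrap.
  return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}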

View File

@ -65,7 +65,6 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@ -842,7 +841,7 @@ public class DruidQuery
return true;
}
if (DataSourceAnalysis.forDataSource(dataSource).isConcreteTableBased()) {
if (dataSource.getAnalysis().isConcreteTableBased()) {
// Always OK: queries on concrete tables (regular Druid datasources) use segment-based storage adapters
// (IncrementalIndex or QueryableIndex). These clip the query interval to the data interval, making wide
// query intervals safer. They do not have special checks for granularity and interval safety.
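As a quick illustration of the guard above (a sketch; the table name is made up): a regular table datasource is its own concrete base, so it passes the check, while wrapping datasources rely on their analysis to unwrap down to a base table.

// Sketch: a plain table should report itself as concrete-table-based.
final DataSource table = new TableDataSource("some_table"); // hypothetical name
assert table.getAnalysis().isConcreteTableBased();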

View File

@ -45,7 +45,6 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
@ -167,10 +166,10 @@ public class NativeQueryMaker implements QueryMaker
private List<Interval> findBaseDataSourceIntervals(Query<?> query)
{
return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec()
.map(QuerySegmentSpec::getIntervals)
.orElseGet(query::getIntervals);
return query.getDataSource().getAnalysis()
.getBaseQuerySegmentSpec()
.map(QuerySegmentSpec::getIntervals)
.orElseGet(query::getIntervals);
}
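The Optional chain reads: if the analysis carries a base query segment spec (i.e., the base query pinned down specific intervals or segments), use its intervals; otherwise fall back to the outer query's intervals. An equivalent expansion, as a sketch:

// Equivalent to the chained form above, without Optional chaining (sketch).
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final Optional<QuerySegmentSpec> baseSpec = analysis.getBaseQuerySegmentSpec();
return baseSpec.isPresent() ? baseSpec.get().getIntervals() : query.getIntervals();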
@SuppressWarnings("unchecked")

View File

@ -47,7 +47,6 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@ -71,7 +70,6 @@ import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
@ -218,7 +216,7 @@ public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon
for (int i = 0; i < 1000; i++) {
boolean hasTimeline = exec.submit(
() -> serverView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(DATASOURCE)))
() -> serverView.getTimeline((new TableDataSource(DATASOURCE)).getAnalysis())
.isPresent()
).get(100, TimeUnit.MILLISECONDS);
Assert.assertTrue(hasTimeline);
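The probe submitted to the executor follows the same pattern as the other call sites; isolated, it is just (DATASOURCE and serverView come from the test fixture):

// Sketch: build the table's own analysis and ask the server view whether
// a timeline exists for it.
final boolean hasTimeline = serverView
    .getTimeline(new TableDataSource(DATASOURCE).getAnalysis())
    .isPresent();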