SQL: Avoid "intervals" for non-table-based datasources. (#14336)

In these other cases, stick to plain "filter". This simplifies lots of
logic downstream, and doesn't hurt since we don't have intervals-specific
optimizations outside of tables.

Fixes an issue where we couldn't properly filter on a column from an
external datasource if it was named __time.
This commit is contained in:
Gian Merlino 2023-06-28 21:27:11 -07:00 committed by GitHub
parent c798d3fb2e
commit a6cabbe10f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 127 additions and 73 deletions

View File

@ -512,8 +512,11 @@ public class DataSourcePlan
* interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}.
*
* Anywhere this appears is a place that we do not support using the "intervals" parameter of a query
* (i.e., {@link org.apache.druid.query.BaseQuery#getQuerySegmentSpec()}) for time filtering. Ideally,
* we'd support this everywhere it appears, but we can get away without it for now.
* (i.e., {@link org.apache.druid.query.BaseQuery#getQuerySegmentSpec()}) for time filtering.
*
* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because
* the SQL layer avoids "intervals" in other cases. See
* {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
*/
private static void checkQuerySegmentSpecIsEternity(
final DataSource dataSource,

View File

@ -173,7 +173,9 @@ public class MSQInsertTest extends MSQTestBase
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"__time\", \"type\": \"long\"}, {\"name\": \"flags\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") PARTITIONED BY day")
+ ")\n"
+ "WHERE __time > TIMESTAMP '1999-01-01 00:00:00'\n"
+ "PARTITIONED BY day")
.setQueryContext(context)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")

View File

@ -96,7 +96,7 @@ public interface DataSource
* query stack. For example, {@link QueryDataSource} must be executed first and substituted with its results.
*
* @see DataSourceAnalysis#isConcreteBased() which uses this
* @see DataSourceAnalysis#isConcreteTableBased() which uses this
* @see DataSourceAnalysis#isConcreteAndTableBased() which uses this
*/
boolean isConcrete();

View File

@ -52,7 +52,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
if (analysis.isConcreteTableBased() && analysis.getBaseUnionDataSource().isPresent()) {
if (analysis.isConcreteAndTableBased() && analysis.getBaseUnionDataSource().isPresent()) {
// Union of tables.
final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get();

View File

@ -110,7 +110,7 @@ public class DataSourceAnalysis
/**
* If {@link #getBaseDataSource()} is a {@link TableDataSource}, returns it. Otherwise, returns an empty Optional.
*
* Note that this can return empty even if {@link #isConcreteTableBased()} is true. This happens if the base
* Note that this can return empty even if {@link #isConcreteAndTableBased()} is true. This happens if the base
* datasource is a {@link UnionDataSource} of {@link TableDataSource}.
*/
public Optional<TableDataSource> getBaseTableDataSource()
@ -175,6 +175,7 @@ public class DataSourceAnalysis
* Else this method creates a new analysis object with the base query provided in the input
*
* @param query the query to add to the analysis if the baseQuery is null
*
* @return the existing analysis if it has non-null basequery, else a new one with the updated base query
*/
public DataSourceAnalysis maybeWithBaseQuery(Query<?> query)
@ -204,25 +205,38 @@ public class DataSourceAnalysis
}
/**
* Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}
* or an {@link UnnestDataSource} composed entirely of {@link TableDataSource} . This is an
* important property, because it corresponds to datasources that can be handled by Druid's distributed query stack.
* Returns whether this datasource is one of:
*
* <ul>
* <li>{@link TableDataSource}</li>
* <li>{@link UnionDataSource} composed entirely of {@link TableDataSource}</li>
* <li>{@link UnnestDataSource} composed entirely of {@link TableDataSource}</li>
* </ul>
*/
public boolean isConcreteTableBased()
public boolean isTableBased()
{
return (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource))
|| (baseDataSource instanceof UnnestDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
}
/**
* Returns true if this datasource is both (see {@link #isConcreteBased()} and {@link #isTableBased()}.
* This is an important property, because it corresponds to datasources that can be handled by Druid's distributed
* query stack.
*/
public boolean isConcreteAndTableBased()
{
// At the time of writing this comment, UnionDataSource children are required to be tables, so the instanceof
// check is redundant. But in the future, we will likely want to support unions of things other than tables,
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource))
|| (baseDataSource instanceof UnnestDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
return isConcreteBased() && isTableBased();
}
/**

View File

@ -65,7 +65,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = TABLE_FOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -82,7 +83,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = unionDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -99,7 +101,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -120,7 +123,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -139,7 +143,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = LOOKUP_LOOKYLOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -156,7 +161,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -175,7 +181,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = INLINE.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -212,7 +219,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@ -256,7 +264,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
@ -307,7 +316,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@ -351,7 +361,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
@ -381,7 +392,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
@ -409,7 +421,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.of(unionDataSource), analysis.getBaseUnionDataSource());
@ -444,7 +457,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(), analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource());
@ -489,7 +503,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@ -518,7 +533,8 @@ public class DataSourceAnalysisTest
final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());

View File

@ -272,7 +272,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
// 2) Must be based on globally available data (so we have a copy here on the Broker).
// 3) If there is an outer query, it must be handleable by the query toolchest (the local walker does not handle
// subqueries on its own).
return analysis.isConcreteBased() && !analysis.isConcreteTableBased() && dataSourceFromQuery.isGlobal()
return analysis.isConcreteBased() && !analysis.isConcreteAndTableBased() && dataSourceFromQuery.isGlobal()
&& (!(dataSourceFromQuery instanceof QueryDataSource)
|| toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery()));
}
@ -290,7 +290,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
// 1) Must be based on a concrete table (the only shape the Druid cluster can handle).
// 2) If there is an outer query, it must be handleable by the query toolchest (the cluster walker does not handle
// subqueries on its own).
return analysis.isConcreteTableBased()
return analysis.isConcreteAndTableBased()
&& (!(dataSourceFromQuery instanceof QueryDataSource)
|| toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery()));
}

View File

@ -95,7 +95,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
return (queryPlus, responseContext) -> {
final DataSourceAnalysis analysis = queryPlus.getQuery().getDataSource().getAnalysis();
if (!analysis.isConcreteTableBased()) {
if (!analysis.isConcreteAndTableBased()) {
throw new ISE("Cannot handle datasource: %s", queryPlus.getQuery().getDataSource());
}
@ -121,7 +121,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
if (!analysis.isConcreteTableBased()) {
if (!analysis.isConcreteAndTableBased()) {
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}

View File

@ -774,7 +774,6 @@ public class DruidQuery
* Returns a pair of DataSource and Filtration object created on the query filter. In case the, data source is
* a join datasource, the datasource may be altered and left filter of join datasource may
* be rid of time filters.
* TODO: should we optimize the base table filter just like we do with query filters
*/
@VisibleForTesting
static Pair<DataSource, Filtration> getFiltration(
@ -784,40 +783,58 @@ public class DruidQuery
JoinableFactoryWrapper joinableFactoryWrapper
)
{
if (!(dataSource instanceof JoinDataSource)) {
return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry));
if (!canUseIntervalFiltering(dataSource)) {
return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry.getFullRowSignature(), false));
} else if (dataSource instanceof JoinDataSource && ((JoinDataSource) dataSource).getLeftFilter() != null) {
final JoinDataSource joinDataSource = (JoinDataSource) dataSource;
// If the join is left or inner, we can pull the intervals up to the query. This is done
// so that broker can prune the segments to query.
final Filtration leftFiltration = Filtration.create(joinDataSource.getLeftFilter())
.optimize(virtualColumnRegistry.getFullRowSignature());
// Adds the intervals from the join left filter to query filtration
final Filtration queryFiltration = Filtration.create(filter, leftFiltration.getIntervals())
.optimize(virtualColumnRegistry.getFullRowSignature());
final JoinDataSource newDataSource = JoinDataSource.create(
joinDataSource.getLeft(),
joinDataSource.getRight(),
joinDataSource.getRightPrefix(),
joinDataSource.getConditionAnalysis(),
joinDataSource.getJoinType(),
leftFiltration.getDimFilter(),
joinableFactoryWrapper
);
return Pair.of(newDataSource, queryFiltration);
} else {
return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry.getFullRowSignature(), true));
}
JoinDataSource joinDataSource = (JoinDataSource) dataSource;
if (joinDataSource.getLeftFilter() == null) {
return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry));
}
//TODO: We should avoid promoting the time filter as interval for right outer and full outer joins. This is not
// done now as we apply the intervals to left base table today irrespective of the join type.
// If the join is left or inner, we can pull the intervals up to the query. This is done
// so that broker can prune the segments to query.
Filtration leftFiltration = Filtration.create(joinDataSource.getLeftFilter())
.optimize(virtualColumnRegistry.getFullRowSignature());
// Adds the intervals from the join left filter to query filtration
Filtration queryFiltration = Filtration.create(filter, leftFiltration.getIntervals())
.optimize(virtualColumnRegistry.getFullRowSignature());
JoinDataSource newDataSource = JoinDataSource.create(
joinDataSource.getLeft(),
joinDataSource.getRight(),
joinDataSource.getRightPrefix(),
joinDataSource.getConditionAnalysis(),
joinDataSource.getJoinType(),
leftFiltration.getDimFilter(),
joinableFactoryWrapper
);
return Pair.of(newDataSource, queryFiltration);
}
private static Filtration toFiltration(DimFilter filter, VirtualColumnRegistry virtualColumnRegistry)
/**
* Whether the given datasource can make use of "intervals" based filtering. The is true for anything based on
* regular tables ({@link org.apache.druid.query.TableDataSource}).
*/
private static boolean canUseIntervalFiltering(final DataSource dataSource)
{
return Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
return dataSource.getAnalysis().isTableBased();
}
private static Filtration toFiltration(
final DimFilter filter,
final RowSignature rowSignature,
final boolean useIntervals
)
{
final Filtration filtration = Filtration.create(filter);
if (useIntervals) {
return filtration.optimize(rowSignature);
} else {
return filtration.optimizeFilterOnly(rowSignature);
}
}
/**
@ -837,7 +854,7 @@ public class DruidQuery
return true;
}
if (dataSource.getAnalysis().isConcreteTableBased()) {
if (dataSource.getAnalysis().isConcreteAndTableBased()) {
// Always OK: queries on concrete tables (regular Druid datasources) use segment-based storage adapters
// (IncrementalIndex or QueryableIndex). These clip query interval to data interval, making wide query
// intervals safer. They do not have special checks for granularity and interval safety.
@ -1430,7 +1447,9 @@ public class DruidQuery
}
if (!plannerContext.featureAvailable(EngineFeature.SCAN_ORDER_BY_NON_TIME) && !orderByColumns.isEmpty()) {
if (orderByColumns.size() > 1 || !ColumnHolder.TIME_COLUMN_NAME.equals(orderByColumns.get(0).getColumnName())) {
if (orderByColumns.size() > 1
|| orderByColumns.stream()
.anyMatch(orderBy -> !orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME))) {
// We cannot handle this ordering, but we encounter this ordering as part of the exploration of the volcano
// planner, which means that the query that we are looking right now might only be doing this as one of the
// potential branches of exploration rather than being a semantic requirement of the query itself. So, it is