SQL support for joins on subqueries. (#9545)

* SQL support for joins on subqueries.

Changes to SQL module:

- DruidJoinRule: Allow joins on subqueries (the left and right inputs are no
  longer required to be scans or mappings); see the sketch after this list.
- DruidJoinRel: Add cost estimation code for joins on subqueries.
- DruidSemiJoinRule, DruidSemiJoinRel: Removed, since DruidJoinRule can
  handle this case now.
- DruidRel: Remove Nullable annotation from toDruidQuery, because
  it is no longer needed (it was used by DruidSemiJoinRel).
- Update Rules constants to reflect new rules available in our current
  version of Calcite. Some of these are useful for optimizing joins on
  subqueries.
- Rework cost estimation to be in terms of cost per row, and place all
  relevant constants in CostEstimates.
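
For context, a minimal sketch of the kind of SQL this enables: a join whose
right-hand input is a subquery rather than a plain table scan or lookup. The
datasource and column names ("wikipedia", "channel") are hypothetical, and the
Broker address is the stock Avatica endpoint; adjust both for a real cluster.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JoinOnSubqueryExample
{
  public static void main(String[] args) throws Exception
  {
    // Stock Druid SQL endpoint served by the Broker via Avatica.
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";

    // The join's right-hand input is itself a GROUP BY subquery.
    final String sql =
        "SELECT t1.channel, COUNT(*) AS cnt\n"
        + "FROM wikipedia t1\n"
        + "INNER JOIN (SELECT channel FROM wikipedia GROUP BY channel LIMIT 10) t2\n"
        + "  ON t1.channel = t2.channel\n"
        + "GROUP BY t1.channel";

    try (Connection connection = DriverManager.getConnection(url);
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(sql)) {
      while (resultSet.next()) {
        System.out.println(resultSet.getString("channel") + ": " + resultSet.getLong("cnt"));
      }
    }
  }
}
```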

Other changes:

- RowBasedColumnSelectorFactory: Don't set hasMultipleValues. The lack
  of isComplete is enough to let callers know that columns might have
  multiple values. Explicitly setting it to true caused
  ExpressionSelectors to assume the column definitely has multiple values
  and to treat the inputs as arrays. This behavior interfered with some of
  the new tests that involve queries on lookups.
- QueryContexts: Add maxSubqueryRows parameter, and use it in druid-sql
  tests.

* Fixes for tests.

* Adjustments.
Gian Merlino 2020-03-22 16:43:55 -07:00 committed by GitHub
parent 5f127a1829
commit 54c9325256
29 changed files with 1614 additions and 1162 deletions

View File

@@ -1306,6 +1306,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.server.http.unannouncePropagationDelay`|How long to wait for zookeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)|
 |`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
 |`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks.|8 * 1024|
+|`druid.server.http.maxSubqueryRows`|Maximum number of rows from subqueries per query. These rows are stored in memory.|100000|
 |`druid.server.http.enableForwardedRequestCustomizer`|If enabled, adds Jetty ForwardedRequestCustomizer which reads X-Forwarded-* request headers to manipulate servlet request object when Druid is used behind a proxy.|false|
 #### Indexer Processing Resources
@@ -1632,6 +1633,10 @@ The Druid SQL server is configured through the following properties on the Broker.
 |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
 |`druid.sql.planner.serializeComplexValues`|Whether to serialize "complex" output values, false will return the class name instead of the serialized value.|true|
+> Previous versions of Druid had properties named `druid.sql.planner.maxQueryCount` and `druid.sql.planner.maxSemiJoinRowsInMemory`.
+> These properties are no longer available. Since Druid 0.18.0, you can use `druid.server.http.maxSubqueryRows` to control the maximum
+> number of rows permitted across all subqueries.
 #### Broker Caching
 You can optionally only configure caching to be enabled on the Broker by setting caching configs here.

View File

@@ -938,8 +938,6 @@ The Druid SQL server is configured through the following properties on the Broker.
 |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
 |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
 |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
-|`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
-|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
 |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
 |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
 |`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
@@ -949,6 +947,10 @@ The Druid SQL server is configured through the following properties on the Broker.
 |`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of published segments in broker. If true, broker polls coordinator in background to get segments from metadata store and maintains a local cache. If false, coordinator's REST API will be invoked when broker needs published segments info.|false|
 |`druid.sql.planner.metadataSegmentPollPeriod`|How often to poll coordinator for published segments list if `druid.sql.planner.metadataSegmentCacheEnable` is set to true. Poll period is in milliseconds. |60000|
+> Previous versions of Druid had properties named `druid.sql.planner.maxQueryCount` and `druid.sql.planner.maxSemiJoinRowsInMemory`.
+> These properties are no longer available. Since Druid 0.18.0, you can use `druid.server.http.maxSubqueryRows` to control the maximum
+> number of rows permitted across all subqueries.
 ## SQL Metrics
 Broker will emit the following metrics for SQL.
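
With those planner properties gone, per-query tuning moves to the `maxSubqueryRows`
query context key, while `druid.server.http.maxSubqueryRows` remains the
server-wide default. A sketch of setting it over JDBC, assuming the stock
Avatica endpoint and the documented behavior that Avatica connection properties
are forwarded to Druid as query context:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class MaxSubqueryRowsExample
{
  public static void main(String[] args) throws Exception
  {
    final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";

    // Connection properties travel with each query as context, so this lowers
    // the subquery row limit for everything run on this connection.
    final Properties connectionProperties = new Properties();
    connectionProperties.setProperty("maxSubqueryRows", "50000");

    try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
      // ... run statements; inlined subqueries may now return at most 50,000 rows.
    }
  }
}
```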

View File

@@ -31,6 +31,7 @@ import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -116,6 +117,7 @@ public class InlineDataSource implements DataSource
     return signature.getColumnNames();
   }

+  @Nullable
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
   public List<ValueType> getColumnTypes()

View File

@@ -131,8 +131,15 @@ public interface Query<T>
   @Nullable
   String getId();

+  /**
+   * Returns a copy of this query with a new subQueryId (see {@link #getSubQueryId()}).
+   */
   Query<T> withSubQueryId(String subQueryId);

+  /**
+   * Returns the subQueryId of this query. This is set by ClientQuerySegmentWalker (the entry point for the Broker's
+   * query stack) on any subqueries that it issues. It is null for the main query.
+   */
   @Nullable
   String getSubQueryId();

View File

@@ -47,6 +47,7 @@ public class QueryContexts
   public static final String BROKER_PARALLELISM = "parallelMergeParallelism";
   public static final String VECTORIZE_KEY = "vectorize";
   public static final String VECTOR_SIZE_KEY = "vectorSize";
+  public static final String MAX_SUBQUERY_ROWS_KEY = "maxSubqueryRows";
   public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
   public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
   public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
@@ -188,6 +189,11 @@ public class QueryContexts
     return parseInt(query, VECTOR_SIZE_KEY, defaultSize);
   }

+  public static <T> int getMaxSubqueryRows(Query<T> query, int defaultSize)
+  {
+    return parseInt(query, MAX_SUBQUERY_ROWS_KEY, defaultSize);
+  }
+
   public static <T> int getUncoveredIntervalsLimit(Query<T> query)
   {
     return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT);
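
Resolution order for the new parameter: the `maxSubqueryRows` context entry wins
when present, otherwise the caller-supplied default applies (ClientQuerySegmentWalker
passes `serverConfig.getMaxSubqueryRows()`). A self-contained sketch of that
precedence, not the Druid classes themselves:

```java
import java.util.Map;

public class SubqueryRowLimitSketch
{
  // Mirrors the intent of QueryContexts.getMaxSubqueryRows: use the
  // "maxSubqueryRows" context entry when present, else the default.
  public static int resolve(final Map<String, Object> context, final int serverDefault)
  {
    final Object value = context.get("maxSubqueryRows");
    if (value == null) {
      return serverDefault;
    } else if (value instanceof Number) {
      return ((Number) value).intValue();
    } else {
      return Integer.parseInt(value.toString());
    }
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(Map.of(), 100000));                         // 100000: server default
    System.out.println(resolve(Map.of("maxSubqueryRows", 50000), 100000)); // 50000: context wins
  }
}
```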

View File

@@ -101,14 +101,13 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
     final ValueType valueType = rowSignature.getColumnType(columnName).orElse(null);

     // Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things.
-    // Do set hasMultipleValues, because we might return multiple values.
+    // Do not set hasMultipleValues, because even though we might return multiple values, setting it affirmatively
+    // causes expression selectors to always treat us as arrays. If we might have multiple values (i.e. if our type
+    // is nonnumeric), set isComplete false to compensate.
     if (valueType != null) {
       return new ColumnCapabilitiesImpl()
           .setType(valueType)
-          // Non-numeric types might have multiple values
-          .setHasMultipleValues(!valueType.isNumeric())
           // Numeric types should be reported as complete, but not STRING or COMPLEX (because we don't have full info)
           .setIsComplete(valueType.isNumeric());
     } else {
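
One practical consequence for callers, with an illustrative helper that is not
part of this patch: now that hasMultipleValues is left unset here, a false value
on an incomplete capabilities report means "unknown", so only a complete report
should be treated as authoritative.

```java
import org.apache.druid.segment.column.ColumnCapabilities;

public class CapabilitiesGuards
{
  // Illustrative: a column may be multi-valued if it says so outright, or if
  // its capabilities are incomplete ("unknown" is not the same as "no").
  public static boolean mayHaveMultipleValues(final ColumnCapabilities capabilities)
  {
    return capabilities.hasMultipleValues() || !capabilities.isComplete();
  }
}
```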

View File

@@ -137,7 +137,7 @@ public class LookupSegmentTest
     // Note: the "k" column does not actually have multiple values, but the RowBasedStorageAdapter doesn't allow
     // reporting complete single-valued capabilities. It would be good to change this in the future, so query engines
     // running on top of lookups can take advantage of singly-valued optimizations.
-    Assert.assertTrue(capabilities.hasMultipleValues());
+    Assert.assertFalse(capabilities.hasMultipleValues());
     Assert.assertFalse(capabilities.isDictionaryEncoded());
     Assert.assertFalse(capabilities.isComplete());
   }
@@ -151,7 +151,7 @@ public class LookupSegmentTest
     // reporting complete single-valued capabilities. It would be good to change this in the future, so query engines
     // running on top of lookups can take advantage of singly-valued optimizations.
     Assert.assertEquals(ValueType.STRING, capabilities.getType());
-    Assert.assertTrue(capabilities.hasMultipleValues());
+    Assert.assertFalse(capabilities.hasMultipleValues());
     Assert.assertFalse(capabilities.isDictionaryEncoded());
     Assert.assertFalse(capabilities.isComplete());
   }

View File

@@ -366,10 +366,10 @@ public class RowBasedStorageAdapterTest
     final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.STRING.name());

     Assert.assertEquals(ValueType.STRING, capabilities.getType());
-    // Note: unlike numeric types, STRING-typed columns report that they might have multiple values and that they
-    // are incomplete. It would be good in the future to support some way of changing this, when it is known ahead
-    // of time that multi-valuedness is impossible.
-    Assert.assertTrue(capabilities.hasMultipleValues());
+    // Note: unlike numeric types, STRING-typed columns might have multiple values, so they report as incomplete. It
+    // would be good in the future to support some way of changing this, when it is known ahead of time that
+    // multi-valuedness is definitely happening or is definitely impossible.
+    Assert.assertFalse(capabilities.hasMultipleValues());
     Assert.assertFalse(capabilities.isComplete());
   }
@@ -380,10 +380,9 @@ public class RowBasedStorageAdapterTest
     final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.COMPLEX.name());

-    // Note: unlike numeric types, COMPLEX-typed columns report that they might have multiple values and that they
-    // are incomplete.
+    // Note: unlike numeric types, COMPLEX-typed columns report that they are incomplete.
     Assert.assertEquals(ValueType.COMPLEX, capabilities.getType());
-    Assert.assertTrue(capabilities.hasMultipleValues());
+    Assert.assertFalse(capabilities.hasMultipleValues());
     Assert.assertFalse(capabilities.isComplete());
   }

View File

@@ -100,7 +100,10 @@ import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;

 /**
+ * This is the class on the Broker that is responsible for making native Druid queries to a cluster of data servers.
  *
+ * The main user of this class is {@link org.apache.druid.server.ClientQuerySegmentWalker}. In tests, its behavior
+ * is partially mimicked by TestClusterQuerySegmentWalker.
  */
 public class CachingClusteredClient implements QuerySegmentWalker
 {
@@ -480,7 +483,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
       return Collections.emptyList();
     }
     final List<Pair<Interval, byte[]>> alreadyCachedResults = new ArrayList<>();
-    Map<SegmentServerSelector, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
+    Map<SegmentServerSelector, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(
+        segments,
+        queryCacheKey
+    );

     // Pull cached segments from cache and remove from set of segments to query
     final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);

View File

@@ -35,6 +35,7 @@ import org.apache.druid.query.FluentQueryRunnerBuilder;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.PostProcessingOperator;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -137,7 +138,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
     final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);

     // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
-    final DataSource inlineDryRun = inlineIfNecessary(query.getDataSource(), toolChest, new AtomicInteger(), true);
+    final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows());
+    final DataSource inlineDryRun = inlineIfNecessary(
+        query.getDataSource(),
+        toolChest,
+        new AtomicInteger(),
+        maxSubqueryRows,
+        true
+    );

     if (!canRunQueryUsingClusterWalker(query.withDataSource(inlineDryRun))
         && !canRunQueryUsingLocalWalker(query.withDataSource(inlineDryRun))) {
@@ -151,6 +159,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             query.getDataSource(),
             toolChest,
             new AtomicInteger(),
+            maxSubqueryRows,
             false
         )
     );
@@ -245,6 +254,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       final DataSource dataSource,
       @Nullable final QueryToolChest toolChestIfOutermost,
       final AtomicInteger subqueryRowLimitAccumulator,
+      final int maxSubqueryRows,
       final boolean dryRun
   )
   {
@@ -266,7 +276,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       assert !(current instanceof QueryDataSource);
-      current = inlineIfNecessary(current, null, subqueryRowLimitAccumulator, dryRun);
+      current = inlineIfNecessary(current, null, subqueryRowLimitAccumulator, maxSubqueryRows, dryRun);

       while (!stack.isEmpty()) {
         current = stack.pop().withChildren(Collections.singletonList(current));
@@ -279,7 +289,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       } else {
         // Something happened during inlining that means the toolchest is no longer able to handle this subquery.
         // We need to consider inlining it.
-        return inlineIfNecessary(current, toolChestIfOutermost, subqueryRowLimitAccumulator, dryRun);
+        return inlineIfNecessary(current, toolChestIfOutermost, subqueryRowLimitAccumulator, maxSubqueryRows, dryRun);
       }
     } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
       // Subquery needs to be inlined. Assign it a subquery id and run it.
@@ -299,7 +309,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
           queryResults,
           warehouse.getToolChest(subQueryWithId),
           subqueryRowLimitAccumulator,
-          serverConfig.getMaxSubqueryRows()
+          maxSubqueryRows
       );
     } else {
       // Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@@ -310,12 +320,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
                       Iterables.getOnlyElement(dataSource.getChildren()),
                       null,
                       subqueryRowLimitAccumulator,
+                      maxSubqueryRows,
                       dryRun
                   )
               )
           ),
           toolChestIfOutermost,
           subqueryRowLimitAccumulator,
+          maxSubqueryRows,
           dryRun
       );
     }
@@ -324,7 +336,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       return dataSource.withChildren(
           dataSource.getChildren()
                     .stream()
-                    .map(child -> inlineIfNecessary(child, null, subqueryRowLimitAccumulator, dryRun))
+                    .map(child -> inlineIfNecessary(child, null, subqueryRowLimitAccumulator, maxSubqueryRows, dryRun))
                     .collect(Collectors.toList())
       );
     }
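
Worth calling out: a single AtomicInteger is threaded through every recursive
inlineIfNecessary call, so maxSubqueryRows bounds the total rows across all
inlined subqueries of one query, not each subquery individually. A simplified,
self-contained sketch of that shared accounting (the real code materializes
result sequences into an InlineDataSource and throws ResourceLimitExceededException):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SubqueryInliningSketch
{
  // Materializes one subquery's rows while charging them against a limit
  // shared by every subquery of the same top-level query.
  public static <T> List<T> materialize(
      final Iterable<T> subqueryRows,
      final AtomicInteger accumulator,
      final int maxSubqueryRows
  )
  {
    final List<T> rows = new ArrayList<>();
    for (final T row : subqueryRows) {
      if (accumulator.incrementAndGet() > maxSubqueryRows) {
        throw new IllegalStateException("Subquery generated results beyond maximum[" + maxSubqueryRows + "]");
      }
      rows.add(row);
    }
    return rows;
  }
}
```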

View File

@@ -75,6 +75,8 @@ import java.util.function.Function;

 /**
  * Query handler for Historical processes (see CliHistorical).
+ *
+ * In tests, this class's behavior is partially mimicked by TestClusterQuerySegmentWalker.
  */
 public class ServerManager implements QuerySegmentWalker
 {

View File

@@ -35,15 +35,9 @@ public class PlannerConfig
   @JsonProperty
   private Period metadataRefreshPeriod = new Period("PT1M");

-  @JsonProperty
-  private int maxSemiJoinRowsInMemory = 100000;
-
   @JsonProperty
   private int maxTopNLimit = 100000;

-  @JsonProperty
-  private int maxQueryCount = 8;
-
   @JsonProperty
   private boolean useApproximateCountDistinct = true;
@@ -82,21 +76,11 @@ public class PlannerConfig
     return metadataRefreshPeriod;
   }

-  public int getMaxSemiJoinRowsInMemory()
-  {
-    return maxSemiJoinRowsInMemory;
-  }
-
   public int getMaxTopNLimit()
   {
     return maxTopNLimit;
   }

-  public int getMaxQueryCount()
-  {
-    return maxQueryCount;
-  }
-
   public boolean isUseApproximateCountDistinct()
   {
     return useApproximateCountDistinct;
@@ -135,9 +119,7 @@ public class PlannerConfig
     final PlannerConfig newConfig = new PlannerConfig();
     newConfig.metadataRefreshPeriod = getMetadataRefreshPeriod();
-    newConfig.maxSemiJoinRowsInMemory = getMaxSemiJoinRowsInMemory();
     newConfig.maxTopNLimit = getMaxTopNLimit();
-    newConfig.maxQueryCount = getMaxQueryCount();
     newConfig.useApproximateCountDistinct = getContextBoolean(
         context,
         CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT,
@@ -185,9 +167,7 @@ public class PlannerConfig
       return false;
     }
     final PlannerConfig that = (PlannerConfig) o;
-    return maxSemiJoinRowsInMemory == that.maxSemiJoinRowsInMemory &&
-           maxTopNLimit == that.maxTopNLimit &&
-           maxQueryCount == that.maxQueryCount &&
+    return maxTopNLimit == that.maxTopNLimit &&
            useApproximateCountDistinct == that.useApproximateCountDistinct &&
            useApproximateTopN == that.useApproximateTopN &&
            requireTimeCondition == that.requireTimeCondition &&
@@ -205,9 +185,7 @@ public class PlannerConfig
     return Objects.hash(
         metadataRefreshPeriod,
-        maxSemiJoinRowsInMemory,
         maxTopNLimit,
-        maxQueryCount,
         useApproximateCountDistinct,
         useApproximateTopN,
         requireTimeCondition,
@@ -224,9 +202,7 @@ public class PlannerConfig
   {
     return "PlannerConfig{" +
            "metadataRefreshPeriod=" + metadataRefreshPeriod +
-           ", maxSemiJoinRowsInMemory=" + maxSemiJoinRowsInMemory +
            ", maxTopNLimit=" + maxTopNLimit +
-           ", maxQueryCount=" + maxQueryCount +
            ", useApproximateCountDistinct=" + useApproximateCountDistinct +
            ", useApproximateTopN=" + useApproximateTopN +
            ", requireTimeCondition=" + requireTimeCondition +

View File

@@ -40,14 +40,18 @@ import org.apache.calcite.rel.rules.AggregateRemoveRule;
 import org.apache.calcite.rel.rules.AggregateStarTableRule;
 import org.apache.calcite.rel.rules.AggregateValuesRule;
 import org.apache.calcite.rel.rules.CalcRemoveRule;
+import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
 import org.apache.calcite.rel.rules.FilterJoinRule;
 import org.apache.calcite.rel.rules.FilterMergeRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
 import org.apache.calcite.rel.rules.FilterTableScanRule;
+import org.apache.calcite.rel.rules.IntersectToDistinctRule;
 import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.rel.rules.JoinProjectTransposeRule;
 import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
 import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
+import org.apache.calcite.rel.rules.MatchRule;
 import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
 import org.apache.calcite.rel.rules.ProjectRemoveRule;
@@ -58,9 +62,9 @@ import org.apache.calcite.rel.rules.PruneEmptyRules;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
 import org.apache.calcite.rel.rules.SortJoinTransposeRule;
 import org.apache.calcite.rel.rules.SortProjectTransposeRule;
+import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.SortUnionTransposeRule;
-import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.rel.rules.TableScanRule;
 import org.apache.calcite.rel.rules.UnionMergeRule;
 import org.apache.calcite.rel.rules.UnionPullUpConstantsRule;
@@ -74,7 +78,6 @@ import org.apache.calcite.tools.RelBuilder;
 import org.apache.druid.sql.calcite.rel.QueryMaker;
 import org.apache.druid.sql.calcite.rule.DruidRelToDruidRule;
 import org.apache.druid.sql.calcite.rule.DruidRules;
-import org.apache.druid.sql.calcite.rule.DruidSemiJoinRule;
 import org.apache.druid.sql.calcite.rule.DruidTableScanRule;
 import org.apache.druid.sql.calcite.rule.ProjectAggregatePruneUnusedCallRule;
 import org.apache.druid.sql.calcite.rule.SortCollapseRule;
@@ -86,9 +89,13 @@ public class Rules
   public static final int DRUID_CONVENTION_RULES = 0;
   public static final int BINDABLE_CONVENTION_RULES = 1;

-  // Rules from CalcitePrepareImpl's DEFAULT_RULES, minus AggregateExpandDistinctAggregatesRule
-  // and AggregateReduceFunctionsRule.
-  private static final List<RelOptRule> DEFAULT_RULES =
+  // Rules from RelOptUtil's registerBaseRules, minus:
+  //
+  // 1) AggregateExpandDistinctAggregatesRule (it'll be added back later if approximate count distinct is disabled)
+  // 2) AggregateReduceFunctionsRule (it'll be added back for the Bindable rule set, but we don't want it for Druid
+  //    rules since it expands AVG, STDDEV, VAR, etc, and we have aggregators specifically designed for those
+  //    functions).
+  private static final List<RelOptRule> BASE_RULES =
       ImmutableList.of(
           AggregateStarTableRule.INSTANCE,
           AggregateStarTableRule.INSTANCE2,
@@ -99,40 +106,70 @@ public class Rules
           FilterProjectTransposeRule.INSTANCE,
           FilterJoinRule.FILTER_ON_JOIN,
           JoinPushExpressionsRule.INSTANCE,
+          AggregateCaseToFilterRule.INSTANCE,
           FilterAggregateTransposeRule.INSTANCE,
           ProjectWindowTransposeRule.INSTANCE,
+          MatchRule.INSTANCE,
           JoinCommuteRule.SWAP_OUTER,
           JoinPushThroughJoinRule.RIGHT,
           JoinPushThroughJoinRule.LEFT,
           SortProjectTransposeRule.INSTANCE,
           SortJoinTransposeRule.INSTANCE,
-          SortUnionTransposeRule.INSTANCE
+          SortRemoveConstantKeysRule.INSTANCE,
+          SortUnionTransposeRule.INSTANCE,
+          ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE,
+          ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE
       );

-  // Rules from CalcitePrepareImpl's createPlanner.
-  private static final List<RelOptRule> MISCELLANEOUS_RULES =
+  // Rules for scanning via Bindable, embedded directly in RelOptUtil's registerDefaultRules.
+  private static final List<RelOptRule> DEFAULT_BINDABLE_RULES =
       ImmutableList.of(
          Bindables.BINDABLE_TABLE_SCAN_RULE,
          ProjectTableScanRule.INSTANCE,
          ProjectTableScanRule.INTERPRETER
      );

-  // Rules from CalcitePrepareImpl's CONSTANT_REDUCTION_RULES.
-  private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
+  // Rules from RelOptUtil's registerReductionRules.
+  private static final List<RelOptRule> REDUCTION_RULES =
       ImmutableList.of(
           ReduceExpressionsRule.PROJECT_INSTANCE,
-          ReduceExpressionsRule.CALC_INSTANCE,
-          ReduceExpressionsRule.JOIN_INSTANCE,
           ReduceExpressionsRule.FILTER_INSTANCE,
+          ReduceExpressionsRule.CALC_INSTANCE,
+          ReduceExpressionsRule.WINDOW_INSTANCE,
+          ReduceExpressionsRule.JOIN_INSTANCE,
           ValuesReduceRule.FILTER_INSTANCE,
           ValuesReduceRule.PROJECT_FILTER_INSTANCE,
           ValuesReduceRule.PROJECT_INSTANCE,
           AggregateValuesRule.INSTANCE
       );

-  // Rules from VolcanoPlanner's registerAbstractRelationalRules, minus JoinCommuteRule since it's already
-  // in DEFAULT_RULES.
-  private static final List<RelOptRule> VOLCANO_ABSTRACT_RULES =
+  // Rules from RelOptUtil's registerAbstractRules.
+  // Omit DateRangeRules due to https://issues.apache.org/jira/browse/CALCITE-1601
+  private static final List<RelOptRule> ABSTRACT_RULES =
+      ImmutableList.of(
+          AggregateProjectPullUpConstantsRule.INSTANCE2,
+          UnionPullUpConstantsRule.INSTANCE,
+          PruneEmptyRules.UNION_INSTANCE,
+          PruneEmptyRules.INTERSECT_INSTANCE,
+          PruneEmptyRules.MINUS_INSTANCE,
+          PruneEmptyRules.PROJECT_INSTANCE,
+          PruneEmptyRules.FILTER_INSTANCE,
+          PruneEmptyRules.SORT_INSTANCE,
+          PruneEmptyRules.AGGREGATE_INSTANCE,
+          PruneEmptyRules.JOIN_LEFT_INSTANCE,
+          PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+          PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
+          UnionMergeRule.INSTANCE,
+          UnionMergeRule.INTERSECT_INSTANCE,
+          UnionMergeRule.MINUS_INSTANCE,
+          ProjectToWindowRule.PROJECT,
+          FilterMergeRule.INSTANCE,
+          IntersectToDistinctRule.INSTANCE
+      );
+
+  // Rules from RelOptUtil's registerAbstractRelationalRules, except AggregateMergeRule. (It causes
+  // testDoubleNestedGroupBy2 to fail).
+  private static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
       ImmutableList.of(
           FilterJoinRule.FILTER_ON_JOIN,
           FilterJoinRule.JOIN,
@@ -146,30 +183,11 @@ public class Rules
           SortRemoveRule.INSTANCE
       );

-  // Rules from RelOptUtil's registerAbstractRels.
-  // Omit DateRangeRules due to https://issues.apache.org/jira/browse/CALCITE-1601
-  private static final List<RelOptRule> RELOPTUTIL_ABSTRACT_RULES =
+  // Rules that pull projections up above a join. This lets us eliminate some subqueries.
+  private static final List<RelOptRule> JOIN_PROJECT_TRANSPOSE_RULES =
       ImmutableList.of(
-          AggregateProjectPullUpConstantsRule.INSTANCE2,
-          UnionPullUpConstantsRule.INSTANCE,
-          PruneEmptyRules.UNION_INSTANCE,
-          PruneEmptyRules.PROJECT_INSTANCE,
-          PruneEmptyRules.FILTER_INSTANCE,
-          PruneEmptyRules.SORT_INSTANCE,
-          PruneEmptyRules.AGGREGATE_INSTANCE,
-          PruneEmptyRules.JOIN_LEFT_INSTANCE,
-          PruneEmptyRules.JOIN_RIGHT_INSTANCE,
-          PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
-          UnionMergeRule.INSTANCE,
-          ProjectToWindowRule.PROJECT,
-          FilterMergeRule.INSTANCE
-      );
-
-  private static final List<RelOptRule> SUB_QUERY_REMOVE_RULES =
-      ImmutableList.of(
-          SubQueryRemoveRule.PROJECT,
-          SubQueryRemoveRule.FILTER,
-          SubQueryRemoveRule.JOIN
+          JoinProjectTransposeRule.RIGHT_PROJECT,
+          JoinProjectTransposeRule.LEFT_PROJECT
       );

   private Rules()
@@ -182,7 +200,8 @@ public class Rules
     final Program hepProgram =
         Programs.sequence(
             Programs.subQuery(DefaultRelMetadataProvider.INSTANCE),
-            new DecorrelateAndTrimFieldsProgram()
+            new DecorrelateAndTrimFieldsProgram(),
+            Programs.hep(REDUCTION_RULES, true, DefaultRelMetadataProvider.INSTANCE)
         );

     return ImmutableList.of(
         Programs.sequence(hepProgram, Programs.ofRules(druidConventionRuleSet(plannerContext, queryMaker))),
@@ -201,10 +220,6 @@ public class Rules
             .add(new DruidTableScanRule(queryMaker))
             .addAll(DruidRules.rules());

-    if (plannerContext.getPlannerConfig().getMaxSemiJoinRowsInMemory() > 0) {
-      retVal.add(DruidSemiJoinRule.instance());
-    }
-
     return retVal.build();
   }
@@ -213,6 +228,7 @@ public class Rules
     return ImmutableList.<RelOptRule>builder()
         .addAll(baseRuleSet(plannerContext))
         .addAll(Bindables.RULES)
+        .addAll(DEFAULT_BINDABLE_RULES)
         .add(AggregateReduceFunctionsRule.INSTANCE)
         .build();
   }
@@ -223,12 +239,10 @@ public class Rules
     final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();

     // Calcite rules.
-    rules.addAll(DEFAULT_RULES);
-    rules.addAll(MISCELLANEOUS_RULES);
-    rules.addAll(CONSTANT_REDUCTION_RULES);
-    rules.addAll(VOLCANO_ABSTRACT_RULES);
-    rules.addAll(RELOPTUTIL_ABSTRACT_RULES);
-    rules.addAll(SUB_QUERY_REMOVE_RULES);
+    rules.addAll(BASE_RULES);
+    rules.addAll(ABSTRACT_RULES);
+    rules.addAll(ABSTRACT_RELATIONAL_RULES);
+    rules.addAll(JOIN_PROJECT_TRANSPOSE_RULES);

     if (!plannerConfig.isUseApproximateCountDistinct()) {
       // For some reason, even though we support grouping sets, using AggregateExpandDistinctAggregatesRule.INSTANCE
@@ -236,8 +250,8 @@ public class Rules
       rules.add(AggregateExpandDistinctAggregatesRule.JOIN);
     }

-    // Rules that we wrote.
     rules.add(SortCollapseRule.instance());
+    rules.add(AggregateCaseToFilterRule.INSTANCE);
     rules.add(ProjectAggregatePruneUnusedCallRule.instance());

     return rules.build();

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
/**
* Constants used by {@link PartialDruidQuery#estimateCost} and various
* {@link DruidRel#computeSelfCost(RelOptPlanner, RelMetadataQuery)} implementations.
*/
public class CostEstimates
{
/**
* Per-row base cost. This represents the cost of walking through every row, but not actually reading anything
* from them or computing any aggregations.
*/
static final double COST_BASE = 1;
/**
* Cost to read a value out of a column directly.
*/
static final double COST_COLUMN_READ = 0.05;
/**
* Cost to compute and read an expression.
*/
static final double COST_EXPRESSION = 0.25;
/**
* Cost to compute an aggregation.
*/
static final double COST_AGGREGATION = 0.05;
/**
* Cost per GROUP BY dimension.
*/
static final double COST_DIMENSION = 0.25;
/**
* Multiplier to apply when there is a WHERE filter. Encourages pushing down filters and limits through joins and
* subqueries when possible.
*/
static final double MULTIPLIER_FILTER = 0.1;
/**
* Multiplier to apply when there is an ORDER BY. Encourages avoiding them when possible.
*/
static final double MULTIPLIER_ORDER_BY = 10;
/**
* Multiplier to apply when there is a LIMIT. Encourages pushing down limits when possible.
*/
static final double MULTIPLIER_LIMIT = 0.5;
/**
* Multiplier to apply to an outer query via {@link DruidOuterQueryRel}. Encourages pushing down time-saving
* operations to the lowest level of the query stack, because they'll have bigger impact there.
*/
static final double MULTIPLIER_OUTER_QUERY = 0.1;
/**
* Multiplier to apply to a join when the left-hand side is a subquery. Encourages avoiding subqueries. Subqueries
* inside joins must be inlined, which incurs substantial reduction in scalability, so this high number is justified.
*/
static final double MULTIPLIER_JOIN_SUBQUERY = 1000000000;
private CostEstimates()
{
// No instantiation.
}
}
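
To make the constants concrete, here is a hypothetical per-row figure for a
query that reads two columns and has a WHERE filter. The real composition lives
in PartialDruidQuery#estimateCost, which is not part of this diff, so this
illustrates the constants' intent rather than the actual formula:

```java
public class CostEstimateExample
{
  public static void main(String[] args)
  {
    final double costBase = 1;           // CostEstimates.COST_BASE
    final double costColumnRead = 0.05;  // CostEstimates.COST_COLUMN_READ
    final double filterMultiplier = 0.1; // CostEstimates.MULTIPLIER_FILTER

    // Two column reads per row, then the filter multiplier rewards plans that
    // push the filter down: (1 + 2 * 0.05) * 0.1 = 0.11 per row.
    final double perRowCost = (costBase + 2 * costColumnRead) * filterMultiplier;
    System.out.println(perRowCost);
  }
}
```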

View File

@@ -60,17 +60,38 @@
 public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
 {
   private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");
-  private static final double COST_FACTOR = 100.0;

   private final PartialDruidQuery partialQuery;
   private final Join joinRel;
   private RelNode left;
   private RelNode right;

+  /**
+   * True if {@link #left} requires a subquery.
+   *
+   * This is useful to store in a variable because {@link #left} is sometimes not actually a {@link DruidRel} when
+   * {@link #computeSelfCost} is called. (It might be a {@link org.apache.calcite.plan.volcano.RelSubset}.)
+   *
+   * @see #computeLeftRequiresSubquery(DruidRel)
+   */
+  private final boolean leftRequiresSubquery;
+
+  /**
+   * True if {@link #right} requires a subquery.
+   *
+   * This is useful to store in a variable because {@link #right} is sometimes not actually a {@link DruidRel} when
+   * {@link #computeSelfCost} is called. (It might be a {@link org.apache.calcite.plan.volcano.RelSubset}.)
+   *
+   * @see #computeRightRequiresSubquery(DruidRel)
+   */
+  private final boolean rightRequiresSubquery;

   private DruidJoinQueryRel(
       RelOptCluster cluster,
       RelTraitSet traitSet,
       Join joinRel,
+      boolean leftRequiresSubquery,
+      boolean rightRequiresSubquery,
       PartialDruidQuery partialQuery,
       QueryMaker queryMaker
   )
@@ -79,17 +100,28 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
     this.joinRel = joinRel;
     this.left = joinRel.getLeft();
     this.right = joinRel.getRight();
+    this.leftRequiresSubquery = leftRequiresSubquery;
+    this.rightRequiresSubquery = rightRequiresSubquery;
     this.partialQuery = partialQuery;
   }

-  public static DruidJoinQueryRel create(final Join joinRel, final QueryMaker queryMaker)
+  /**
+   * Create an instance from a Join that is based on two {@link DruidRel} inputs.
+   */
+  public static DruidJoinQueryRel create(
+      final Join joinRel,
+      final DruidRel<?> left,
+      final DruidRel<?> right
+  )
   {
     return new DruidJoinQueryRel(
         joinRel.getCluster(),
         joinRel.getTraitSet(),
         joinRel,
+        computeLeftRequiresSubquery(left),
+        computeRightRequiresSubquery(right),
         PartialDruidQuery.create(joinRel),
-        queryMaker
+        left.getQueryMaker()
     );
   }
@@ -117,6 +149,8 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
         getCluster(),
         getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
         joinRel,
+        leftRequiresSubquery,
+        rightRequiresSubquery,
         newQueryBuilder,
         getQueryMaker()
     );
@@ -141,18 +175,20 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
     final RowSignature rightSignature = rightQuery.getOutputRowSignature();
     final DataSource rightDataSource;

-    // Left rel: allow direct embedding of scans/mappings including those of joins.
-    if (DruidRels.isScanOrMapping(leftDruidRel, true)) {
-      leftDataSource = leftQuery.getDataSource();
-    } else {
+    if (computeLeftRequiresSubquery(leftDruidRel)) {
+      assert leftRequiresSubquery;
       leftDataSource = new QueryDataSource(leftQuery.getQuery());
+    } else {
+      assert !leftRequiresSubquery;
+      leftDataSource = leftQuery.getDataSource();
     }

-    // Right rel: allow direct embedding of scans/mappings, excluding joins (those must be done as subqueries).
-    if (DruidRels.isScanOrMapping(rightDruidRel, false)) {
-      rightDataSource = rightQuery.getDataSource();
-    } else {
+    if (computeRightRequiresSubquery(rightDruidRel)) {
+      assert rightRequiresSubquery;
       rightDataSource = new QueryDataSource(rightQuery.getQuery());
+    } else {
+      assert !rightRequiresSubquery;
+      rightDataSource = rightQuery.getDataSource();
     }

     final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(leftSignature, rightSignature);
@@ -214,6 +250,8 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
                 .map(input -> RelOptRule.convert(input, DruidConvention.instance()))
                 .collect(Collectors.toList())
         ),
+        leftRequiresSubquery,
+        rightRequiresSubquery,
         partialQuery,
         getQueryMaker()
     );
@@ -252,6 +290,8 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
         getCluster(),
         traitSet,
         joinRel.copy(joinRel.getTraitSet(), inputs),
+        leftRequiresSubquery,
+        rightRequiresSubquery,
         getPartialDruidQuery(),
         getQueryMaker()
     );
@@ -297,9 +337,9 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
   public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
   {
     return planner.getCostFactory()
-                  .makeCost(mq.getRowCount(left), 0, 0)
-                  .plus(planner.getCostFactory().makeCost(mq.getRowCount(right), 0, 0))
-                  .multiplyBy(COST_FACTOR);
+                  .makeCost(partialQuery.estimateCost(), 0, 0)
+                  .multiplyBy(leftRequiresSubquery ? CostEstimates.MULTIPLIER_JOIN_SUBQUERY : 1)
+                  .multiplyBy(rightRequiresSubquery ? CostEstimates.MULTIPLIER_JOIN_SUBQUERY : 1);
   }

   private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
@@ -318,6 +358,19 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
     }
   }

+  private static boolean computeLeftRequiresSubquery(final DruidRel<?> left)
+  {
+    // Left requires a subquery unless it's a scan or mapping on top of any table or a join.
+    return !DruidRels.isScanOrMapping(left, true);
+  }
+
+  private static boolean computeRightRequiresSubquery(final DruidRel<?> right)
+  {
+    // Right requires a subquery unless it's a scan or mapping on top of a global datasource.
+    return !(DruidRels.isScanOrMapping(right, false)
+             && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent());
+  }
+
   /**
    * Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
    * applying that prefix.
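
To see why MULTIPLIER_JOIN_SUBQUERY is so large, a toy comparison in which the
base figure stands in for partialQuery.estimateCost(): a single subquery input
makes the join plan astronomically more expensive, steering the planner toward
joins whose left input is a direct scan/mapping and whose right input is a
global datasource.

```java
public class JoinCostExample
{
  public static void main(String[] args)
  {
    final double estimateCost = 1000;          // hypothetical partialQuery.estimateCost()
    final double joinSubqueryMultiplier = 1e9; // CostEstimates.MULTIPLIER_JOIN_SUBQUERY

    final double bothInputsDirect = estimateCost;                               // 1.0e3
    final double oneSubqueryInput = estimateCost * joinSubqueryMultiplier;      // 1.0e12
    final double twoSubqueryInputs = oneSubqueryInput * joinSubqueryMultiplier; // 1.0e21

    System.out.println(bothInputsDirect + " " + oneSubqueryInput + " " + twoSubqueryInputs);
  }
}
```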

View File

@@ -40,7 +40,6 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.table.RowSignatures;

-import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Set;
@@ -120,16 +119,12 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
     return 1 + ((DruidRel) sourceRel).getQueryCount();
   }

-  @Nullable
   @Override
   public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     // Must finalize aggregations on subqueries.
     final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
-    if (subQuery == null) {
-      return null;
-    }

     final GroupByQuery groupByQuery = subQuery.toGroupByQuery();
     if (groupByQuery == null) {
@@ -234,6 +229,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
   @Override
   public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
   {
-    return planner.getCostFactory().makeCost(mq.getRowCount(sourceRel), 0, 0).multiplyBy(10);
+    return planner.getCostFactory()
+                  .makeCost(partialQuery.estimateCost(), 0, 0)
+                  .multiplyBy(CostEstimates.MULTIPLIER_OUTER_QUERY);
   }
 }

View File

@@ -34,7 +34,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.sql.calcite.table.DruidTable;

-import javax.annotation.Nonnull;
 import java.util.Set;

 /**
@@ -42,15 +41,6 @@ import java.util.Set;
  */
 public class DruidQueryRel extends DruidRel<DruidQueryRel>
 {
-  // Factors used for computing cost (see computeSelfCost). These are intended to encourage pushing down filters
-  // and limits through stacks of nested queries when possible.
-  private static final double COST_BASE = 1.0;
-  private static final double COST_PER_COLUMN = 0.001;
-  private static final double COST_FILTER_MULTIPLIER = 0.1;
-  private static final double COST_GROUPING_MULTIPLIER = 0.5;
-  private static final double COST_LIMIT_MULTIPLIER = 0.5;
-  private static final double COST_HAVING_MULTIPLIER = 5.0;

   private final RelOptTable table;
   private final DruidTable druidTable;
   private final PartialDruidQuery partialQuery;
@@ -91,7 +81,6 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
   }

   @Override
-  @Nonnull
   public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
     return partialQuery.build(
@@ -163,6 +152,11 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
     return getQueryMaker().runQuery(toDruidQuery(false));
   }

+  public DruidTable getDruidTable()
+  {
+    return druidTable;
+  }
+
   @Override
   public RelOptTable getTable()
   {
@@ -195,38 +189,6 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
   @Override
   public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
   {
-    double cost = COST_BASE;
-
-    if (partialQuery.getSelectProject() != null) {
-      cost += COST_PER_COLUMN * partialQuery.getSelectProject().getChildExps().size();
-    }
-
-    if (partialQuery.getWhereFilter() != null) {
-      cost *= COST_FILTER_MULTIPLIER;
-    }
-
-    if (partialQuery.getAggregate() != null) {
-      cost *= COST_GROUPING_MULTIPLIER;
-      cost += COST_PER_COLUMN * partialQuery.getAggregate().getGroupSet().size();
-      cost += COST_PER_COLUMN * partialQuery.getAggregate().getAggCallList().size();
-    }
-
-    if (partialQuery.getAggregateProject() != null) {
-      cost += COST_PER_COLUMN * partialQuery.getAggregateProject().getChildExps().size();
-    }
-
-    if (partialQuery.getSort() != null && partialQuery.getSort().fetch != null) {
-      cost *= COST_LIMIT_MULTIPLIER;
-    }
-
-    if (partialQuery.getSortProject() != null) {
-      cost += COST_PER_COLUMN * partialQuery.getSortProject().getChildExps().size();
-    }
-
-    if (partialQuery.getHavingFilter() != null) {
-      cost *= COST_HAVING_MULTIPLIER;
-    }
-
-    return planner.getCostFactory().makeCost(cost, 0, 0);
+    return planner.getCostFactory().makeCost(partialQuery.estimateCost(), 0, 0);
   }
 }

View File

@@ -68,30 +68,23 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
}
/**
- * Convert this DruidRel to a DruidQuery. This may be an expensive operation. For example, DruidSemiJoin needs to
- * execute the right-hand side query in order to complete this method.
+ * Convert this DruidRel to a DruidQuery. This must be an inexpensive operation, since it is done often throughout
+ * query planning.
 *
- * This method may return null if it knows that this rel will yield an empty result set.
+ * This method must not return null.
 *
 * @param finalizeAggregations true if this query should include explicit finalization for all of its
 *                             aggregators, where required. Useful for subqueries where Druid's native query layer
 *                             does not do this automatically.
 *
- * @return query, or null if it is known in advance that this rel will yield an empty result set.
- *
 * @throws CannotBuildQueryException
 */
- @Nullable
public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
/**
- * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For
- * example, DruidSemiJoin will use a dummy dataSource in order to complete this method, rather than executing
- * the right-hand side query.
+ * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation.
 *
- * This method may not return null.
- *
- * @return query
+ * This method must not return null.
 *
 * @throws CannotBuildQueryException
 */
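To make the tightened contract concrete: since toDruidQuery() is now guaranteed non-null and cheap, caller-side code no longer needs an empty-result fallback. A minimal sketch under that assumption ("run" is an illustrative helper, not part of this change):

import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.QueryMaker;

public class ToDruidQuerySketch
{
  // Illustrative helper: no null check and no Sequences.empty() fallback are
  // needed, because toDruidQuery() must not return null anymore.
  static Sequence<Object[]> run(final DruidRel<?> druidRel, final QueryMaker queryMaker)
  {
    final DruidQuery query = druidRel.toDruidQuery(false);
    return queryMaker.runQuery(query);
  }
}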

View File

@@ -1,392 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* DruidRel that has a main query, and also a subquery "right" that is used to filter the main query.
*/
public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
{
private final List<RexNode> leftExpressions;
private final List<Integer> rightKeys;
private final int maxSemiJoinRowsInMemory;
private DruidRel<?> left;
private RelNode right;
private DruidSemiJoin(
final RelOptCluster cluster,
final RelTraitSet traitSet,
final DruidRel<?> left,
final RelNode right,
final List<RexNode> leftExpressions,
final List<Integer> rightKeys,
final int maxSemiJoinRowsInMemory,
final QueryMaker queryMaker
)
{
super(cluster, traitSet, queryMaker);
this.left = left;
this.right = right;
this.leftExpressions = ImmutableList.copyOf(leftExpressions);
this.rightKeys = ImmutableList.copyOf(rightKeys);
this.maxSemiJoinRowsInMemory = maxSemiJoinRowsInMemory;
}
public static DruidSemiJoin create(
final DruidRel left,
final DruidRel right,
final List<Integer> leftKeys,
final List<Integer> rightKeys,
final PlannerContext plannerContext
)
{
final ImmutableList.Builder<RexNode> listBuilder = ImmutableList.builder();
final PartialDruidQuery leftPartialQuery = left.getPartialDruidQuery();
if (leftPartialQuery.stage().compareTo(PartialDruidQuery.Stage.AGGREGATE) >= 0) {
throw new ISE("LHS must not be an Aggregate");
}
if (leftPartialQuery.getSelectProject() != null) {
for (int key : leftKeys) {
listBuilder.add(leftPartialQuery.getSelectProject().getChildExps().get(key));
}
} else {
for (int key : leftKeys) {
listBuilder.add(RexInputRef.of(key, leftPartialQuery.getRowType()));
}
}
return new DruidSemiJoin(
left.getCluster(),
left.getTraitSet(),
left,
right,
listBuilder.build(),
rightKeys,
plannerContext.getPlannerConfig().getMaxSemiJoinRowsInMemory(),
left.getQueryMaker()
);
}
@Override
public PartialDruidQuery getPartialDruidQuery()
{
return left.getPartialDruidQuery();
}
@Override
public DruidSemiJoin withPartialQuery(final PartialDruidQuery newQueryBuilder)
{
return new DruidSemiJoin(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
left.withPartialQuery(newQueryBuilder),
right,
leftExpressions,
rightKeys,
maxSemiJoinRowsInMemory,
getQueryMaker()
);
}
@Nullable
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel rel = getLeftRelWithFilter();
return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
return left.toDruidQueryForExplaining();
}
@Override
public DruidSemiJoin asDruidConvention()
{
return new DruidSemiJoin(
getCluster(),
getTraitSet().replace(DruidConvention.instance()),
left,
RelOptRule.convert(right, DruidConvention.instance()),
leftExpressions,
rightKeys,
maxSemiJoinRowsInMemory,
getQueryMaker()
);
}
@Override
public Set<String> getDataSourceNames()
{
final DruidRel<?> druidRight = (DruidRel) this.right;
Set<String> dataSourceNames = new LinkedHashSet<>();
dataSourceNames.addAll(left.getDataSourceNames());
dataSourceNames.addAll(druidRight.getDataSourceNames());
return dataSourceNames;
}
@Override
public int getQueryCount()
{
return left.getQueryCount() + ((DruidRel) right).getQueryCount();
}
@Override
public Sequence<Object[]> runQuery()
{
final DruidRel<?> rel = getLeftRelWithFilter();
if (rel != null) {
return rel.runQuery();
} else {
return Sequences.empty();
}
}
@Override
protected RelDataType deriveRowType()
{
return left.getRowType();
}
@Override
public List<RelNode> getInputs()
{
return ImmutableList.of(right);
}
@Override
public void replaceInput(int ordinalInParent, RelNode p)
{
if (ordinalInParent != 0) {
throw new IndexOutOfBoundsException(StringUtils.format("Invalid ordinalInParent[%s]", ordinalInParent));
}
// 'right' is the only input that Calcite concerns itself with. See getInputs().
this.right = p;
}
@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
{
return new DruidSemiJoin(
getCluster(),
getTraitSet(),
left,
Iterables.getOnlyElement(inputs),
leftExpressions,
rightKeys,
maxSemiJoinRowsInMemory,
getQueryMaker()
);
}
@Override
public RelWriter explainTerms(RelWriter pw)
{
final String queryString;
try {
queryString = getQueryMaker().getJsonMapper().writeValueAsString(toDruidQueryForExplaining().getQuery());
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return super.explainTerms(pw)
.input("right", right)
.item("query", queryString)
.item("leftExpressions", leftExpressions)
.item("rightKeys", rightKeys);
}
@Override
public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
{
return right.computeSelfCost(planner, mq).plus(left.computeSelfCost(planner, mq).multiplyBy(50));
}
/**
* Returns a copy of the left rel with the filter applied from the right-hand side. This is an expensive operation
* since it actually executes the right-hand side query.
*/
private DruidRel<?> getLeftRelWithFilter()
{
final DruidRel<?> druidRight = (DruidRel) this.right;
// Build list of acceptable values from right side.
final Set<List<String>> valuess = new HashSet<>();
final List<RexNode> conditions = druidRight.runQuery().accumulate(
new ArrayList<>(),
new Accumulator<List<RexNode>, Object[]>()
{
int numRows;
@Override
public List<RexNode> accumulate(final List<RexNode> theConditions, final Object[] row)
{
final List<String> values = new ArrayList<>(rightKeys.size());
for (int i : rightKeys) {
final Object value = row[i];
if (value == null) {
// NULLs are not supposed to match NULLs in a join. So ignore them.
continue;
}
final String stringValue = DimensionHandlerUtils.convertObjectToString(value);
values.add(stringValue);
}
if (valuess.add(values)) {
if (++numRows > maxSemiJoinRowsInMemory) {
throw new ResourceLimitExceededException(
StringUtils.format("maxSemiJoinRowsInMemory[%,d] exceeded", maxSemiJoinRowsInMemory)
);
}
final List<RexNode> subConditions = new ArrayList<>();
for (int i = 0; i < values.size(); i++) {
final String value = values.get(i);
// NULLs are not supposed to match NULLs in a join. So ignore them.
if (value != null) {
subConditions.add(
getCluster().getRexBuilder().makeCall(
SqlStdOperatorTable.EQUALS,
leftExpressions.get(i),
getCluster().getRexBuilder().makeLiteral(value)
)
);
}
}
theConditions.add(makeAnd(subConditions));
}
return theConditions;
}
}
);
valuess.clear();
if (!conditions.isEmpty()) {
// Add a filter to the left side.
final PartialDruidQuery leftPartialQuery = left.getPartialDruidQuery();
final Filter whereFilter = leftPartialQuery.getWhereFilter();
final Filter newWhereFilter;
if (whereFilter != null) {
newWhereFilter = whereFilter.copy(
whereFilter.getTraitSet(),
whereFilter.getInput(),
RexUtil.flatten(
getCluster().getRexBuilder(),
makeAnd(ImmutableList.of(whereFilter.getCondition(), makeOr(conditions)))
)
);
} else {
newWhereFilter = LogicalFilter.create(
leftPartialQuery.getScan(),
makeOr(conditions) // already in flattened form
);
}
PartialDruidQuery newPartialQuery = PartialDruidQuery.create(leftPartialQuery.getScan())
.withWhereFilter(newWhereFilter)
.withSelectProject(leftPartialQuery.getSelectProject());
if (leftPartialQuery.getAggregate() != null) {
newPartialQuery = newPartialQuery.withAggregate(leftPartialQuery.getAggregate());
}
if (leftPartialQuery.getHavingFilter() != null) {
newPartialQuery = newPartialQuery.withHavingFilter(leftPartialQuery.getHavingFilter());
}
if (leftPartialQuery.getAggregateProject() != null) {
newPartialQuery = newPartialQuery.withAggregateProject(leftPartialQuery.getAggregateProject());
}
if (leftPartialQuery.getSort() != null) {
newPartialQuery = newPartialQuery.withSort(leftPartialQuery.getSort());
}
if (leftPartialQuery.getSortProject() != null) {
newPartialQuery = newPartialQuery.withSortProject(leftPartialQuery.getSortProject());
}
return left.withPartialQuery(newPartialQuery);
} else {
return null;
}
}
private RexNode makeAnd(final List<RexNode> conditions)
{
if (conditions.size() == 1) {
return Iterables.getOnlyElement(conditions);
} else {
return getCluster().getRexBuilder().makeCall(SqlStdOperatorTable.AND, conditions);
}
}
private RexNode makeOr(final List<RexNode> conditions)
{
if (conditions.size() == 1) {
return Iterables.getOnlyElement(conditions);
} else {
return getCluster().getRexBuilder().makeCall(SqlStdOperatorTable.OR, conditions);
}
}
}
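For context on what the class removed above actually did: it ran the right-hand query eagerly and folded the distinct join-key values into an OR-of-equalities filter on the left. A self-contained toy sketch of that rewrite idea (plain Java with string filters instead of Calcite RexNodes; all names are illustrative):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SemiJoinSketch
{
  // Collect distinct key values from the right side, enforcing a row limit,
  // then express them as an OR-of-equalities filter on the left column.
  public static String buildFilter(final String leftColumn, final List<String> rightRows, final int maxRows)
  {
    final Set<String> values = new LinkedHashSet<>();
    for (final String value : rightRows) {
      if (value == null) {
        continue; // NULLs are not supposed to match NULLs in a join, so skip them.
      }
      if (values.add(value) && values.size() > maxRows) {
        throw new IllegalStateException("row limit exceeded: " + maxRows);
      }
    }
    final List<String> conditions = new ArrayList<>();
    for (final String value : values) {
      conditions.add(leftColumn + " = '" + value + "'");
    }
    return String.join(" OR ", conditions);
  }

  public static void main(String[] args)
  {
    // Conceptually: SELECT ... FROM foo WHERE dim1 IN (SELECT dim1 FROM bar)
    // became:       SELECT ... FROM foo WHERE dim1 = 'a' OR dim1 = 'b'
    System.out.println(buildFilter("dim1", List.of("a", "b", "a"), 100)); // dim1 = 'a' OR dim1 = 'b'
  }
}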

View File

@@ -115,7 +115,6 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
throw new UnsupportedOperationException();
}
- @Nullable
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
@@ -195,7 +194,7 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
@Override
public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
{
- return planner.getCostFactory().makeCost(rels.stream().mapToDouble(mq::getRowCount).sum(), 0, 0);
+ return planner.getCostFactory().makeCost(CostEstimates.COST_BASE, 0, 0);
}
public int getLimit()

View File

@@ -32,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
@@ -396,6 +397,61 @@ public class PartialDruidQuery
}
}
+ /**
+  * Estimates the per-row cost of running this query.
+  */
+ public double estimateCost()
+ {
+   double cost = CostEstimates.COST_BASE;
+   if (getSelectProject() != null) {
+     for (final RexNode rexNode : getSelectProject().getChildExps()) {
+       if (rexNode.isA(SqlKind.INPUT_REF)) {
+         cost += CostEstimates.COST_COLUMN_READ;
+       } else {
+         cost += CostEstimates.COST_EXPRESSION;
+       }
+     }
+   }
+   if (getWhereFilter() != null) {
+     // We assume filters are free and have a selectivity of CostEstimates.MULTIPLIER_FILTER. They aren't actually
+     // free, but we want to encourage filters, so let's go with it.
+     cost *= CostEstimates.MULTIPLIER_FILTER;
+   }
+   if (getAggregate() != null) {
+     if (getSelectProject() == null) {
+       // No projection before aggregation, that means the aggregate operator is reading things directly.
+       // Account for the costs.
+       cost += CostEstimates.COST_COLUMN_READ * getAggregate().getGroupSet().size();
+     }
+     cost += CostEstimates.COST_DIMENSION * getAggregate().getGroupSet().size();
+     cost += CostEstimates.COST_AGGREGATION * getAggregate().getAggCallList().size();
+   }
+   if (getSort() != null) {
+     if (!getSort().collation.getFieldCollations().isEmpty()) {
+       cost *= CostEstimates.MULTIPLIER_ORDER_BY;
+     }
+     if (getSort().fetch != null) {
+       cost *= CostEstimates.MULTIPLIER_LIMIT;
+     }
+   }
+   if (getAggregateProject() != null) {
+     cost += CostEstimates.COST_EXPRESSION * getAggregateProject().getChildExps().size();
+   }
+   if (getSortProject() != null) {
+     cost += CostEstimates.COST_EXPRESSION * getSortProject().getChildExps().size();
+   }
+   return cost;
+ }
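A worked example of how these per-row terms compose. The constant names below are the real ones from CostEstimates, but the numeric values are hypothetical stand-ins chosen only to make the arithmetic visible:

public class CostSketch
{
  // Hypothetical values; the real ones are defined in CostEstimates.
  static final double COST_BASE = 1.0;
  static final double COST_COLUMN_READ = 0.05;
  static final double COST_DIMENSION = 0.25;
  static final double COST_AGGREGATION = 0.05;
  static final double MULTIPLIER_FILTER = 0.1;
  static final double MULTIPLIER_LIMIT = 0.5;

  public static void main(String[] args)
  {
    // SELECT dim1, COUNT(*) FROM tbl WHERE dim2 = 'x' GROUP BY dim1 LIMIT 10
    double cost = COST_BASE;
    cost *= MULTIPLIER_FILTER;    // WHERE dim2 = 'x' (filters are encouraged)
    cost += COST_COLUMN_READ * 1; // aggregate reads the group key directly
    cost += COST_DIMENSION * 1;   // GROUP BY dim1
    cost += COST_AGGREGATION * 1; // COUNT(*)
    cost *= MULTIPLIER_LIMIT;     // LIMIT 10
    System.out.println(cost);     // ~0.225: cheaper per row than an unfiltered scan
  }
}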
private void validateStage(final Stage stage)
{
if (!canAccept(stage)) {

View File

@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.ExprType;
@@ -258,7 +259,12 @@ public class Projection
for (int i = 0; i < expressions.size(); i++) {
final DruidExpression expression = expressions.get(i);
- if (expression.isDirectColumnAccess()) {
+ final SqlTypeName sqlTypeName = project.getRowType().getFieldList().get(i).getType().getSqlTypeName();
+ if (expression.isDirectColumnAccess()
+     && inputRowSignature.getColumnType(expression.getDirectColumn()).orElse(null)
+        == Calcites.getValueTypeForSqlTypeName(sqlTypeName)) {
+   // Refer to column directly when it's a direct access with matching type.
rowOrder.add(expression.getDirectColumn());
} else {
final VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
View File

@@ -31,10 +31,8 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.druid.query.DataSource;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
import org.apache.druid.sql.calcite.rel.DruidRel;
-import org.apache.druid.sql.calcite.rel.DruidRels;
import java.util.ArrayList;
import java.util.List;
@@ -64,21 +62,12 @@ public class DruidJoinRule extends RelOptRule
public boolean matches(RelOptRuleCall call)
{
final Join join = call.rel(0);
- final DruidRel<?> left = call.rel(1);
final DruidRel<?> right = call.rel(2);
// 1) Condition must be handleable.
- // 2) Left must be a scan or a join.
- // 3) If left is not a join, it must be concrete.
- // 4) Right must be a scan (and *cannot* be a join; we want to generate left-heavy trees).
- // 5) Right must be global.
- return
-     canHandleCondition(join.getCondition(), join.getLeft().getRowType())
-     && DruidRels.isScanOrMapping(left, true)
-     && DruidRels.isScanOrMapping(right, false)
-     && (left instanceof DruidJoinQueryRel
-         || DruidRels.dataSourceIfLeafRel(left).filter(DataSource::isConcrete).isPresent())
-     && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent();
+ // 2) Right cannot be a join; we want to generate left-heavy trees.
+ return canHandleCondition(join.getCondition(), join.getLeft().getRowType())
+        && !(right instanceof DruidJoinQueryRel);
}
@Override
@@ -86,11 +75,10 @@
{
final Join join = call.rel(0);
final DruidRel<?> left = call.rel(1);
+ final DruidRel<?> right = call.rel(2);
// Preconditions were already verified in "matches".
- call.transformTo(
-     DruidJoinQueryRel.create(join, left.getQueryMaker())
- );
+ call.transformTo(DruidJoinQueryRel.create(join, left, right));
}
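An aside on the "left-heavy" preference enforced in matches() above: joins are only built when the right input is not itself a join, so plans grow as ((a JOIN b) JOIN c), never as (a JOIN (b JOIN c)). A self-contained toy model of that shape check (illustrative classes, not Calcite's):

public class LeftHeavySketch
{
  interface Rel {}
  static class Scan implements Rel { final String name; Scan(String name) { this.name = name; } }
  static class Join implements Rel { final Rel left, right; Join(Rel l, Rel r) { left = l; right = r; } }

  // Mirror of the new condition: !(right instanceof DruidJoinQueryRel).
  static boolean matches(Rel left, Rel right)
  {
    return !(right instanceof Join);
  }

  public static void main(String[] args)
  {
    Rel a = new Scan("a"), b = new Scan("b"), c = new Scan("c");
    System.out.println(matches(new Join(a, b), c)); // true: ((a JOIN b) JOIN c) is allowed
    System.out.println(matches(a, new Join(b, c))); // false: right-deep trees are rejected
  }
}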
/**

View File

@@ -1,179 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidSemiJoin;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
/**
* Planner rule adapted from Calcite 1.11.0's SemiJoinRule.
*
* This rule identifies a JOIN where the right-hand side is being used like a filter. Requirements are:
*
* 1) Right-hand side is grouping on the join key
* 2) No fields from the right-hand side are selected
* 3) Join is INNER (right-hand side acting as filter) or LEFT (right-hand side can be ignored)
*
* This is used instead of Calcite's built-in rule because that rule's undoing of aggregation is unproductive (we'd
* just want to add it back again). Also, this rule operates on DruidRels.
*/
public class DruidSemiJoinRule extends RelOptRule
{
private static final Predicate<Join> IS_LEFT_OR_INNER =
join -> {
final JoinRelType joinType = join.getJoinType();
return joinType == JoinRelType.LEFT || joinType == JoinRelType.INNER;
};
private static final Predicate<DruidRel> IS_GROUP_BY = druidRel ->
druidRel.getPartialDruidQuery() != null && druidRel.getPartialDruidQuery().getAggregate() != null;
private static final DruidSemiJoinRule INSTANCE = new DruidSemiJoinRule();
private DruidSemiJoinRule()
{
super(
operand(
Project.class,
operand(
Join.class,
null,
IS_LEFT_OR_INNER,
some(
operand(
DruidRel.class,
null,
Predicates.and(DruidRules.CAN_BUILD_ON, Predicates.not(IS_GROUP_BY)),
any()
),
operand(DruidRel.class, null, IS_GROUP_BY, any())
)
)
)
);
}
public static DruidSemiJoinRule instance()
{
return INSTANCE;
}
@Override
public void onMatch(RelOptRuleCall call)
{
final Project project = call.rel(0);
final Join join = call.rel(1);
final DruidRel left = call.rel(2);
final DruidRel right = call.rel(3);
final ImmutableBitSet bits =
RelOptUtil.InputFinder.bits(project.getProjects(), null);
final ImmutableBitSet rightBits =
ImmutableBitSet.range(
left.getRowType().getFieldCount(),
join.getRowType().getFieldCount()
);
if (bits.intersects(rightBits)) {
return;
}
final JoinInfo joinInfo = join.analyzeCondition();
// The rule requires the aggregate key to be the same as the join key;
// neither a super-set nor a sub-set would work.
if (!joinInfo.isEqui() ||
joinInfo.rightSet().cardinality() != right.getPartialDruidQuery().getAggregate().getGroupCount()) {
return;
}
final PartialDruidQuery rightQuery = right.getPartialDruidQuery();
final Project rightProject = rightQuery.getSortProject() != null ?
rightQuery.getSortProject() :
rightQuery.getAggregateProject();
int i = 0;
for (int joinRef : joinInfo.rightSet()) {
final int aggregateRef;
if (rightProject == null) {
aggregateRef = joinRef;
} else {
final RexNode projectExp = rightProject.getChildExps().get(joinRef);
if (projectExp.isA(SqlKind.INPUT_REF)) {
aggregateRef = ((RexInputRef) projectExp).getIndex();
} else {
// Project expression is not part of the grouping key.
return;
}
}
if (aggregateRef != i++) {
return;
}
}
final RelBuilder relBuilder = call.builder();
if (join.getJoinType() == JoinRelType.LEFT) {
// Join can be eliminated since the right-hand side cannot have any effect (nothing is being selected,
// and LEFT means even if there is no match, a left-hand row will still be included).
relBuilder.push(left);
} else {
final DruidSemiJoin druidSemiJoin = DruidSemiJoin.create(
left,
right,
joinInfo.leftKeys,
joinInfo.rightKeys,
left.getPlannerContext()
);
// Check maxQueryCount.
final PlannerConfig plannerConfig = left.getPlannerContext().getPlannerConfig();
if (plannerConfig.getMaxQueryCount() > 0 && druidSemiJoin.getQueryCount() > plannerConfig.getMaxQueryCount()) {
return;
}
relBuilder.push(druidSemiJoin);
}
call.transformTo(
relBuilder
.project(project.getProjects(), project.getRowType().getFieldNames())
.build()
);
}
}
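For reference, the query shape this removed rule recognized is now planned by DruidJoinRule as an ordinary join on a subquery. An illustrative example of such a shape (the table and column names are made up):

public class SemiJoinShapeExample
{
  // An IN filter that planners rewrite to a join against a grouped subquery:
  // the right-hand side groups on the join key and contributes no output columns,
  // which is exactly the pattern DruidSemiJoinRule used to match.
  public static final String EXAMPLE_SQL =
      "SELECT COUNT(*) FROM foo "
      + "WHERE dim1 IN (SELECT dim2 FROM bar GROUP BY dim2)";
}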

View File

@@ -142,22 +142,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return false;
}
};
- public static final PlannerConfig PLANNER_CONFIG_SINGLE_NESTING_ONLY = new PlannerConfig()
- {
-   @Override
-   public int getMaxQueryCount()
-   {
-     return 2;
-   }
- };
- public static final PlannerConfig PLANNER_CONFIG_NO_SUBQUERIES = new PlannerConfig()
- {
-   @Override
-   public int getMaxQueryCount()
-   {
-     return 1;
-   }
- };
public static final PlannerConfig PLANNER_CONFIG_LOS_ANGELES = new PlannerConfig()
{
@Override
@@ -166,14 +150,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return DateTimes.inferTzFromString("America/Los_Angeles");
}
};
- public static final PlannerConfig PLANNER_CONFIG_SEMI_JOIN_ROWS_LIMIT = new PlannerConfig()
- {
-   @Override
-   public int getMaxSemiJoinRowsInMemory()
-   {
-     return 2;
-   }
- };
public static final String DUMMY_SQL_ID = "dummy";
public static final String LOS_ANGELES = "America/Los_Angeles";

View File

@@ -602,14 +602,13 @@ public class CalciteParameterQueryTest extends BaseCalciteQueryTest
and(
bound("l1", "3", null, true, false, null, StringComparators.NUMERIC),
selector("f1", useDefault ? "0.0" : null, null)
)
)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
) : ImmutableList.of(),
- useDefault ? ImmutableList.of() : ImmutableList.of(new Object[]{0L}),
+ ImmutableList.of(),
ImmutableList.of(new SqlParameter(SqlType.BIGINT, 3L), new SqlParameter(SqlType.VARCHAR, "wat"))
);
}

View File

@@ -28,7 +28,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
-import com.google.inject.Module;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
@@ -207,7 +206,7 @@ public class CalciteTests
private static final String TIMESTAMP_COLUMN = "t";
private static final Injector INJECTOR = Guice.createInjector(
- (Module) binder -> {
+ binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
// This Module is just to get a LookupExtractorFactoryContainerProvider with a usable "lookyloo" lookup.
@@ -217,7 +216,8 @@ public class CalciteTests
ImmutableMap.of(
"a", "xa",
"abc", "xabc",
- "nosuchkey", "mysteryvalue"
+ "nosuchkey", "mysteryvalue",
+ "6", "x6"
)
);
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupProvider);
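A note on the fixture change above: the extra entry gives the test "lookyloo" lookup a key that also parses as a number, which the new join-on-lookup tests exercise. Minimal usage of the underlying extractor, assuming the same map as the fixture (MapLookupExtractor is the real Druid class; setting isOneToOne to false is an illustrative choice):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.extraction.MapLookupExtractor;

public class LookupFixtureSketch
{
  public static void main(String[] args)
  {
    final MapLookupExtractor lookyloo = new MapLookupExtractor(
        ImmutableMap.of(
            "a", "xa",
            "abc", "xabc",
            "nosuchkey", "mysteryvalue",
            "6", "x6"
        ),
        false // isOneToOne
    );
    System.out.println(lookyloo.apply("6"));   // x6
    System.out.println(lookyloo.apply("foo")); // null: no such key
  }
}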

View File

@@ -32,11 +32,14 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.segment.InlineSegmentWrangler;
+import org.apache.druid.segment.LookupSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
@@ -107,7 +110,12 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
),
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
- new MapSegmentWrangler(ImmutableMap.of()),
+ new MapSegmentWrangler(
+     ImmutableMap.<Class<? extends DataSource>, SegmentWrangler>builder()
+         .put(InlineDataSource.class, new InlineSegmentWrangler())
+         .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
+         .build()
+ ),
joinableFactoryToUse
),
conglomerate,
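The MapSegmentWrangler wiring above dispatches on the concrete DataSource class: inline datasources get an InlineSegmentWrangler, lookup datasources a LookupSegmentWrangler. A self-contained toy model of that dispatch (illustrative types, not Druid's):

import java.util.Map;

public class WranglerSketch
{
  interface DataSource {}
  static class InlineDataSource implements DataSource {}
  static class LookupDataSource implements DataSource {}

  interface SegmentWrangler { String segmentsFor(DataSource ds); }

  // Pick a wrangler by the runtime class of the data source, like MapSegmentWrangler.
  static class MapWrangler implements SegmentWrangler
  {
    private final Map<Class<? extends DataSource>, SegmentWrangler> wranglers;

    MapWrangler(Map<Class<? extends DataSource>, SegmentWrangler> wranglers)
    {
      this.wranglers = wranglers;
    }

    @Override
    public String segmentsFor(DataSource ds)
    {
      final SegmentWrangler delegate = wranglers.get(ds.getClass());
      if (delegate == null) {
        throw new IllegalArgumentException("no wrangler for " + ds.getClass().getSimpleName());
      }
      return delegate.segmentsFor(ds);
    }
  }

  public static void main(String[] args)
  {
    SegmentWrangler wrangler = new MapWrangler(Map.of(
        InlineDataSource.class, ds -> "inline segments",
        LookupDataSource.class, ds -> "lookup segments"
    ));
    System.out.println(wrangler.segmentsFor(new InlineDataSource())); // inline segments
    System.out.println(wrangler.segmentsFor(new LookupDataSource())); // lookup segments
  }
}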