From ef45a1551e96dc1a0528bfdba2b29e97fe1d1a42 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 22 Mar 2022 18:33:57 +0530 Subject: [PATCH] Convert inQueryThreshold into query context parameter. (#12357) Added Calcite's inSubQueryThreshold as a query context parameter. Setting this parameter appropriately reduces the time taken for queries with a large number of values in their IN conditions. --- docs/querying/query-context.md | 1 + .../org/apache/druid/query/QueryContexts.java | 18 +++++++ .../apache/druid/query/QueryContextsTest.java | 7 +++ .../sql/calcite/planner/PlannerFactory.java | 3 +- .../sql/calcite/CalciteJoinQueryTest.java | 53 +++++++++++++++++++ .../druid/sql/calcite/CalciteQueryTest.java | 35 ++++++++++++ website/.spelling | 1 + 7 files changed, 117 insertions(+), 1 deletion(-) diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 58110a4e352..b1ae3fc5930 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -63,6 +63,7 @@ Unless otherwise noted, the following parameters apply to all query types. |enableJoinLeftTableScanDirect|`false`|This flag applies to queries which have joins. For joins, where left child is a simple scan with a filter, by default, druid will run the scan as a query and the join the results to the right child on broker. Setting this flag to true overrides that behavior and druid will attempt to push the join to data servers instead. Please note that the flag could be applicable to queries even if there is no explicit join. since queries can internally translated into a join by the SQL planner.| |debug| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | |maxNumericInFilters|`-1`|Max limit for the amount of numeric values that can be compared for a string type dimension when the entire SQL WHERE clause of a query translates only to an [OR](../querying/filters.md#or) of [Bound filter](../querying/filters.md#bound-filter). By default, Druid does not restrict the amount of of numeric Bound Filters on String columns, although this situation may block other queries from running. Set this property to a smaller value to prevent Druid from running queries that have prohibitively long segment processing times. The optimal limit requires some trial and error; we recommend starting with 100. Users who submit a query that exceeds the limit of `maxNumericInFilters` should instead rewrite their queries to use strings in the `WHERE` clause instead of numbers. For example, `WHERE someString IN (‘123’, ‘456’)`. This value cannot exceed the set system configuration `druid.sql.planner.maxNumericInFilters`. This value is ignored if `druid.sql.planner.maxNumericInFilters` is not set explicitly.| +|inSubQueryThreshold|`2147483647`| Threshold for minimum number of values in an IN clause to convert the query to a JOIN operation on an inlined table rather than a predicate. A threshold of 0 forces usage of an inline table in all cases; a threshold of `Integer.MAX_VALUE` (`2147483647`, the default) forces usage of OR in all cases. | ## Parameters by query type Some query types offer context parameters specific to that query type. 
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index fe47d53450a..5768c6a0670 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -69,6 +69,7 @@ public class QueryContexts public static final String ENABLE_DEBUG = "debug"; public static final String BY_SEGMENT_KEY = "bySegment"; public static final String BROKER_SERVICE_NAME = "brokerService"; + /* Context key under which the IN-clause sub-query threshold is looked up. */ public static final String IN_SUB_QUERY_THRESHOLD_KEY = "inSubQueryThreshold"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -91,6 +92,7 @@ public class QueryContexts public static final boolean DEFAULT_USE_FILTER_CNF = false; public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true; public static final boolean DEFAULT_ENABLE_DEBUG = false; + /* Fallback when the key is absent; same value PlannerFactory hard-coded before this patch. */ public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = Integer.MAX_VALUE; @SuppressWarnings("unused") // Used by Jackson serialization public enum Vectorize @@ -335,6 +337,16 @@ public class QueryContexts return parseBoolean(queryContext, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG); } + /** Returns the threshold stored under IN_SUB_QUERY_THRESHOLD_KEY in the given context map, or DEFAULT_IN_SUB_QUERY_THRESHOLD when no value is present. */ public static int getInSubQueryThreshold(Map context) + { + return getInSubQueryThreshold(context, DEFAULT_IN_SUB_QUERY_THRESHOLD); + } + + /** Same lookup as the single-argument overload, but with a caller-supplied fallback used when the key has no value. */ public static int getInSubQueryThreshold(Map context, int defaultValue) + { + return parseInt(context, IN_SUB_QUERY_THRESHOLD_KEY, defaultValue); + } + public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) { Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); @@ -441,6 +453,12 @@ public class QueryContexts return val == null ? defaultValue : Numbers.parseInt(val); } + /** Reads key from a raw context map: returns defaultValue when the lookup yields null, otherwise hands the value to Numbers.parseInt. NOTE(review): context itself is dereferenced without a null check; confirm callers never pass null. */ static int parseInt(Map context, String key, int defaultValue) + { + final Object val = context.get(key); + return val == null ? 
defaultValue : Numbers.parseInt(val); + } + static boolean parseBoolean(Query query, String key, boolean defaultValue) { final Object val = query.getContextValue(key); diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 3b693aa799e..7cf1f94efb3 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -133,6 +133,13 @@ public class QueryContextsTest Assert.assertTrue(QueryContexts.isSecondaryPartitionPruningEnabled(query)); } + /** An empty context map must yield DEFAULT_IN_SUB_QUERY_THRESHOLD. */ @Test + public void testDefaultInSubQueryThreshold() + { + Assert.assertEquals(QueryContexts.DEFAULT_IN_SUB_QUERY_THRESHOLD, + QueryContexts.getInSubQueryThreshold(ImmutableMap.of())); + } + @Test public void testGetEnableJoinLeftScanDirect() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java index 0631995f74f..6fb4e82e003 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java @@ -38,6 +38,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.ValidationException; import org.apache.druid.guice.annotations.Json; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.QueryContexts; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; @@ -150,7 +151,7 @@ public class PlannerFactory .withExpand(false) .withDecorrelationEnabled(false) .withTrimUnusedFields(false) - .withInSubQueryThreshold(Integer.MAX_VALUE) + /* Threshold now comes from the query context of the planner context instead of a fixed Integer.MAX_VALUE. */ .withInSubQueryThreshold(QueryContexts.getInSubQueryThreshold(plannerContext.getQueryContext())) .build(); return Frameworks 
.newConfigBuilder() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index f1cfb4990a5..e7e24c159dd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -4988,4 +4988,57 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest ImmutableList.of(new Object[]{4.0F, 4.0F}) ); } + + /** Six IN values with inSubQueryThreshold set to 3 in the query context: the expected native query is a scan over an INNER join between the table and an inline datasource holding the six values. Only the plan shape is checked. */ @Test + public void testPlanWithInFilterMoreThanInSubQueryThreshold() throws Exception + { + String query = "SELECT l1 FROM numfoo WHERE l1 IN (4842, 4844, 4845, 14905, 4853, 29064)"; + + Map queryContext = new HashMap<>(QUERY_CONTEXT_DEFAULT); + queryContext.put(QueryContexts.IN_SUB_QUERY_THRESHOLD_KEY, 3); + + testQuery( + PLANNER_CONFIG_DEFAULT, + queryContext, + DEFAULT_PARAMETERS, + query, + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{4842L}, + new Object[]{4844L}, + new Object[]{4845L}, + new Object[]{14905L}, + new Object[]{4853L}, + new Object[]{29064L} + ), + RowSignature.builder() + .add("ROW_VALUE", ColumnType.LONG) + .build() + ), + "j0.", + "(\"l1\" == \"j0.ROW_VALUE\")", + JoinType.INNER, + null, + ExprMacroTable.nil() + ) + ) + .columns("l1") + .intervals(querySegmentSpec(Filtration.eternity())) + .context(queryContext) + .legacy(false) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build() + ), + (sql, result) -> { + // Ignore the results, only need to check that the type of query is a join. 
+ }, + null + ); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index fe82cd1c4e7..ad68fd8e453 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -13850,4 +13850,39 @@ public class CalciteQueryTest extends BaseCalciteQueryTest "Possible error: SQL requires 'UNION' but only 'UNION ALL' is supported." ); } + + /** Same six-value IN query run with QUERY_CONTEXT_DEFAULT: the expected native query is a plain scan with an in-filter on l1 and no join. NOTE(review): presumably the default context leaves inSubQueryThreshold unset; confirm. Only the plan shape is checked. */ @Test + public void testPlanWithInFilterLessThanInSubQueryThreshold() throws Exception + { + String query = "SELECT l1 FROM numfoo WHERE l1 IN (4842, 4844, 4845, 14905, 4853, 29064)"; + + testQuery( + PLANNER_CONFIG_DEFAULT, + QUERY_CONTEXT_DEFAULT, + DEFAULT_PARAMETERS, + query, + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .columns("l1") + .intervals(querySegmentSpec(Filtration.eternity())) + .context(QUERY_CONTEXT_DEFAULT) + .legacy(false) + .filters( + in( + "l1", + ImmutableList.of("4842", "4844", "4845", "14905", "4853", "29064"), + null + ) + ) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build() + ), + (sql, result) -> { + // Ignore the results, only need to check that the type of query is a filter. + }, + null + ); + } } diff --git a/website/.spelling b/website/.spelling index 25b64b407dd..b627fb1a5cd 100644 --- a/website/.spelling +++ b/website/.spelling @@ -294,6 +294,7 @@ influxdb ingestionSpec injective inlined +inSubQueryThreshold interruptible jackson-jq javadoc