From d49e53e6c28c6cb548de72bb6eba6fa39f117b03 Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Tue, 23 May 2017 00:57:51 -0700 Subject: [PATCH] Timeout and maxScatterGatherBytes handling for queries run by Druid SQL (#4305) * Timeout and maxScatterGatherBytes handling for queries run by Druid SQL * Address PR comments * Fix contexts in CalciteQueryTest * Fix contexts in QuantileSqlAggregatorTest --- .../druid/benchmark/query/SqlBenchmark.java | 4 +- .../sql/QuantileSqlAggregatorTest.java | 16 ++++- .../io/druid/client/DirectDruidClient.java | 27 +++++++ .../java/io/druid/server/QueryResource.java | 13 ++-- .../sql/calcite/planner/PlannerContext.java | 7 ++ .../sql/calcite/planner/PlannerFactory.java | 8 ++- .../io/druid/sql/calcite/rel/QueryMaker.java | 71 ++++++++++++++----- .../druid/sql/calcite/schema/DruidSchema.java | 23 +++++- .../sql/avatica/DruidAvaticaHandlerTest.java | 6 +- .../druid/sql/avatica/DruidStatementTest.java | 3 +- .../druid/sql/calcite/CalciteQueryTest.java | 34 +++++++-- .../sql/calcite/http/SqlResourceTest.java | 6 +- .../sql/calcite/schema/DruidSchemaTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 4 +- 14 files changed, 180 insertions(+), 46 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java index 07d4daeb829..6a38eec88e0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java @@ -47,6 +47,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.column.ValueType; import io.druid.segment.serde.ComplexMetrics; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidPlanner; import io.druid.sql.calcite.planner.PlannerConfig; @@ -177,7 +178,8 @@ public class SqlBenchmark Calcites.createRootSchema(druidSchema), walker, CalciteTests.createOperatorTable(), - plannerConfig + plannerConfig, + new ServerConfig() ); groupByQuery = GroupByQuery .builder() diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index e4eb972d320..6ccebebaeee 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterables; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.QueryContexts; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -42,6 +43,7 @@ import io.druid.segment.IndexBuilder; import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.filtration.Filtration; @@ -134,7 +136,7 @@ public class QuantileSqlAggregatorTest ImmutableSet.of(new QuantileSqlAggregator()), ImmutableSet.of() ); - plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()); } @After @@ -200,7 +202,11 @@ public class QuantileSqlAggregatorTest new QuantilePostAggregator("a6", "a4:agg", 0.999f), new QuantilePostAggregator("a7", "a7:agg", 0.50f) )) - .context(ImmutableMap.of("skipEmptyBuckets", true)) + .context(ImmutableMap.of( + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE + )) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); @@ -260,7 +266,11 @@ public class QuantileSqlAggregatorTest new QuantilePostAggregator("a5", "a5:agg", 0.999f), new QuantilePostAggregator("a6", "a4:agg", 0.999f) )) - .context(ImmutableMap.of("skipEmptyBuckets", true)) + .context(ImmutableMap.of( + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE + )) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 34d65696499..cc1bbd8dba2 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Charsets; import com.google.common.base.Throwables; +import com.google.common.collect.MapMaker; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.FutureCallback; @@ -64,6 +65,7 @@ import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.server.initialization.ServerConfig; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -114,6 +116,31 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final boolean isSmile; + public static > QueryType withDefaultTimeoutAndMaxScatterGatherBytes(final QueryType query, ServerConfig serverConfig) + { + return (QueryType) QueryContexts.withMaxScatterGatherBytes( + QueryContexts.withDefaultTimeout( + (Query) query, + serverConfig.getDefaultQueryTimeout() + ), + serverConfig.getMaxScatterGatherBytes() + ); + } + + public static Map makeResponseContextForQuery(Query query, long startTimeMillis) + { + final Map responseContext = new MapMaker().makeMap(); + responseContext.put( + DirectDruidClient.QUERY_FAIL_TIME, + startTimeMillis + QueryContexts.getTimeout(query) + ); + responseContext.put( + DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, + new AtomicLong() + ); + return responseContext; + } + public DirectDruidClient( QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 40aa8d52bc6..2ff2766b6c3 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.MapMaker; import com.google.common.io.CountingOutputStream; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; @@ -41,7 +40,6 @@ import io.druid.java.util.common.guava.Yielders; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; -import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QueryPlus; @@ -187,7 +185,6 @@ public class QueryResource implements QueryCountStatsProvider final String currThreadName = Thread.currentThread().getName(); try { - final Map responseContext = new MapMaker().makeMap(); query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); @@ -195,14 +192,12 @@ public class QueryResource implements QueryCountStatsProvider queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } - query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout()); - query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes()); - responseContext.put( - DirectDruidClient.QUERY_FAIL_TIME, - System.currentTimeMillis() + QueryContexts.getTimeout(query) + query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(query, config); + final Map responseContext = DirectDruidClient.makeResponseContextForQuery( + query, + System.currentTimeMillis() ); - responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); toolChest = warehouse.getToolChest(query); diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java index 9927412e63a..ac4f84b6423 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java @@ -42,6 +42,7 @@ public class PlannerContext private final PlannerConfig plannerConfig; private final DateTime localNow; + private final long queryStartTimeMillis; private final Map queryContext; private PlannerContext( @@ -53,6 +54,7 @@ public class PlannerContext this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig"); this.queryContext = queryContext != null ? ImmutableMap.copyOf(queryContext) : ImmutableMap.of(); this.localNow = Preconditions.checkNotNull(localNow, "localNow"); + this.queryStartTimeMillis = System.currentTimeMillis(); } public static PlannerContext create( @@ -106,6 +108,11 @@ public class PlannerContext return queryContext; } + public long getQueryStartTimeMillis() + { + return queryStartTimeMillis; + } + public DataContext createDataContext(final JavaTypeFactory typeFactory) { class DruidDataContext implements DataContext diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java index 85dba6686a0..e51c3ca43b2 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java @@ -21,6 +21,7 @@ package io.druid.sql.calcite.planner; import com.google.inject.Inject; import io.druid.query.QuerySegmentWalker; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.rel.QueryMaker; import io.druid.sql.calcite.schema.DruidSchema; import org.apache.calcite.avatica.util.Casing; @@ -44,25 +45,28 @@ public class PlannerFactory private final QuerySegmentWalker walker; private final DruidOperatorTable operatorTable; private final PlannerConfig plannerConfig; + private final ServerConfig serverConfig; @Inject public PlannerFactory( final SchemaPlus rootSchema, final QuerySegmentWalker walker, final DruidOperatorTable operatorTable, - final PlannerConfig plannerConfig + final PlannerConfig plannerConfig, + final ServerConfig serverConfig ) { this.rootSchema = rootSchema; this.walker = walker; this.operatorTable = operatorTable; this.plannerConfig = plannerConfig; + this.serverConfig = serverConfig; } public DruidPlanner createPlanner(final Map queryContext) { final PlannerContext plannerContext = PlannerContext.create(plannerConfig, queryContext); - final QueryMaker queryMaker = new QueryMaker(walker, plannerContext); + final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig); final FrameworkConfig frameworkConfig = Frameworks .newConfigBuilder() .parserConfig( diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index c0eba66180d..5be2b554cea 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -23,9 +23,9 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; +import io.druid.client.DirectDruidClient; import io.druid.common.guava.GuavaUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -47,6 +47,7 @@ import io.druid.query.topn.DimensionAndMetricValueExtractor; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNResultValue; import io.druid.segment.column.Column; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; @@ -69,14 +70,17 @@ public class QueryMaker { private final QuerySegmentWalker walker; private final PlannerContext plannerContext; + private final ServerConfig serverConfig; public QueryMaker( final QuerySegmentWalker walker, - final PlannerContext plannerContext + final PlannerContext plannerContext, + final ServerConfig serverConfig ) { this.walker = walker; this.plannerContext = plannerContext; + this.serverConfig = serverConfig; } public PlannerContext getPlannerContext() @@ -179,12 +183,15 @@ public class QueryMaker @Override public Sequence next() { - final SelectQuery queryWithPagination = baseQuery.withPagingSpec( - new PagingSpec( - pagingIdentifiers.get(), - plannerContext.getPlannerConfig().getSelectThreshold(), - true - ) + final SelectQuery queryWithPagination = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery.withPagingSpec( + new PagingSpec( + pagingIdentifiers.get(), + plannerContext.getPlannerConfig().getSelectThreshold(), + true + ) + ), + serverConfig ); Hook.QUERY_PLAN.run(queryWithPagination); @@ -194,7 +201,13 @@ public class QueryMaker return Sequences.concat( Sequences.map( - QueryPlus.wrap(queryWithPagination).run(walker, Maps.newHashMap()), + QueryPlus.wrap(queryWithPagination) + .run(walker, + DirectDruidClient.makeResponseContextForQuery( + queryWithPagination, + plannerContext.getQueryStartTimeMillis() + ) + ), new Function, Sequence>() { @Override @@ -255,9 +268,14 @@ public class QueryMaker private Sequence executeTimeseries( final DruidQueryBuilder queryBuilder, - final TimeseriesQuery query + final TimeseriesQuery baseQuery ) { + final TimeseriesQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, + serverConfig + ); + final List fieldList = queryBuilder.getRowType().getFieldList(); final List dimensions = queryBuilder.getGrouping().getDimensions(); final String timeOutputName = dimensions.isEmpty() ? null : Iterables.getOnlyElement(dimensions).getOutputName(); @@ -265,7 +283,11 @@ public class QueryMaker Hook.QUERY_PLAN.run(query); return Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function, Object[]>() { @Override @@ -291,16 +313,25 @@ public class QueryMaker private Sequence executeTopN( final DruidQueryBuilder queryBuilder, - final TopNQuery query + final TopNQuery baseQuery ) { + final TopNQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, + serverConfig + ); + final List fieldList = queryBuilder.getRowType().getFieldList(); Hook.QUERY_PLAN.run(query); return Sequences.concat( Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function, Sequence>() { @Override @@ -328,15 +359,23 @@ public class QueryMaker private Sequence executeGroupBy( final DruidQueryBuilder queryBuilder, - final GroupByQuery query + final GroupByQuery baseQuery ) { + final GroupByQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, + serverConfig + ); + final List fieldList = queryBuilder.getRowType().getFieldList(); Hook.QUERY_PLAN.run(query); - return Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function() { @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index e5cd0da46c1..d54a07f2f03 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -31,6 +31,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.client.DirectDruidClient; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.ServerView; @@ -50,6 +51,7 @@ import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; import io.druid.segment.column.ValueType; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.table.DruidTable; import io.druid.sql.calcite.table.RowSignature; @@ -82,6 +84,7 @@ public class DruidSchema extends AbstractSchema private final ViewManager viewManager; private final ExecutorService cacheExec; private final ConcurrentMap tables; + private final ServerConfig serverConfig; // For awaitInitialization. private final CountDownLatch initializationLatch = new CountDownLatch(1); @@ -100,7 +103,8 @@ public class DruidSchema extends AbstractSchema final QuerySegmentWalker walker, final TimelineServerView serverView, final PlannerConfig config, - final ViewManager viewManager + final ViewManager viewManager, + final ServerConfig serverConfig ) { this.walker = Preconditions.checkNotNull(walker, "walker"); @@ -109,6 +113,7 @@ public class DruidSchema extends AbstractSchema this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d"); this.tables = Maps.newConcurrentMap(); + this.serverConfig = serverConfig; } @LifecycleStart @@ -295,7 +300,7 @@ public class DruidSchema extends AbstractSchema private DruidTable computeTable(final String dataSource) { - final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( + SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( new TableDataSource(dataSource), null, null, @@ -306,7 +311,19 @@ public class DruidSchema extends AbstractSchema true ); - final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap()); + segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + segmentMetadataQuery, + serverConfig + ); + + final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery( + segmentMetadataQuery, + System.currentTimeMillis() + ) + ); final List results = Sequences.toList(sequence, Lists.newArrayList()); if (results.isEmpty()) { return null; diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java index 52a5bda8a1d..f2e7432e462 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.java.util.common.Pair; import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -116,7 +117,10 @@ public class DruidAvaticaHandlerTest ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final DruidAvaticaHandler handler = new DruidAvaticaHandler( - new DruidMeta(new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig), AVATICA_CONFIG), + new DruidMeta( + new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()), + AVATICA_CONFIG + ), new DruidNode("dummy", "dummy", 1), new AvaticaMonitor() ); diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java index 1f0ee59c441..6cea2ff2e5f 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java @@ -21,6 +21,7 @@ package io.druid.sql.avatica; import com.google.common.base.Function; import com.google.common.collect.Lists; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -65,7 +66,7 @@ public class DruidStatementTest ) ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()); } @After diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 1ef1fe6d209..4e4afd6e8fa 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -81,6 +82,7 @@ import io.druid.query.topn.NumericTopNMetricSpec; import io.druid.query.topn.TopNQueryBuilder; import io.druid.segment.column.Column; import io.druid.segment.column.ValueType; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -169,35 +171,47 @@ public class CalciteQueryTest private static final String LOS_ANGELES = "America/Los_Angeles"; private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z" + PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - "skipEmptyBuckets", false + "skipEmptyBuckets", false, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_NO_TOPN = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false" + PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false", + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_LOS_ANGELES = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES + PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); // Matches QUERY_CONTEXT_DEFAULT public static final Map TIMESERIES_CONTEXT_DEFAULT = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - "skipEmptyBuckets", true + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); // Matches QUERY_CONTEXT_LOS_ANGELES public static final Map TIMESERIES_CONTEXT_LOS_ANGELES = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES, - "skipEmptyBuckets", true + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true); @@ -4223,7 +4237,13 @@ public class CalciteQueryTest final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - final PlannerFactory plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + final PlannerFactory plannerFactory = new PlannerFactory( + rootSchema, + walker, + operatorTable, + plannerConfig, + new ServerConfig() + ); viewManager.createView( plannerFactory, diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java index 392d19fbf74..88492861d0c 100644 --- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.query.QueryInterruptedException; import io.druid.query.ResourceLimitExceededException; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -77,7 +78,10 @@ public class SqlResourceTest CalciteTests.createMockSchema(walker, plannerConfig) ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - resource = new SqlResource(JSON_MAPPER, new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig)); + resource = new SqlResource( + JSON_MAPPER, + new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()) + ); } @After diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java index 0d368186fc7..982c5badba6 100644 --- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java @@ -32,6 +32,7 @@ import io.druid.segment.IndexBuilder; import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.table.DruidTable; @@ -150,7 +151,8 @@ public class DruidSchemaTest walker, new TestServerInventoryView(walker.getSegments()), PLANNER_CONFIG_DEFAULT, - new NoopViewManager() + new NoopViewManager(), + new ServerConfig() ); schema.start(); diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index e3096d3a3b6..bf835505892 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -76,6 +76,7 @@ import io.druid.segment.IndexBuilder; import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -380,7 +381,8 @@ public class CalciteTests walker, new TestServerInventoryView(walker.getSegments()), plannerConfig, - viewManager + viewManager, + new ServerConfig() ); schema.start();