mirror of https://github.com/apache/druid.git
Timeout and maxScatterGatherBytes handling for queries run by Druid SQL (#4305)
* Timeout and maxScatterGatherBytes handling for queries run by Druid SQL * Address PR comments * Fix contexts in CalciteQueryTest * Fix contexts in QuantileSqlAggregatorTest
This commit is contained in:
parent
22e5f52d00
commit
d49e53e6c2
|
@ -47,6 +47,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidPlanner;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
|
@ -177,7 +178,8 @@ public class SqlBenchmark
|
|||
Calcites.createRootSchema(druidSchema),
|
||||
walker,
|
||||
CalciteTests.createOperatorTable(),
|
||||
plannerConfig
|
||||
plannerConfig,
|
||||
new ServerConfig()
|
||||
);
|
||||
groupByQuery = GroupByQuery
|
||||
.builder()
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.collect.Iterables;
|
|||
import io.druid.java.util.common.granularity.Granularities;
|
||||
import io.druid.java.util.common.guava.Sequences;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryContexts;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
|
@ -42,6 +43,7 @@ import io.druid.segment.IndexBuilder;
|
|||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import io.druid.sql.calcite.filtration.Filtration;
|
||||
|
@ -134,7 +136,7 @@ public class QuantileSqlAggregatorTest
|
|||
ImmutableSet.<SqlAggregator>of(new QuantileSqlAggregator()),
|
||||
ImmutableSet.<SqlExtractionOperator>of()
|
||||
);
|
||||
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
|
||||
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -200,7 +202,11 @@ public class QuantileSqlAggregatorTest
|
|||
new QuantilePostAggregator("a6", "a4:agg", 0.999f),
|
||||
new QuantilePostAggregator("a7", "a7:agg", 0.50f)
|
||||
))
|
||||
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
|
||||
.context(ImmutableMap.<String, Object>of(
|
||||
"skipEmptyBuckets", true,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
))
|
||||
.build(),
|
||||
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
|
||||
);
|
||||
|
@ -260,7 +266,11 @@ public class QuantileSqlAggregatorTest
|
|||
new QuantilePostAggregator("a5", "a5:agg", 0.999f),
|
||||
new QuantilePostAggregator("a6", "a4:agg", 0.999f)
|
||||
))
|
||||
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
|
||||
.context(ImmutableMap.<String, Object>of(
|
||||
"skipEmptyBuckets", true,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
))
|
||||
.build(),
|
||||
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
|
||||
);
|
||||
|
|
|
@ -30,6 +30,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
|||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
|
@ -64,6 +65,7 @@ import io.druid.query.QueryWatcher;
|
|||
import io.druid.query.ResourceLimitExceededException;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.MetricManipulatorFns;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBufferInputStream;
|
||||
import org.jboss.netty.handler.codec.http.HttpChunk;
|
||||
|
@ -114,6 +116,31 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
private final AtomicInteger openConnections;
|
||||
private final boolean isSmile;
|
||||
|
||||
public static <T, QueryType extends Query<T>> QueryType withDefaultTimeoutAndMaxScatterGatherBytes(final QueryType query, ServerConfig serverConfig)
|
||||
{
|
||||
return (QueryType) QueryContexts.withMaxScatterGatherBytes(
|
||||
QueryContexts.withDefaultTimeout(
|
||||
(Query) query,
|
||||
serverConfig.getDefaultQueryTimeout()
|
||||
),
|
||||
serverConfig.getMaxScatterGatherBytes()
|
||||
);
|
||||
}
|
||||
|
||||
public static Map<String, Object> makeResponseContextForQuery(Query query, long startTimeMillis)
|
||||
{
|
||||
final Map<String, Object> responseContext = new MapMaker().makeMap();
|
||||
responseContext.put(
|
||||
DirectDruidClient.QUERY_FAIL_TIME,
|
||||
startTimeMillis + QueryContexts.getTimeout(query)
|
||||
);
|
||||
responseContext.put(
|
||||
DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED,
|
||||
new AtomicLong()
|
||||
);
|
||||
return responseContext;
|
||||
}
|
||||
|
||||
public DirectDruidClient(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
QueryWatcher queryWatcher,
|
||||
|
|
|
@ -25,7 +25,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.io.CountingOutputStream;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
@ -41,7 +40,6 @@ import io.druid.java.util.common.guava.Yielders;
|
|||
import io.druid.query.DruidMetrics;
|
||||
import io.druid.query.GenericQueryMetricsFactory;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryContexts;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.QueryMetrics;
|
||||
import io.druid.query.QueryPlus;
|
||||
|
@ -187,7 +185,6 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
|
||||
final String currThreadName = Thread.currentThread().getName();
|
||||
try {
|
||||
final Map<String, Object> responseContext = new MapMaker().makeMap();
|
||||
|
||||
query = context.getObjectMapper().readValue(in, Query.class);
|
||||
queryId = query.getId();
|
||||
|
@ -195,14 +192,12 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
queryId = UUID.randomUUID().toString();
|
||||
query = query.withId(queryId);
|
||||
}
|
||||
query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout());
|
||||
query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes());
|
||||
|
||||
responseContext.put(
|
||||
DirectDruidClient.QUERY_FAIL_TIME,
|
||||
System.currentTimeMillis() + QueryContexts.getTimeout(query)
|
||||
query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(query, config);
|
||||
final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
|
||||
query,
|
||||
System.currentTimeMillis()
|
||||
);
|
||||
responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
|
||||
|
||||
toolChest = warehouse.getToolChest(query);
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ public class PlannerContext
|
|||
|
||||
private final PlannerConfig plannerConfig;
|
||||
private final DateTime localNow;
|
||||
private final long queryStartTimeMillis;
|
||||
private final Map<String, Object> queryContext;
|
||||
|
||||
private PlannerContext(
|
||||
|
@ -53,6 +54,7 @@ public class PlannerContext
|
|||
this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig");
|
||||
this.queryContext = queryContext != null ? ImmutableMap.copyOf(queryContext) : ImmutableMap.<String, Object>of();
|
||||
this.localNow = Preconditions.checkNotNull(localNow, "localNow");
|
||||
this.queryStartTimeMillis = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public static PlannerContext create(
|
||||
|
@ -106,6 +108,11 @@ public class PlannerContext
|
|||
return queryContext;
|
||||
}
|
||||
|
||||
public long getQueryStartTimeMillis()
|
||||
{
|
||||
return queryStartTimeMillis;
|
||||
}
|
||||
|
||||
public DataContext createDataContext(final JavaTypeFactory typeFactory)
|
||||
{
|
||||
class DruidDataContext implements DataContext
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.sql.calcite.planner;
|
|||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.rel.QueryMaker;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import org.apache.calcite.avatica.util.Casing;
|
||||
|
@ -44,25 +45,28 @@ public class PlannerFactory
|
|||
private final QuerySegmentWalker walker;
|
||||
private final DruidOperatorTable operatorTable;
|
||||
private final PlannerConfig plannerConfig;
|
||||
private final ServerConfig serverConfig;
|
||||
|
||||
@Inject
|
||||
public PlannerFactory(
|
||||
final SchemaPlus rootSchema,
|
||||
final QuerySegmentWalker walker,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerConfig plannerConfig
|
||||
final PlannerConfig plannerConfig,
|
||||
final ServerConfig serverConfig
|
||||
)
|
||||
{
|
||||
this.rootSchema = rootSchema;
|
||||
this.walker = walker;
|
||||
this.operatorTable = operatorTable;
|
||||
this.plannerConfig = plannerConfig;
|
||||
this.serverConfig = serverConfig;
|
||||
}
|
||||
|
||||
public DruidPlanner createPlanner(final Map<String, Object> queryContext)
|
||||
{
|
||||
final PlannerContext plannerContext = PlannerContext.create(plannerConfig, queryContext);
|
||||
final QueryMaker queryMaker = new QueryMaker(walker, plannerContext);
|
||||
final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig);
|
||||
final FrameworkConfig frameworkConfig = Frameworks
|
||||
.newConfigBuilder()
|
||||
.parserConfig(
|
||||
|
|
|
@ -23,9 +23,9 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Doubles;
|
||||
import com.google.common.primitives.Ints;
|
||||
import io.druid.client.DirectDruidClient;
|
||||
import io.druid.common.guava.GuavaUtils;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.guava.Sequence;
|
||||
|
@ -47,6 +47,7 @@ import io.druid.query.topn.DimensionAndMetricValueExtractor;
|
|||
import io.druid.query.topn.TopNQuery;
|
||||
import io.druid.query.topn.TopNResultValue;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
|
@ -69,14 +70,17 @@ public class QueryMaker
|
|||
{
|
||||
private final QuerySegmentWalker walker;
|
||||
private final PlannerContext plannerContext;
|
||||
private final ServerConfig serverConfig;
|
||||
|
||||
public QueryMaker(
|
||||
final QuerySegmentWalker walker,
|
||||
final PlannerContext plannerContext
|
||||
final PlannerContext plannerContext,
|
||||
final ServerConfig serverConfig
|
||||
)
|
||||
{
|
||||
this.walker = walker;
|
||||
this.plannerContext = plannerContext;
|
||||
this.serverConfig = serverConfig;
|
||||
}
|
||||
|
||||
public PlannerContext getPlannerContext()
|
||||
|
@ -179,12 +183,15 @@ public class QueryMaker
|
|||
@Override
|
||||
public Sequence<Object[]> next()
|
||||
{
|
||||
final SelectQuery queryWithPagination = baseQuery.withPagingSpec(
|
||||
final SelectQuery queryWithPagination = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
|
||||
baseQuery.withPagingSpec(
|
||||
new PagingSpec(
|
||||
pagingIdentifiers.get(),
|
||||
plannerContext.getPlannerConfig().getSelectThreshold(),
|
||||
true
|
||||
)
|
||||
),
|
||||
serverConfig
|
||||
);
|
||||
|
||||
Hook.QUERY_PLAN.run(queryWithPagination);
|
||||
|
@ -194,7 +201,13 @@ public class QueryMaker
|
|||
|
||||
return Sequences.concat(
|
||||
Sequences.map(
|
||||
QueryPlus.wrap(queryWithPagination).run(walker, Maps.<String, Object>newHashMap()),
|
||||
QueryPlus.wrap(queryWithPagination)
|
||||
.run(walker,
|
||||
DirectDruidClient.makeResponseContextForQuery(
|
||||
queryWithPagination,
|
||||
plannerContext.getQueryStartTimeMillis()
|
||||
)
|
||||
),
|
||||
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
|
@ -255,9 +268,14 @@ public class QueryMaker
|
|||
|
||||
private Sequence<Object[]> executeTimeseries(
|
||||
final DruidQueryBuilder queryBuilder,
|
||||
final TimeseriesQuery query
|
||||
final TimeseriesQuery baseQuery
|
||||
)
|
||||
{
|
||||
final TimeseriesQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
|
||||
baseQuery,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
|
||||
final List<DimensionSpec> dimensions = queryBuilder.getGrouping().getDimensions();
|
||||
final String timeOutputName = dimensions.isEmpty() ? null : Iterables.getOnlyElement(dimensions).getOutputName();
|
||||
|
@ -265,7 +283,11 @@ public class QueryMaker
|
|||
Hook.QUERY_PLAN.run(query);
|
||||
|
||||
return Sequences.map(
|
||||
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
|
||||
QueryPlus.wrap(query)
|
||||
.run(
|
||||
walker,
|
||||
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
|
||||
),
|
||||
new Function<Result<TimeseriesResultValue>, Object[]>()
|
||||
{
|
||||
@Override
|
||||
|
@ -291,16 +313,25 @@ public class QueryMaker
|
|||
|
||||
private Sequence<Object[]> executeTopN(
|
||||
final DruidQueryBuilder queryBuilder,
|
||||
final TopNQuery query
|
||||
final TopNQuery baseQuery
|
||||
)
|
||||
{
|
||||
final TopNQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
|
||||
baseQuery,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
|
||||
|
||||
Hook.QUERY_PLAN.run(query);
|
||||
|
||||
return Sequences.concat(
|
||||
Sequences.map(
|
||||
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
|
||||
QueryPlus.wrap(query)
|
||||
.run(
|
||||
walker,
|
||||
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
|
||||
),
|
||||
new Function<Result<TopNResultValue>, Sequence<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
|
@ -328,15 +359,23 @@ public class QueryMaker
|
|||
|
||||
private Sequence<Object[]> executeGroupBy(
|
||||
final DruidQueryBuilder queryBuilder,
|
||||
final GroupByQuery query
|
||||
final GroupByQuery baseQuery
|
||||
)
|
||||
{
|
||||
final GroupByQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
|
||||
baseQuery,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
|
||||
|
||||
Hook.QUERY_PLAN.run(query);
|
||||
|
||||
return Sequences.map(
|
||||
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
|
||||
QueryPlus.wrap(query)
|
||||
.run(
|
||||
walker,
|
||||
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
|
||||
),
|
||||
new Function<io.druid.data.input.Row, Object[]>()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -31,6 +31,7 @@ import com.google.common.collect.Sets;
|
|||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.client.DirectDruidClient;
|
||||
import io.druid.client.DruidDataSource;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.ServerView;
|
||||
|
@ -50,6 +51,7 @@ import io.druid.query.metadata.metadata.SegmentAnalysis;
|
|||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.table.DruidTable;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
|
@ -82,6 +84,7 @@ public class DruidSchema extends AbstractSchema
|
|||
private final ViewManager viewManager;
|
||||
private final ExecutorService cacheExec;
|
||||
private final ConcurrentMap<String, Table> tables;
|
||||
private final ServerConfig serverConfig;
|
||||
|
||||
// For awaitInitialization.
|
||||
private final CountDownLatch initializationLatch = new CountDownLatch(1);
|
||||
|
@ -100,7 +103,8 @@ public class DruidSchema extends AbstractSchema
|
|||
final QuerySegmentWalker walker,
|
||||
final TimelineServerView serverView,
|
||||
final PlannerConfig config,
|
||||
final ViewManager viewManager
|
||||
final ViewManager viewManager,
|
||||
final ServerConfig serverConfig
|
||||
)
|
||||
{
|
||||
this.walker = Preconditions.checkNotNull(walker, "walker");
|
||||
|
@ -109,6 +113,7 @@ public class DruidSchema extends AbstractSchema
|
|||
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
|
||||
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
|
||||
this.tables = Maps.newConcurrentMap();
|
||||
this.serverConfig = serverConfig;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
@ -295,7 +300,7 @@ public class DruidSchema extends AbstractSchema
|
|||
|
||||
private DruidTable computeTable(final String dataSource)
|
||||
{
|
||||
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
|
||||
SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
|
||||
new TableDataSource(dataSource),
|
||||
null,
|
||||
null,
|
||||
|
@ -306,7 +311,19 @@ public class DruidSchema extends AbstractSchema
|
|||
true
|
||||
);
|
||||
|
||||
final Sequence<SegmentAnalysis> sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap());
|
||||
segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
|
||||
segmentMetadataQuery,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
final Sequence<SegmentAnalysis> sequence = QueryPlus.wrap(segmentMetadataQuery)
|
||||
.run(
|
||||
walker,
|
||||
DirectDruidClient.makeResponseContextForQuery(
|
||||
segmentMetadataQuery,
|
||||
System.currentTimeMillis()
|
||||
)
|
||||
);
|
||||
final List<SegmentAnalysis> results = Sequences.toList(sequence, Lists.<SegmentAnalysis>newArrayList());
|
||||
if (results.isEmpty()) {
|
||||
return null;
|
||||
|
|
|
@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
|
|||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
|
@ -116,7 +117,10 @@ public class DruidAvaticaHandlerTest
|
|||
);
|
||||
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
|
||||
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
|
||||
new DruidMeta(new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig), AVATICA_CONFIG),
|
||||
new DruidMeta(
|
||||
new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()),
|
||||
AVATICA_CONFIG
|
||||
),
|
||||
new DruidNode("dummy", "dummy", 1),
|
||||
new AvaticaMonitor()
|
||||
);
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.sql.avatica;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
|
@ -65,7 +66,7 @@ public class DruidStatementTest
|
|||
)
|
||||
);
|
||||
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
|
||||
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
|
||||
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequences;
|
|||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryContexts;
|
||||
import io.druid.query.QueryDataSource;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
|
@ -81,6 +82,7 @@ import io.druid.query.topn.NumericTopNMetricSpec;
|
|||
import io.druid.query.topn.TopNQueryBuilder;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.filtration.Filtration;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
|
@ -169,35 +171,47 @@ public class CalciteQueryTest
|
|||
private static final String LOS_ANGELES = "America/Los_Angeles";
|
||||
|
||||
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z"
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
|
||||
private static final Map<String, Object> QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
"skipEmptyBuckets", false
|
||||
"skipEmptyBuckets", false,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
|
||||
private static final Map<String, Object> QUERY_CONTEXT_NO_TOPN = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false"
|
||||
PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false",
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
|
||||
private static final Map<String, Object> QUERY_CONTEXT_LOS_ANGELES = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES
|
||||
PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
|
||||
// Matches QUERY_CONTEXT_DEFAULT
|
||||
public static final Map<String, Object> TIMESERIES_CONTEXT_DEFAULT = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
"skipEmptyBuckets", true
|
||||
"skipEmptyBuckets", true,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
|
||||
// Matches QUERY_CONTEXT_LOS_ANGELES
|
||||
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = ImmutableMap.<String, Object>of(
|
||||
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
|
||||
PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES,
|
||||
"skipEmptyBuckets", true
|
||||
"skipEmptyBuckets", true,
|
||||
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
|
||||
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
|
||||
);
|
||||
private static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
|
||||
|
||||
|
@ -4223,7 +4237,13 @@ public class CalciteQueryTest
|
|||
final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
|
||||
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
|
||||
|
||||
final PlannerFactory plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
|
||||
final PlannerFactory plannerFactory = new PlannerFactory(
|
||||
rootSchema,
|
||||
walker,
|
||||
operatorTable,
|
||||
plannerConfig,
|
||||
new ServerConfig()
|
||||
);
|
||||
|
||||
viewManager.createView(
|
||||
plannerFactory,
|
||||
|
|
|
@ -28,6 +28,7 @@ import io.druid.java.util.common.ISE;
|
|||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.ResourceLimitExceededException;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
|
@ -77,7 +78,10 @@ public class SqlResourceTest
|
|||
CalciteTests.createMockSchema(walker, plannerConfig)
|
||||
);
|
||||
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
|
||||
resource = new SqlResource(JSON_MAPPER, new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig));
|
||||
resource = new SqlResource(
|
||||
JSON_MAPPER,
|
||||
new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig())
|
||||
);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -32,6 +32,7 @@ import io.druid.segment.IndexBuilder;
|
|||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.table.DruidTable;
|
||||
|
@ -150,7 +151,8 @@ public class DruidSchemaTest
|
|||
walker,
|
||||
new TestServerInventoryView(walker.getSegments()),
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopViewManager()
|
||||
new NoopViewManager(),
|
||||
new ServerConfig()
|
||||
);
|
||||
|
||||
schema.start();
|
||||
|
|
|
@ -76,6 +76,7 @@ import io.druid.segment.IndexBuilder;
|
|||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
|
@ -380,7 +381,8 @@ public class CalciteTests
|
|||
walker,
|
||||
new TestServerInventoryView(walker.getSegments()),
|
||||
plannerConfig,
|
||||
viewManager
|
||||
viewManager,
|
||||
new ServerConfig()
|
||||
);
|
||||
|
||||
schema.start();
|
||||
|
|
Loading…
Reference in New Issue