Cluster wide default query context setting (#10208)

* Cluster wide default query context setting

* Cluster wide default query context setting

* Cluster wide default query context setting

* add docs

* fix docs

* update props

* fix checkstyle

* fix checkstyle

* fix checkstyle

* update docs

* address comments

* fix checkstyle

* fix checkstyle

* fix checkstyle

* fix checkstyle

* fix checkstyle

* fix NPE
This commit is contained in:
Maytas Monsereenusorn 2020-07-29 15:19:18 -07:00 committed by GitHub
parent 63c1746fe4
commit 574b062f1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 305 additions and 180 deletions

View File

@ -40,7 +40,6 @@ import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -399,7 +398,6 @@ public class GroupByTypeInterfaceBenchmark
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
QueryConfig::new,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),

View File

@ -61,7 +61,6 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -373,7 +372,6 @@ public class CachingClusteredClientBenchmark
new GroupByStrategyV2(
processingConfig,
configSupplier,
QueryConfig::new,
bufferPool,
mergeBufferPool,
mapper,

View File

@ -44,7 +44,6 @@ import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -562,7 +561,6 @@ public class GroupByBenchmark
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
QueryConfig::new,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),

View File

@ -1769,14 +1769,35 @@ If there is an L1 miss and L2 hit, it will also populate L1.
This section describes configurations that control behavior of Druid's query types, applicable to Broker, Historical, and MiddleManager processes.
### Query vectorization config
### Overriding default query context values
The following configurations are to set the default behavior for query vectorization.
Any [Query Context General Parameter](../querying/query-context.html#general-parameters) default value can be
overridden by setting runtime property in the format of `druid.query.default.context.{query_context_key}`.
`druid.query.default.context.{query_context_key}` runtime property prefix applies to all current and future
query context keys, the same as how query context parameter passed with the query works. Note that the runtime property
value can be overridden if value for the same key is explicitly specify in the query contexts.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.vectorize`|See [Vectorization parameters](../querying/query-context.html#vectorization-parameters) for details. This value can be overridden by `vectorize` in the query contexts.|`true`|
|`druid.query.vectorSize`|See [Vectorization parameters](../querying/query-context.html#vectorization-parameters) for details. This value can be overridden by `vectorSize` in the query contexts.|`512`|
The precedence chain for query context values is as follows:
hard-coded default value in Druid code <- runtime property not prefixed with `druid.query.default.context`
<- runtime property prefixed with `druid.query.default.context` <- context parameter in the query
Note that not all query context key has a runtime property not prefixed with `druid.query.default.context` that can
override the hard-coded default value. For example, `maxQueuedBytes` has `druid.broker.http.maxQueuedBytes`
but `joinFilterRewriteMaxSize` does not. Hence, the only way of overriding `joinFilterRewriteMaxSize` hard-coded default
value is with runtime property `druid.query.default.context.joinFilterRewriteMaxSize`.
To further elaborate on the previous example:
If neither `druid.broker.http.maxQueuedBytes` or `druid.query.default.context.maxQueuedBytes` is set and
the query does not have `maxQueuedBytes` in the context, then the hard-coded value in Druid code is use.
If runtime property only contains `druid.broker.http.maxQueuedBytes=x` and query does not have `maxQueuedBytes` in the
context, then the value of the property, `x`, is use. However, if query does have `maxQueuedBytes` in the context,
then that value is use instead.
If runtime property only contains `druid.query.default.context.maxQueuedBytes=y` OR runtime property contains both
`druid.broker.http.maxQueuedBytes=x` and `druid.query.default.context.maxQueuedBytes=y`, then the value of
`druid.query.default.context.maxQueuedBytes`, `y`, is use (given that query does not have `maxQueuedBytes` in the
context). If query does have `maxQueuedBytes` in the context, then that value is use instead.
### TopN query config

View File

@ -32,6 +32,9 @@ the following ways:
HTTP POST API, or as properties to the JDBC connection.
- For [native queries](querying.md), context parameters are provided as a JSON object named `context`.
Note that setting query context will override both the default value and the runtime properties value in the format of
`druid.query.default.context.{property_key}` (if set).
These parameters apply to all query types.
|property |default | description |
@ -100,5 +103,5 @@ vectorization. These query types will ignore the "vectorize" parameter even if i
|property|default| description|
|--------|-------|------------|
|vectorize|`true`|Enables or disables vectorized query execution. Possible values are `false` (disabled), `true` (enabled if possible, disabled otherwise, on a per-segment basis), and `force` (enabled, and groupBy or timeseries queries that cannot be vectorized will fail). The `"force"` setting is meant to aid in testing, and is not generally useful in production (since real-time segments can never be processed with vectorized execution, any queries on real-time data will fail). This will override `druid.query.vectorize` if it's set.|
|vectorSize|`512`|Sets the row batching size for a particular query. This will override `druid.query.vectorSize` if it's set.|
|vectorize|`true`|Enables or disables vectorized query execution. Possible values are `false` (disabled), `true` (enabled if possible, disabled otherwise, on a per-segment basis), and `force` (enabled, and groupBy or timeseries queries that cannot be vectorized will fail). The `"force"` setting is meant to aid in testing, and is not generally useful in production (since real-time segments can never be processed with vectorized execution, any queries on real-time data will fail). This will override `druid.query.default.context.vectorize` if it's set.|
|vectorSize|`512`|Sets the row batching size for a particular query. This will override `druid.query.default.context.vectorSize` if it's set.|

View File

@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
@ -100,7 +99,6 @@ public class MapVirtualColumnGroupByTest extends InitializedNullHandlingTest
}
},
GroupByQueryConfig::new,
QueryConfig::new,
new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
new DefaultObjectMapper(),

View File

@ -19,43 +19,48 @@
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import com.google.common.collect.ImmutableMap;
import javax.annotation.Nonnull;
import java.util.Map;
/**
* A user configuration holder for all query types.
* Any query-specific configurations should go to their own configuration.
*
* @see org.apache.druid.query.groupby.GroupByQueryConfig
* @see org.apache.druid.query.search.SearchQueryConfig
* @see org.apache.druid.query.topn.TopNQueryConfig
* @see org.apache.druid.query.metadata.SegmentMetadataQueryConfig
* @see org.apache.druid.query.scan.ScanQueryConfig
*
*/
public class QueryConfig
public class DefaultQueryConfig
{
/**
* Note that context values should not be directly retrieved from this field but instead should
* be read through {@link QueryContexts}. This field contains context configs from runtime property
* which is then merged with configs passed in query context. The result of the merge is subsequently stored in
* the query context. The order of precedence in merging of the configs is as follow:
* runtime property values (store in this class) override by query context parameter passed in with the query
*/
@JsonProperty
private Vectorize vectorize = QueryContexts.DEFAULT_VECTORIZE;
private final Map<String, Object> context;
@JsonProperty
private int vectorSize = QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE;
public Vectorize getVectorize()
@Nonnull
public Map<String, Object> getContext()
{
return vectorize;
return context;
}
public int getVectorSize()
@JsonCreator
public DefaultQueryConfig(@JsonProperty("context") Map<String, Object> context)
{
return vectorSize;
}
public QueryConfig withOverrides(final Query<?> query)
{
final QueryConfig newConfig = new QueryConfig();
newConfig.vectorize = QueryContexts.getVectorize(query, vectorize);
newConfig.vectorSize = QueryContexts.getVectorSize(query, vectorSize);
return newConfig;
if (context == null) {
this.context = ImmutableMap.of();
} else {
this.context = context;
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import java.util.concurrent.TimeUnit;
@ -184,11 +185,21 @@ public class QueryContexts
return parseBoolean(query, "serializeDateTimeAsLongInner", defaultValue);
}
public static <T> Vectorize getVectorize(Query<T> query)
{
return getVectorize(query, QueryContexts.DEFAULT_VECTORIZE);
}
public static <T> Vectorize getVectorize(Query<T> query, Vectorize defaultValue)
{
return parseEnum(query, VECTORIZE_KEY, Vectorize.class, defaultValue);
}
public static <T> int getVectorSize(Query<T> query)
{
return getVectorSize(query, QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE);
}
public static <T> int getVectorSize(Query<T> query, int defaultSize)
{
return parseInt(query, VECTOR_SIZE_KEY, defaultSize);

View File

@ -33,7 +33,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
@ -114,8 +114,7 @@ public class GroupByQueryEngineV2
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
final GroupByQueryConfig querySpecificConfig,
final QueryConfig queryConfig
final GroupByQueryConfig querySpecificConfig
)
{
if (storageAdapter == null) {
@ -143,7 +142,7 @@ public class GroupByQueryEngineV2
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());
final boolean doVectorize = queryConfig.getVectorize().shouldVectorize(
final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
VectorGroupByEngine.canVectorize(query, storageAdapter, filter)
);
@ -157,8 +156,7 @@ public class GroupByQueryEngineV2
fudgeTimestamp,
filter,
interval,
querySpecificConfig,
queryConfig
querySpecificConfig
);
} else {
result = processNonVectorized(

View File

@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
@ -95,8 +95,7 @@ public class VectorGroupByEngine
@Nullable final DateTime fudgeTimestamp,
@Nullable final Filter filter,
final Interval interval,
final GroupByQueryConfig config,
final QueryConfig queryConfig
final GroupByQueryConfig config
)
{
if (!canVectorize(query, storageAdapter, filter)) {
@ -114,7 +113,7 @@ public class VectorGroupByEngine
interval,
query.getVirtualColumns(),
false,
queryConfig.getVectorSize(),
QueryContexts.getVectorSize(query),
null
);

View File

@ -42,7 +42,6 @@ import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
@ -89,7 +88,6 @@ public class GroupByStrategyV2 implements GroupByStrategy
private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
private final Supplier<QueryConfig> queryConfigSupplier;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;
private final ObjectMapper spillMapper;
@ -99,7 +97,6 @@ public class GroupByStrategyV2 implements GroupByStrategy
public GroupByStrategyV2(
DruidProcessingConfig processingConfig,
Supplier<GroupByQueryConfig> configSupplier,
Supplier<QueryConfig> queryConfigSupplier,
@Global NonBlockingPool<ByteBuffer> bufferPool,
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
@Smile ObjectMapper spillMapper,
@ -108,7 +105,6 @@ public class GroupByStrategyV2 implements GroupByStrategy
{
this.processingConfig = processingConfig;
this.configSupplier = configSupplier;
this.queryConfigSupplier = queryConfigSupplier;
this.bufferPool = bufferPool;
this.mergeBufferPool = mergeBufferPool;
this.spillMapper = spillMapper;
@ -574,8 +570,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
query,
storageAdapter,
bufferPool,
configSupplier.get().withOverrides(query),
queryConfigSupplier.get().withOverrides(query)
configSupplier.get().withOverrides(query)
);
}

View File

@ -20,8 +20,6 @@
package org.apache.druid.query.timeseries;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.collections.NonBlockingPool;
@ -33,7 +31,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.Aggregator;
@ -59,7 +57,6 @@ import java.util.Objects;
*/
public class TimeseriesQueryEngine
{
private final Supplier<QueryConfig> queryConfigSupplier;
private final NonBlockingPool<ByteBuffer> bufferPool;
/**
@ -68,17 +65,14 @@ public class TimeseriesQueryEngine
@VisibleForTesting
public TimeseriesQueryEngine()
{
this.queryConfigSupplier = Suppliers.ofInstance(new QueryConfig());
this.bufferPool = new StupidPool<>("dummy", () -> ByteBuffer.allocate(1000000));
}
@Inject
public TimeseriesQueryEngine(
final Supplier<QueryConfig> queryConfigSupplier,
final @Global NonBlockingPool<ByteBuffer> bufferPool
)
{
this.queryConfigSupplier = queryConfigSupplier;
this.bufferPool = bufferPool;
}
@ -94,13 +88,12 @@ public class TimeseriesQueryEngine
);
}
final QueryConfig queryConfigToUse = queryConfigSupplier.get().withOverrides(query);
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());
final Granularity gran = query.getGranularity();
final boolean descending = query.isDescending();
final boolean doVectorize = queryConfigToUse.getVectorize().shouldVectorize(
final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
adapter.canVectorize(filter, query.getVirtualColumns(), descending)
&& query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
);
@ -108,7 +101,7 @@ public class TimeseriesQueryEngine
final Sequence<Result<TimeseriesResultValue>> result;
if (doVectorize) {
result = processVectorized(query, queryConfigToUse, adapter, filter, interval, gran, descending);
result = processVectorized(query, adapter, filter, interval, gran, descending);
} else {
result = processNonVectorized(query, adapter, filter, interval, gran, descending);
}
@ -123,7 +116,6 @@ public class TimeseriesQueryEngine
private Sequence<Result<TimeseriesResultValue>> processVectorized(
final TimeseriesQuery query,
final QueryConfig queryConfig,
final StorageAdapter adapter,
@Nullable final Filter filter,
final Interval queryInterval,
@ -139,7 +131,7 @@ public class TimeseriesQueryEngine
queryInterval,
query.getVirtualColumns(),
descending,
queryConfig.getVectorSize(),
QueryContexts.getVectorSize(query),
null
);

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Global;
import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
public class DefaultQueryConfigTest
{
@Test
public void testSerdeContextMap()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.default";
final JsonConfigProvider<DefaultQueryConfig> provider = JsonConfigProvider.of(
propertyPrefix,
DefaultQueryConfig.class
);
final Properties properties = new Properties();
properties.put(propertyPrefix + ".context.joinFilterRewriteMaxSize", "10");
properties.put(propertyPrefix + ".context.vectorize", "true");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final DefaultQueryConfig defaultQueryConfig = provider.get().get();
Assert.assertNotNull(defaultQueryConfig.getContext());
Assert.assertEquals(2, defaultQueryConfig.getContext().size());
Assert.assertEquals("10", defaultQueryConfig.getContext().get("joinFilterRewriteMaxSize"));
Assert.assertEquals("true", defaultQueryConfig.getContext().get("vectorize"));
}
@Test
public void testSerdeEmptyContextMap()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.default";
final JsonConfigProvider<DefaultQueryConfig> provider = JsonConfigProvider.of(
propertyPrefix,
DefaultQueryConfig.class
);
final Properties properties = new Properties();
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final DefaultQueryConfig defaultQueryConfig = provider.get().get();
Assert.assertNotNull(defaultQueryConfig.getContext());
Assert.assertEquals(0, defaultQueryConfig.getContext().size());
}
private Injector createInjector()
{
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
JsonConfigProvider.bind(binder, "druid.query.default", DefaultQueryConfig.class, Global.class);
}
)
);
return injector;
}
}

View File

@ -48,7 +48,6 @@ import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -352,9 +351,6 @@ public class GroupByLimitPushDownInsufficientBufferTest
};
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final Supplier<QueryConfig> queryConfigSupplier = Suppliers.ofInstance(
new QueryConfig()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
@ -366,7 +362,6 @@ public class GroupByLimitPushDownInsufficientBufferTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
@ -385,7 +380,6 @@ public class GroupByLimitPushDownInsufficientBufferTest
new GroupByStrategyV2(
tooSmallDruidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
tooSmallMergePool,
new ObjectMapper(new SmileFactory()),

View File

@ -48,7 +48,6 @@ import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -384,9 +383,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
};
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final Supplier<QueryConfig> queryConfigSupplier = Suppliers.ofInstance(
new QueryConfig()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
@ -398,7 +394,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
@ -417,7 +412,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
mergePool2,
new ObjectMapper(new SmileFactory()),

View File

@ -45,7 +45,6 @@ import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -275,7 +274,6 @@ public class GroupByMultiSegmentTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
Suppliers.ofInstance(new QueryConfig()),
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),

View File

@ -31,7 +31,6 @@ import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
@ -138,7 +137,6 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
new GroupByStrategyV2(
PROCESSING_CONFIG,
configSupplier,
Suppliers.ofInstance(new QueryConfig()),
BUFFER_POOL,
MERGE_BUFFER_POOL,
mapper,

View File

@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryInterruptedException;
@ -111,7 +110,6 @@ public class GroupByQueryRunnerFailureTest
new GroupByStrategyV2(
DEFAULT_PROCESSING_CONFIG,
configSupplier,
Suppliers.ofInstance(new QueryConfig()),
BUFFER_POOL,
MERGE_BUFFER_POOL,
mapper,

View File

@ -53,7 +53,6 @@ import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
@ -396,7 +395,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
new GroupByStrategyV2(
processingConfig,
configSupplier,
Suppliers.ofInstance(new QueryConfig()),
bufferPool,
mergeBufferPool,
mapper,

View File

@ -49,7 +49,6 @@ import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -312,9 +311,6 @@ public class NestedQueryPushDownTest
};
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final Supplier<QueryConfig> queryConfigSupplier = Suppliers.ofInstance(
new QueryConfig()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
@ -326,7 +322,6 @@ public class NestedQueryPushDownTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
@ -345,7 +340,6 @@ public class NestedQueryPushDownTest
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
queryConfigSupplier,
bufferPool,
mergePool2,
new ObjectMapper(new SmileFactory()),

View File

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.search;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class QueryConfigTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = TestHelper.makeJsonMapper();
final String json = "{"
+ "\"vectorize\" : \"force\","
+ "\"vectorSize\" : 1"
+ "}";
final QueryConfig config = mapper.readValue(json, QueryConfig.class);
Assert.assertEquals(Vectorize.FORCE, config.getVectorize());
Assert.assertEquals(1, config.getVectorSize());
}
@Test
public void testDefault()
{
final QueryConfig config = new QueryConfig();
Assert.assertEquals(QueryContexts.DEFAULT_VECTORIZE, config.getVectorize());
Assert.assertEquals(QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE, config.getVectorSize());
}
@Test
public void testOverrides()
{
final Query<?> query = new TestQuery(
new TableDataSource("datasource"),
new MultipleIntervalSegmentSpec(ImmutableList.of()),
false,
ImmutableMap.of(
QueryContexts.VECTORIZE_KEY,
"true",
QueryContexts.VECTOR_SIZE_KEY,
QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE * 2
)
);
final QueryConfig config = new QueryConfig().withOverrides(query);
Assert.assertEquals(Vectorize.TRUE, config.getVectorize());
Assert.assertEquals(QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE * 2, config.getVectorSize());
}
}

View File

@ -25,10 +25,10 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.datasourcemetadata.DataSourceMetadataQuery;
@ -97,7 +97,7 @@ public class QueryToolChestModule implements Module
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.default", DefaultQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);

View File

@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
@ -77,6 +79,7 @@ public class QueryLifecycle
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final AuthorizerMapper authorizerMapper;
private final DefaultQueryConfig defaultQueryConfig;
private final long startMs;
private final long startNs;
@ -92,6 +95,7 @@ public class QueryLifecycle
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final AuthorizerMapper authorizerMapper,
final DefaultQueryConfig defaultQueryConfig,
final long startMs,
final long startNs
)
@ -102,6 +106,7 @@ public class QueryLifecycle
this.emitter = emitter;
this.requestLogger = requestLogger;
this.authorizerMapper = authorizerMapper;
this.defaultQueryConfig = defaultQueryConfig;
this.startMs = startMs;
this.startNs = startNs;
}
@ -170,7 +175,14 @@ public class QueryLifecycle
queryId = UUID.randomUUID().toString();
}
this.baseQuery = baseQuery.withId(queryId);
Map<String, Object> mergedUserAndConfigContext;
if (baseQuery.getContext() != null) {
mergedUserAndConfigContext = BaseQuery.computeOverriddenContext(defaultQueryConfig.getContext(), baseQuery.getContext());
} else {
mergedUserAndConfigContext = defaultQueryConfig.getContext();
}
this.baseQuery = baseQuery.withOverriddenContext(mergedUserAndConfigContext).withId(queryId);
this.toolChest = warehouse.getToolChest(baseQuery);
}

View File

@ -19,9 +19,11 @@
package org.apache.druid.server;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestWarehouse;
@ -38,6 +40,7 @@ public class QueryLifecycleFactory
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final AuthorizerMapper authorizerMapper;
private final DefaultQueryConfig defaultQueryConfig;
@Inject
public QueryLifecycleFactory(
@ -47,7 +50,8 @@ public class QueryLifecycleFactory
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final AuthConfig authConfig,
final AuthorizerMapper authorizerMapper
final AuthorizerMapper authorizerMapper,
final Supplier<DefaultQueryConfig> queryConfigSupplier
)
{
this.warehouse = warehouse;
@ -56,6 +60,7 @@ public class QueryLifecycleFactory
this.emitter = emitter;
this.requestLogger = requestLogger;
this.authorizerMapper = authorizerMapper;
this.defaultQueryConfig = queryConfigSupplier.get();
}
public QueryLifecycle factorize()
@ -67,6 +72,7 @@ public class QueryLifecycleFactory
emitter,
requestLogger,
authorizerMapper,
defaultQueryConfig,
System.currentTimeMillis(),
System.nanoTime()
);

View File

@ -22,6 +22,7 @@ package org.apache.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
@ -202,7 +204,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,
@ -233,6 +236,111 @@ public class QueryResourceTest
Assert.assertNotNull(response);
}
@Test
public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException
{
String overrideConfigKey = "priority";
String overrideConfigValue = "678";
DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue));
queryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
TEST_SEGMENT_WALKER,
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(overrideConfig)
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
null,
ResponseContextConfig.newConfig(true),
DRUID_NODE
);
expectPermissiveHappyPathAuth();
Response response = queryResource.doPost(
new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
null /*pretty*/,
testServletRequest
);
Assert.assertNotNull(response);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
((StreamingOutput) response.getEntity()).write(baos);
final List<Result<TimeBoundaryResultValue>> responses = JSON_MAPPER.readValue(
baos.toByteArray(),
new TypeReference<List<Result<TimeBoundaryResultValue>>>() {}
);
Assert.assertNotNull(response);
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals(0, responses.size());
Assert.assertEquals(1, testRequestLogger.getNativeQuerylogs().size());
Assert.assertNotNull(testRequestLogger.getNativeQuerylogs().get(0).getQuery());
Assert.assertNotNull(testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext());
Assert.assertTrue(testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().containsKey(overrideConfigKey));
Assert.assertEquals(overrideConfigValue, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey));
}
@Test
public void testGoodQueryWithQueryConfigDoesNotOverrideQueryContext() throws IOException
{
String overrideConfigKey = "priority";
String overrideConfigValue = "678";
DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue));
queryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
TEST_SEGMENT_WALKER,
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(overrideConfig)
),
JSON_MAPPER,
JSON_MAPPER,
queryScheduler,
new AuthConfig(),
null,
ResponseContextConfig.newConfig(true),
DRUID_NODE
);
expectPermissiveHappyPathAuth();
Response response = queryResource.doPost(
// SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY context has overrideConfigKey with value of -1
new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY.getBytes(StandardCharsets.UTF_8)),
null /*pretty*/,
testServletRequest
);
Assert.assertNotNull(response);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
((StreamingOutput) response.getEntity()).write(baos);
final List<Result<TimeBoundaryResultValue>> responses = JSON_MAPPER.readValue(
baos.toByteArray(),
new TypeReference<List<Result<TimeBoundaryResultValue>>>() {}
);
Assert.assertNotNull(response);
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals(0, responses.size());
Assert.assertEquals(1, testRequestLogger.getNativeQuerylogs().size());
Assert.assertNotNull(testRequestLogger.getNativeQuerylogs().get(0).getQuery());
Assert.assertNotNull(testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext());
Assert.assertTrue(testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().containsKey(overrideConfigKey));
Assert.assertEquals(-1, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey));
}
@Test
public void testTruncatedResponseContextShouldFail() throws IOException
{
@ -471,7 +579,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
authMapper
authMapper,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,
@ -586,7 +695,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
authMapper
authMapper,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,
@ -709,7 +819,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
authMapper
authMapper,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,
@ -967,7 +1078,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
JSON_MAPPER,
JSON_MAPPER,

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -59,6 +60,7 @@ import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
@ -705,7 +707,8 @@ public class CalciteTests
new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(),
new AuthConfig(),
TEST_AUTHORIZER_MAPPER
TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
);
}