Add result level caching to Brokers (#5028)

* Add result level caching to Brokers

* Minor doc changes

* Simplify sequences

* Move etag execution

* Modify cacheLimit criteria

* Fix incorrect etag computation

* Fix docs

* Add separate query runner for result level caching

* Update docs

* Add post aggregated results to result level cache

* Fix indents

* Check byte size for exceeding cache limit

* Fix indents

* Fix indents

* Add flag for result caching

* Remove logs

* Make cache object generation synchronous

* Avoid saving intermediate cache results to list

* Fix changes that handle etag based response

* Release bytestream after use

* Address PR comments

* Discard resultcache stream after use

* Fix docs

* Address comments

* Add comment about fluent workflow issue
Atul Mohan 2018-03-23 21:11:52 -05:00 committed by Charles Allen
parent ef21ce5a64
commit ec17a44e09
24 changed files with 618 additions and 51 deletions

View File

@ -110,6 +110,9 @@ You can optionally only configure caching to be enabled on the broker by setting
|--------|---------------|-----------|-------|
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size in bytes of a single query response that can be cached.|`Integer.MAX_VALUE`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|
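
A minimal broker runtime.properties sketch using only the properties documented above (the limit value is just an example; the default of `Integer.MAX_VALUE` effectively disables the size cap):

# Enable result level caching on the Broker (illustrative sketch)
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true
# Optional: cap on the serialized size, in bytes, of a single cached result set
druid.broker.cache.resultLevelCacheLimit=5242880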

View File

@ -3,9 +3,10 @@ layout: doc_page
---
# Query Caching
Druid supports query result caching through an LRU cache. Results are stored on a per segment basis, along with the
parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially
on segment results from scanning historical/real-time segments.
Druid supports query result caching through an LRU cache. Results are stored either as a whole or on a per segment basis, along with the
parameters of a given query. Segment level caching allows Druid to return final results based partially on segment results in the cache
and partially on segment results from scanning historical/real-time segments. Result level caching enables Druid to cache the entire
result set, so that results for identical queries can be retrieved entirely from the cache.
Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches
can be enabled at either the Historical or Broker level (it is not recommended to enable caching on both).
@ -15,6 +16,7 @@ can be enabled at either the Historical and Broker level (it is not recommended
For small clusters, enabling caching on the Broker can yield faster results than enabling query caches on Historicals. This is
the recommended setup for smaller production clusters (< 20 servers). Take note that when caching is enabled on the Broker,
results from Historicals are returned on a per segment basis, and Historicals will not be able to do any local result merging.
Result level caching is enabled only on the Broker side.
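
As a rough sketch of the gating logic (this mirrors the ResultLevelCacheUtil class added later in this commit; it is an illustration, not new API), the Broker consults the result level cache only when the per-query context flag, the Broker cache configuration, and the query type's CacheStrategy all allow it:

import io.druid.client.cache.CacheConfig;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;

public class ResultLevelCacheGateSketch
{
  // Illustrative only: combines the same checks ResultLevelCacheUtil performs below.
  public static <T> boolean useResultLevelCache(
      Query<T> query,
      CacheStrategy<T, Object, Query<T>> strategy,
      CacheConfig cacheConfig
  )
  {
    return QueryContexts.isUseResultLevelCache(query)   // per-query context flag
           && strategy != null                          // query type provides a CacheStrategy
           && cacheConfig.isUseResultLevelCache()       // druid.broker.cache.useResultLevelCache
           && cacheConfig.isQueryCacheable(query)       // not listed in druid.broker.cache.unCacheable
           && strategy.isCacheable(query, false);       // strategy-specific check
  }
}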
## Query caching on Historicals

View File

@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache |
|useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache |
|populateResultLevelCache | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
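
To exercise the two new context flags from client code, something along these lines would work. This is a sketch only; the helper name is made up, but Query#withOverriddenContext is the existing API used elsewhere in this commit:

import com.google.common.collect.ImmutableMap;
import io.druid.query.Query;

public class ResultLevelCacheContextSketch
{
  // Hypothetical helper: turns on per-query result level caching via the context keys above.
  public static <T> Query<T> withResultLevelCaching(Query<T> query)
  {
    return query.withOverriddenContext(
        ImmutableMap.of(
            "useResultLevelCache", true,
            "populateResultLevelCache", true
        )
    );
  }
}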

View File

@ -26,7 +26,7 @@ import io.druid.guice.annotations.ExtensionPoint;
import java.util.concurrent.ExecutorService;
/**
*/
*/
@ExtensionPoint
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
@ -37,6 +37,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* called on the cached by-segment results
*
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);
@ -45,6 +46,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* Computes the cache key for the given query
*
* @param query the query to compute a cache key for
*
* @return the cache key
*/
byte[] computeCacheKey(QueryType query);
@ -58,17 +60,32 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
/**
* Returns a function that converts from the QueryType's result type to something cacheable.
*
* <p>
* The resulting function must be thread-safe.
*
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
*
* @return a thread-safe function that converts the QueryType's result type into something cacheable
*/
Function<T, CacheType> prepareForCache();
Function<T, CacheType> prepareForCache(boolean isResultLevelCache);
/**
* A function that does the inverse of the operation that the function prepareForCache returns
*
* @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching
*
* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
Function<CacheType, T> pullFromCache(boolean isResultLevelCache);
default Function<T, CacheType> prepareForSegmentLevelCache()
{
return prepareForCache(false);
}
default Function<CacheType, T> pullFromSegmentLevelCache()
{
return pullFromCache(false);
}
}
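
For clarity on how the new boolean parameter is meant to be used, here is a sketch mirroring the tool chest tests further down: segment level callers go through the new default methods, while the result level path passes true so that post-aggregated values are included.

import io.druid.query.CacheStrategy;
import io.druid.query.Query;

public class CacheStrategyRoundTripSketch
{
  // Illustrative round trip through both cache representations for a single result.
  public static <T, CacheType, QueryType extends Query<T>> void roundTrip(
      CacheStrategy<T, CacheType, QueryType> strategy,
      T result
  )
  {
    // Segment level: unchanged behavior, reached via the new default methods.
    CacheType segmentEntry = strategy.prepareForSegmentLevelCache().apply(result);
    T fromSegmentCache = strategy.pullFromSegmentLevelCache().apply(segmentEntry);

    // Result level: pass true so post-aggregators are serialized and restored as well.
    CacheType resultEntry = strategy.prepareForCache(true).apply(result);
    T fromResultCache = strategy.pullFromCache(true).apply(resultEntry);
  }
}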

View File

@ -37,6 +37,8 @@ public class QueryContexts
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
@ -72,6 +74,26 @@ public class QueryContexts
return parseBoolean(query, "useCache", defaultValue);
}
public static <T> boolean isPopulateResultLevelCache(Query<T> query)
{
return isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE);
}
public static <T> boolean isPopulateResultLevelCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateResultLevelCache", defaultValue);
}
public static <T> boolean isUseResultLevelCache(Query<T> query)
{
return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE);
}
public static <T> boolean isUseResultLevelCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useResultLevelCache", defaultValue);
}
public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);

View File

@ -50,6 +50,7 @@ import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -408,7 +409,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public Function<Row, Object> prepareForCache()
public Function<Row, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Row, Object>()
{
@ -426,6 +427,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
for (AggregatorFactory agg : aggs) {
retVal.add(event.get(agg.getName()));
}
if (isResultLevelCache) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
retVal.add(event.get(postAgg.getName()));
}
}
return retVal;
}
@ -435,7 +441,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public Function<Object, Row> pullFromCache()
public Function<Object, Row> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Row>()
{
@ -460,7 +466,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final AggregatorFactory factory = aggsIter.next();
event.put(factory.getName(), factory.deserialize(results.next()));
}
if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
while (postItr.hasNext() && results.hasNext()) {
event.put(postItr.next().getName(), results.next());
}
}
if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
throw new ISE(
"Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",

View File

@ -198,7 +198,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
@Override
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache(boolean isResultLevelCache)
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{
@ -211,7 +211,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
@Override
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache(boolean isResultLevelCache)
{
return new Function<SegmentAnalysis, SegmentAnalysis>()
{

View File

@ -206,7 +206,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public Function<Result<SearchResultValue>, Object> prepareForCache()
public Function<Result<SearchResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<SearchResultValue>, Object>()
{
@ -221,7 +221,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public Function<Object, Result<SearchResultValue>> pullFromCache()
public Function<Object, Result<SearchResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<SearchResultValue>>()
{

View File

@ -243,7 +243,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public Function<Result<SelectResultValue>, Object> prepareForCache()
public Function<Result<SelectResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<SelectResultValue>, Object>()
{
@ -272,7 +272,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public Function<Object, Result<SelectResultValue>> pullFromCache()
public Function<Object, Result<SelectResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<SelectResultValue>>()
{

View File

@ -171,7 +171,7 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<TimeBoundaryResultValue>, Object>()
{
@ -184,7 +184,7 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache()
public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<TimeBoundaryResultValue>>()
{

View File

@ -174,7 +174,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
@Override
public Function<Result<TimeseriesResultValue>, Object> prepareForCache()
public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<TimeseriesResultValue>, Object>()
{
@ -188,14 +188,18 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
for (AggregatorFactory agg : aggs) {
retVal.add(results.getMetric(agg.getName()));
}
if (isResultLevelCache) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
retVal.add(results.getMetric(postAgg.getName()));
}
}
return retVal;
}
};
}
@Override
public Function<Object, Result<TimeseriesResultValue>> pullFromCache()
public Function<Object, Result<TimeseriesResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<TimeseriesResultValue>>()
{
@ -216,6 +220,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
final AggregatorFactory factory = aggsIter.next();
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
}
if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
while (postItr.hasNext() && resultIter.hasNext()) {
retVal.put(postItr.next().getName(), resultIter.next());
}
}
return new Result<TimeseriesResultValue>(
timestamp,

View File

@ -293,7 +293,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
{
@ -341,7 +340,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public Function<Result<TopNResultValue>, Object> prepareForCache()
public Function<Result<TopNResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return new Function<Result<TopNResultValue>, Object>()
{
@ -361,6 +360,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
for (String aggName : aggFactoryNames) {
vals.add(result.getMetric(aggName));
}
if (isResultLevelCache) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
vals.add(result.getMetric(postAgg.getName()));
}
}
retVal.add(vals);
}
return retVal;
@ -369,7 +373,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public Function<Object, Result<TopNResultValue>> pullFromCache()
public Function<Object, Result<TopNResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<TopNResultValue>>()
{
@ -401,7 +405,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
for (PostAggregator postAgg : postAggs) {
vals.put(postAgg.getName(), postAgg.compute(vals));
}
if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
while (postItr.hasNext() && resultIter.hasNext()) {
vals.put(postItr.next().getName(), resultIter.next());
}
}
retVal.add(vals);
}

View File

@ -94,7 +94,7 @@ public class SegmentMetadataQueryQueryToolChestTest
null
);
Object preparedValue = strategy.prepareForCache().apply(result);
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result);
ObjectMapper objectMapper = new DefaultObjectMapper();
SegmentAnalysis fromCacheValue = objectMapper.readValue(
@ -102,7 +102,7 @@ public class SegmentMetadataQueryQueryToolChestTest
strategy.getCacheObjectClazz()
);
SegmentAnalysis fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
SegmentAnalysis fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result, fromCacheResult);
}

View File

@ -59,7 +59,7 @@ public class SearchQueryQueryToolChestTest
new SearchResultValue(ImmutableList.of(new SearchHit("dim1", "a")))
);
Object preparedValue = strategy.prepareForCache().apply(
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
result
);
@ -69,7 +69,7 @@ public class SearchQueryQueryToolChestTest
strategy.getCacheObjectClazz()
);
Result<SearchResultValue> fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
Result<SearchResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result, fromCacheResult);
}

View File

@ -215,7 +215,7 @@ public class TimeBoundaryQueryQueryToolChestTest
)
);
Object preparedValue = strategy.prepareForCache().apply(
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
result
);
@ -225,7 +225,7 @@ public class TimeBoundaryQueryQueryToolChestTest
strategy.getCacheObjectClazz()
);
Result<TimeBoundaryResultValue> fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
Result<TimeBoundaryResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result, fromCacheResult);
}

View File

@ -32,6 +32,8 @@ import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.VirtualColumns;
@ -76,12 +78,12 @@ public class TimeseriesQueryQueryToolChestTest
new CountAggregatorFactory("metric1"),
new LongSumAggregatorFactory("metric0", "metric0")
),
null,
ImmutableList.<PostAggregator>of(new ConstantPostAggregator("post", 10)),
null
)
);
final Result<TimeseriesResultValue> result = new Result<>(
final Result<TimeseriesResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TimeseriesResultValue(
@ -89,7 +91,7 @@ public class TimeseriesQueryQueryToolChestTest
)
);
Object preparedValue = strategy.prepareForCache().apply(result);
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1);
ObjectMapper objectMapper = TestHelper.makeJsonMapper();
Object fromCacheValue = objectMapper.readValue(
@ -97,9 +99,26 @@ public class TimeseriesQueryQueryToolChestTest
strategy.getCacheObjectClazz()
);
Result<TimeseriesResultValue> fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
Result<TimeseriesResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result, fromCacheResult);
Assert.assertEquals(result1, fromCacheResult);
final Result<TimeseriesResultValue> result2 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TimeseriesResultValue(
ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10)
)
);
Object preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result2);
Object fromResultLevelCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedResultLevelCacheValue),
strategy.getCacheObjectClazz()
);
Result<TimeseriesResultValue> fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue);
Assert.assertEquals(result2, fromResultLevelCacheRes);
}
@Test

View File

@ -78,7 +78,7 @@ public class TopNQueryQueryToolChestTest
)
);
final Result<TopNResultValue> result = new Result<>(
final Result<TopNResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
@ -91,8 +91,8 @@ public class TopNQueryQueryToolChestTest
)
);
Object preparedValue = strategy.prepareForCache().apply(
result
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
result1
);
ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@ -101,9 +101,36 @@ public class TopNQueryQueryToolChestTest
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromCacheResult = strategy.pullFromCache().apply(fromCacheValue);
Result<TopNResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result1, fromCacheResult);
final Result<TopNResultValue> result2 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"test", "val1",
"metric1", 2,
"post", 10
)
)
)
);
Object preparedResultCacheValue = strategy.prepareForCache(true).apply(
result2
);
Object fromResultCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedResultCacheValue),
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue);
Assert.assertEquals(result2, fromResultCacheResult);
Assert.assertEquals(result, fromCacheResult);
}
@Test

View File

@ -500,7 +500,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
return;
}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final Function<Object, T> pullFromCacheFunction = strategy.pullFromSegmentLevelCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
final byte[] cachedResult = cachedResultPair.rhs;
@ -600,7 +600,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
.withQuerySegmentSpec(segmentsOfServerSpec),
responseContext
);
final Function<T, Object> cacheFn = strategy.prepareForCache();
final Function<T, Object> cacheFn = strategy.prepareForSegmentLevelCache();
return resultsBySegments
.map(result -> {
final BySegmentResultValueClass<T> resultsOfSegment = result.getValue();

View File

@ -102,7 +102,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
}
if (useCache) {
final Function cacheFn = strategy.pullFromCache();
final Function cacheFn = strategy.pullFromSegmentLevelCache();
final byte[] cachedResult = cache.get(key);
if (cachedResult != null) {
final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();
@ -142,7 +142,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.<ListenableFuture<?>>newLinkedList());
if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final Function cacheFn = strategy.prepareForSegmentLevelCache();
return Sequences.withEffect(
Sequences.map(

View File

@ -0,0 +1,94 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.StringUtils;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
public class ResultLevelCacheUtil
{
private static final Logger log = new Logger(ResultLevelCacheUtil.class);
public static Cache.NamedKey computeResultLevelCacheKey(
String resultLevelCacheIdentifier
)
{
return new Cache.NamedKey(
resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier)
);
}
public static void populate(
Cache cache,
Cache.NamedKey key,
byte[] resultBytes
)
{
log.debug("Populating results into cache");
cache.put(key, resultBytes);
}
public static <T> boolean useResultLevelCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
public static <T> boolean populateResultLevelCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
private static <T> boolean useResultLevelCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return QueryContexts.isUseResultLevelCache(query)
&& strategy != null
&& cacheConfig.isUseResultLevelCache()
&& cacheConfig.isQueryCacheable(query);
}
private static <T> boolean populateResultLevelCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return QueryContexts.isPopulateResultLevelCache(query)
&& strategy != null
&& cacheConfig.isPopulateResultLevelCache()
&& cacheConfig.isQueryCacheable(query);
}
}

View File

@ -29,13 +29,20 @@ import java.util.List;
public class CacheConfig
{
public static final String POPULATE_CACHE = "populateCache";
// The defaults defined here for cache related parameters are different from the QueryContext defaults due to legacy reasons.
// They should be made the same at some point in the future.
@JsonProperty
private boolean useCache = false;
@JsonProperty
private boolean populateCache = false;
@JsonProperty
private boolean useResultLevelCache = false;
@JsonProperty
private boolean populateResultLevelCache = false;
@JsonProperty
@Min(0)
private int numBackgroundThreads = 0;
@ -44,6 +51,9 @@ public class CacheConfig
@Min(0)
private int cacheBulkMergeLimit = Integer.MAX_VALUE;
@JsonProperty
private int resultLevelCacheLimit = Integer.MAX_VALUE;
@JsonProperty
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
@ -57,6 +67,16 @@ public class CacheConfig
return useCache;
}
public boolean isPopulateResultLevelCache()
{
return populateResultLevelCache;
}
public boolean isUseResultLevelCache()
{
return useResultLevelCache;
}
public int getNumBackgroundThreads()
{
return numBackgroundThreads;
@ -67,6 +87,11 @@ public class CacheConfig
return cacheBulkMergeLimit;
}
public int getResultLevelCacheLimit()
{
return resultLevelCacheLimit;
}
public boolean isQueryCacheable(Query query)
{
return isQueryCacheable(query.getType());

View File

@ -0,0 +1,302 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.client.ResultLevelCacheUtil;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.QueryResource;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(ResultLevelCachingQueryRunner.class);
private final QueryRunner baseRunner;
private ObjectMapper objectMapper;
private final Cache cache;
private final CacheConfig cacheConfig;
private final boolean useResultCache;
private final boolean populateResultCache;
private Query<T> query;
private final CacheStrategy<T, Object, Query<T>> strategy;
public ResultLevelCachingQueryRunner(
QueryRunner baseRunner,
QueryToolChest queryToolChest,
Query<T> query,
ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig
)
{
this.baseRunner = baseRunner;
this.objectMapper = objectMapper;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.query = query;
this.strategy = queryToolChest.getCacheStrategy(query);
this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig);
this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig);
}
@Override
public Sequence<T> run(QueryPlus queryPlus, Map responseContext)
{
if (useResultCache || populateResultCache) {
final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query));
final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
String existingResultSetId = extractEtagFromResults(cachedResultSet);
existingResultSetId = existingResultSetId == null ? "" : existingResultSetId;
query = query.withOverriddenContext(
ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId));
Sequence<T> resultFromClient = baseRunner.run(
QueryPlus.wrap(query),
responseContext
);
String newResultSetId = (String) responseContext.get(QueryResource.HEADER_ETAG);
if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
return deserializeResults(cachedResultSet, strategy, existingResultSetId);
} else {
@Nullable
ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator(
cacheKeyStr,
newResultSetId
);
if (resultLevelCachePopulator == null) {
return resultFromClient;
}
final Function<T, Object> cacheFn = strategy.prepareForCache(true);
return Sequences.wrap(Sequences.map(
resultFromClient,
new Function<T, T>()
{
@Override
public T apply(T input)
{
if (resultLevelCachePopulator.isShouldPopulate()) {
resultLevelCachePopulator.cacheResultEntry(resultLevelCachePopulator, input, cacheFn);
}
return input;
}
}
), new SequenceWrapper()
{
@Override
public void after(boolean isDone, Throwable thrown)
{
Preconditions.checkNotNull(
resultLevelCachePopulator,
"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error(
thrown,
"Error while preparing for result level caching for query %s with error %s ",
query.getId(),
thrown.getMessage()
);
} else if (resultLevelCachePopulator.isShouldPopulate()) {
// The resultset identifier and its length are cached along with the resultset
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
}
resultLevelCachePopulator.cacheObjectStream = null;
}
});
}
} else {
return baseRunner.run(
queryPlus,
responseContext
);
}
}
private byte[] fetchResultsFromResultLevelCache(
final String queryCacheKey
)
{
if (useResultCache && queryCacheKey != null) {
return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey));
}
return null;
}
private String extractEtagFromResults(
final byte[] cachedResult
)
{
if (cachedResult == null) {
return null;
}
log.debug("Fetching result level cache identifier for query: %s", query.getId());
int etagLength = ByteBuffer.wrap(cachedResult, 0, Integer.BYTES).getInt();
return StringUtils.fromUtf8(Arrays.copyOfRange(cachedResult, Integer.BYTES, etagLength + Integer.BYTES));
}
private Sequence<T> deserializeResults(
final byte[] cachedResult, CacheStrategy strategy, String resultSetId
)
{
if (cachedResult == null) {
log.error("Cached result set is null");
}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(true);
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
//Skip the resultsetID and its length bytes
Sequence<T> cachedSequence = Sequences.simple(() -> {
try {
int resultOffset = Integer.BYTES + resultSetId.length();
return objectMapper.readValues(
objectMapper.getFactory().createParser(
cachedResult,
resultOffset,
cachedResult.length - resultOffset
),
cacheObjectClazz
);
}
catch (IOException e) {
throw new RE(e, "Failed to retrieve results from cache for query ID [%s]", query.getId());
}
});
return Sequences.map(cachedSequence, pullFromCacheFunction);
}
private ResultLevelCachePopulator createResultLevelCachePopulator(
String cacheKeyStr,
String resultSetId
)
{
if (resultSetId != null && populateResultCache) {
ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
cache,
objectMapper,
ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
cacheConfig,
true
);
try {
// Save the resultSetId and its length
resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
.putInt(resultSetId.length())
.array());
resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
}
catch (IOException ioe) {
log.error(ioe, "Failed to write cached values for query %s", query.getId());
return null;
}
return resultLevelCachePopulator;
} else {
return null;
}
}
public class ResultLevelCachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
public boolean isShouldPopulate()
{
return shouldPopulate;
}
private boolean shouldPopulate;
private ResultLevelCachePopulator(
Cache cache,
ObjectMapper mapper,
Cache.NamedKey key,
CacheConfig cacheConfig,
boolean shouldPopulate
)
{
this.cache = cache;
this.mapper = mapper;
this.key = key;
this.cacheConfig = cacheConfig;
this.shouldPopulate = shouldPopulate;
}
private void cacheResultEntry(
ResultLevelCachePopulator resultLevelCachePopulator,
T resultEntry,
Function<T, Object> cacheFn
)
{
int cacheLimit = cacheConfig.getResultLevelCacheLimit();
try (JsonGenerator gen = mapper.getFactory().createGenerator(resultLevelCachePopulator.cacheObjectStream)) {
gen.writeObject(cacheFn.apply(resultEntry));
if (cacheLimit > 0 && resultLevelCachePopulator.cacheObjectStream.size() > cacheLimit) {
shouldPopulate = false;
resultLevelCachePopulator.cacheObjectStream = null;
return;
}
}
catch (IOException ex) {
log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
shouldPopulate = false;
resultLevelCachePopulator.cacheObjectStream = null;
}
}
public void populateResults()
{
ResultLevelCacheUtil.populate(
cache,
key,
cacheObjectStream.toByteArray()
);
}
}
}

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.CachingClusteredClient;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.FluentQueryRunnerBuilder;
import io.druid.query.PostProcessingOperator;
import io.druid.query.Query;
@ -31,6 +33,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.ResultLevelCachingQueryRunner;
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;
@ -47,6 +50,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
private final ServerConfig serverConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
@Inject
public ClientQuerySegmentWalker(
@ -55,7 +61,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig
ServerConfig serverConfig,
Cache cache,
CacheConfig cacheConfig
)
{
this.emitter = emitter;
@ -64,6 +72,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
this.serverConfig = serverConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
}
@Override
@ -81,6 +91,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// This does not adhere to the fluent workflow. See https://github.com/druid-io/druid/issues/5517
return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest),
toolChest,
query,
objectMapper,
cache,
cacheConfig);
}
private <T> QueryRunner<T> makeRunner(
Query<T> query,
QueryRunner<T> baseClientRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>()
@ -105,6 +131,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
.emitCPUTimeMetric(emitter)
.postProcess(postProcessing);
}
}

View File

@ -314,7 +314,7 @@ public class CachingQueryRunnerTest
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);
Function<Object, Result> fn = cacheStrategy.pullFromCache();
Function<Object, Result> fn = cacheStrategy.pullFromSegmentLevelCache();
List<Result> cacheResults = Lists.newArrayList(
Iterators.transform(
objectMapper.readValues(
@ -349,7 +349,7 @@ public class CachingQueryRunnerTest
cache,
objectMapper,
cacheKey,
Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())
);
CachingQueryRunner runner = new CachingQueryRunner(