From ec17a44e0983cd6391ba9f60a1c5571a59595b0a Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 23 Mar 2018 21:11:52 -0500 Subject: [PATCH] Add result level caching to Brokers (#5028) * Add result level caching to Brokers * Minor doc changes * Simplify sequences * Move etag execution * Modify cacheLimit criteria * Fix incorrect etag computation * Fix docs * Add separate query runner for result level caching * Update docs * Add post aggregated results to result level cache * Fix indents * Check byte size for exceeding cache limit * Fix indents * Fix indents * Add flag for result caching * Remove logs * Make cache object generation synchronous * Avoid saving intermediate cache results to list * Fix changes that handle etag based response * Release bytestream after use * Address PR comments * Discard resultcache stream after use * Fix docs * Address comments * Add comment about fluent workflow issue --- docs/content/configuration/broker.md | 3 + docs/content/querying/caching.md | 8 +- docs/content/querying/query-context.md | 2 + .../java/io/druid/query/CacheStrategy.java | 25 +- .../java/io/druid/query/QueryContexts.java | 22 ++ .../groupby/GroupByQueryQueryToolChest.java | 17 +- .../SegmentMetadataQueryQueryToolChest.java | 4 +- .../search/SearchQueryQueryToolChest.java | 4 +- .../select/SelectQueryQueryToolChest.java | 4 +- .../TimeBoundaryQueryQueryToolChest.java | 4 +- .../TimeseriesQueryQueryToolChest.java | 16 +- .../query/topn/TopNQueryQueryToolChest.java | 17 +- ...egmentMetadataQueryQueryToolChestTest.java | 4 +- .../search/SearchQueryQueryToolChestTest.java | 4 +- .../TimeBoundaryQueryQueryToolChestTest.java | 4 +- .../TimeseriesQueryQueryToolChestTest.java | 29 +- .../topn/TopNQueryQueryToolChestTest.java | 37 ++- .../druid/client/CachingClusteredClient.java | 4 +- .../io/druid/client/CachingQueryRunner.java | 4 +- .../io/druid/client/ResultLevelCacheUtil.java | 94 ++++++ .../io/druid/client/cache/CacheConfig.java | 27 +- 
.../query/ResultLevelCachingQueryRunner.java | 302 ++++++++++++++++++ .../server/ClientQuerySegmentWalker.java | 30 +- .../druid/client/CachingQueryRunnerTest.java | 4 +- 24 files changed, 618 insertions(+), 51 deletions(-) create mode 100644 server/src/main/java/io/druid/client/ResultLevelCacheUtil.java create mode 100644 server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index f36ae2a95e2..0d57e843e34 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -110,6 +110,9 @@ You can optionally only configure caching to be enabled on the broker by setting |--------|---------------|-----------|-------| |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false| |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false| +|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false| +|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false| +|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`| |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`| diff --git a/docs/content/querying/caching.md b/docs/content/querying/caching.md index 1d6f60af441..2dd2c88d6b6 100644 --- a/docs/content/querying/caching.md +++ b/docs/content/querying/caching.md @@ -3,9 +3,10 @@ layout: doc_page --- # Query Caching -Druid supports query result caching through an LRU cache. 
Results are stored on a per segment basis, along with the -parameters of a given query. This allows Druid to return final results based partially on segment results in the cache and partially -on segment results from scanning historical/real-time segments. +Druid supports query result caching through an LRU cache. Results are stored either as a whole or on a per segment basis, along with the +parameters of a given query. Segment level caching allows Druid to return final results based partially on segment results in the cache +and partially on segment results from scanning historical/real-time segments. Result level caching enables Druid to cache the entire +result set, so that query results can be completely retrieved from the cache for identical queries. Segment results can be stored in a local heap cache or in an external distributed key/value store. Segment query caches can be enabled at either the Historical and Broker level (it is not recommended to enable caching on both). @@ -15,6 +16,7 @@ can be enabled at either the Historical and Broker level (it is not recommended Enabling caching on the broker can yield faster results than if query caches were enabled on Historicals for small clusters. This is the recommended setup for smaller production clusters (< 20 servers). Take note that when caching is enabled on the Broker, results from Historicals are returned on a per segment basis, and Historicals will not be able to do any local result merging. +Result level caching is enabled only on the Broker side. ## Query caching on Historicals diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index b0effe81cf9..d4e2be28f12 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -15,6 +15,8 @@ The query context is used for various query configuration parameters. The follow |queryId | auto-generated | Unique identifier given to this query. 
If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses druid.broker.cache.populateCache or druid.historical.cache.populateCache to determine whether or not to save the results of this query to the query cache | +|useResultLevelCache | `false` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the result level cache for this query. When set to true, Druid uses druid.broker.cache.useResultLevelCache to determine whether or not to read from the result level cache | +|populateResultLevelCache | `false` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the result level cache. When set to true, Druid uses druid.broker.cache.populateResultLevelCache to determine whether or not to save the results of this query to the result level cache | |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. 
For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 95681dd1b06..2c3ec35ed19 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -26,7 +26,7 @@ import io.druid.guice.annotations.ExtensionPoint; import java.util.concurrent.ExecutorService; /** -*/ + */ @ExtensionPoint public interface CacheStrategy> { @@ -37,6 +37,7 @@ public interface CacheStrategy> * @param query the query to be cached * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be * called on the cached by-segment results + * * @return true if the query is cacheable, otherwise false. 
*/ boolean isCacheable(QueryType query, boolean willMergeRunners); @@ -45,6 +46,7 @@ public interface CacheStrategy> * Computes the cache key for the given query * * @param query the query to compute a cache key for + * * @return the cache key */ byte[] computeCacheKey(QueryType query); @@ -58,17 +60,32 @@ public interface CacheStrategy> /** * Returns a function that converts from the QueryType's result type to something cacheable. - * + *

* The resulting function must be thread-safe. * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return a thread-safe function that converts the QueryType's result type into something cacheable */ - Function prepareForCache(); + Function prepareForCache(boolean isResultLevelCache); /** * A function that does the inverse of the operation that the function prepareForCache returns * + * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching + * * @return A function that does the inverse of the operation that the function prepareForCache returns */ - Function pullFromCache(); + Function pullFromCache(boolean isResultLevelCache); + + + default Function prepareForSegmentLevelCache() + { + return prepareForCache(false); + } + + default Function pullFromSegmentLevelCache() + { + return pullFromCache(false); + } } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index d0a16fd8784..d88a536eb9e 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -37,6 +37,8 @@ public class QueryContexts public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; + public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; + public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes @@ -72,6 +74,26 @@ public class QueryContexts return parseBoolean(query, "useCache", defaultValue); } + public static boolean isPopulateResultLevelCache(Query query) + { + return 
isPopulateResultLevelCache(query, DEFAULT_POPULATE_RESULTLEVEL_CACHE); + } + + public static boolean isPopulateResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateResultLevelCache", defaultValue); + } + + public static boolean isUseResultLevelCache(Query query) + { + return isUseResultLevelCache(query, DEFAULT_USE_RESULTLEVEL_CACHE); + } + + public static boolean isUseResultLevelCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useResultLevelCache", defaultValue); + } + public static boolean isFinalize(Query query, boolean defaultValue) { return parseBoolean(query, "finalize", defaultValue); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 28d5acb42de..1252caa40dd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -50,6 +50,7 @@ import io.druid.query.SubqueryQueryRunner; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.query.aggregation.PostAggregator; import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -408,7 +409,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -426,6 +427,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { @@ -460,7 +466,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest postItr = query.getPostAggregatorSpecs().iterator(); + while 
(postItr.hasNext() && results.hasNext()) { + event.put(postItr.next().getName(), results.next()); + } + } if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) { throw new ISE( "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]", diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4a921480f30..5025d9dbc97 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -198,7 +198,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest prepareForCache() + public Function prepareForCache(boolean isResultLevelCache) { return new Function() { @@ -211,7 +211,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest pullFromCache() + public Function pullFromCache(boolean isResultLevelCache) { return new Function() { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 74148d66aea..9efad496b35 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -206,7 +206,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -221,7 +221,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java 
b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 78639151560..c0c3d827850 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -243,7 +243,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -272,7 +272,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 99586b595dd..a1046a00c05 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -171,7 +171,7 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Function, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -184,7 +184,7 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public Function> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 473775b71bc..6e9f0307f70 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -174,7 +174,7 @@ public class 
TimeseriesQueryQueryToolChest extends QueryToolChest, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -188,14 +188,18 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -216,6 +220,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + retVal.put(postItr.next().getName(), resultIter.next()); + } + } return new Result( timestamp, diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 81761afc6a3..c34e43eb1a4 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -293,7 +293,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object, TopNQuery> getCacheStrategy(final TopNQuery query) { @@ -341,7 +340,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object> prepareForCache() + public Function, Object> prepareForCache(boolean isResultLevelCache) { return new Function, Object>() { @@ -361,6 +360,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest> pullFromCache() + public Function> pullFromCache(boolean isResultLevelCache) { return new Function>() { @@ -401,7 +405,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest postItr = query.getPostAggregatorSpecs().iterator(); + while (postItr.hasNext() && resultIter.hasNext()) { + vals.put(postItr.next().getName(), resultIter.next()); + } + } retVal.add(vals); } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java 
b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 359e39f6af2..a497a631380 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -94,7 +94,7 @@ public class SegmentMetadataQueryQueryToolChestTest null ); - Object preparedValue = strategy.prepareForCache().apply(result); + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result); ObjectMapper objectMapper = new DefaultObjectMapper(); SegmentAnalysis fromCacheValue = objectMapper.readValue( @@ -102,7 +102,7 @@ public class SegmentMetadataQueryQueryToolChestTest strategy.getCacheObjectClazz() ); - SegmentAnalysis fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + SegmentAnalysis fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java index 621a435f5e9..dd73518ae5f 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryQueryToolChestTest.java @@ -59,7 +59,7 @@ public class SearchQueryQueryToolChestTest new SearchResultValue(ImmutableList.of(new SearchHit("dim1", "a"))) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -69,7 +69,7 @@ public class SearchQueryQueryToolChestTest strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git 
a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java index e9092200682..d947613dbf6 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java @@ -215,7 +215,7 @@ public class TimeBoundaryQueryQueryToolChestTest ) ); - Object preparedValue = strategy.prepareForCache().apply( + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( result ); @@ -225,7 +225,7 @@ public class TimeBoundaryQueryQueryToolChestTest strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); Assert.assertEquals(result, fromCacheResult); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 8e67921dd83..0c4c0e7b6e8 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,6 +32,8 @@ import io.druid.query.Result; import io.druid.query.TableDataSource; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.TestHelper; import io.druid.segment.VirtualColumns; @@ -76,12 +78,12 @@ public class TimeseriesQueryQueryToolChestTest new CountAggregatorFactory("metric1"), new LongSumAggregatorFactory("metric0", "metric0") ), - 
null, + ImmutableList.of(new ConstantPostAggregator("post", 10)), null ) ); - final Result result = new Result<>( + final Result result1 = new Result<>( // test timestamps that result in integer size millis DateTimes.utc(123L), new TimeseriesResultValue( @@ -89,7 +91,7 @@ public class TimeseriesQueryQueryToolChestTest ) ); - Object preparedValue = strategy.prepareForCache().apply(result); + Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); ObjectMapper objectMapper = TestHelper.makeJsonMapper(); Object fromCacheValue = objectMapper.readValue( @@ -97,9 +99,26 @@ public class TimeseriesQueryQueryToolChestTest strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); - Assert.assertEquals(result, fromCacheResult); + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TimeseriesResultValue( + ImmutableMap.of("metric1", 2, "metric0", 3, "post", 10) + ) + ); + + Object preparedResultLevelCacheValue = strategy.prepareForCache(true).apply(result2); + Object fromResultLevelCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultLevelCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue); + Assert.assertEquals(result2, fromResultLevelCacheRes); } @Test diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 5cd277123a6..1a4e78351f7 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -78,7 +78,7 @@ public class 
TopNQueryQueryToolChestTest ) ); - final Result result = new Result<>( + final Result result1 = new Result<>( // test timestamps that result in integer size millis DateTimes.utc(123L), new TopNResultValue( @@ -91,8 +91,8 @@ public class TopNQueryQueryToolChestTest ) ); - Object preparedValue = strategy.prepareForCache().apply( - result + Object preparedValue = strategy.prepareForSegmentLevelCache().apply( + result1 ); ObjectMapper objectMapper = TestHelper.makeJsonMapper(); @@ -101,9 +101,36 @@ public class TopNQueryQueryToolChestTest strategy.getCacheObjectClazz() ); - Result fromCacheResult = strategy.pullFromCache().apply(fromCacheValue); + Result fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue); + + Assert.assertEquals(result1, fromCacheResult); + + final Result result2 = new Result<>( + // test timestamps that result in integer size millis + DateTimes.utc(123L), + new TopNResultValue( + Arrays.asList( + ImmutableMap.of( + "test", "val1", + "metric1", 2, + "post", 10 + ) + ) + ) + ); + + Object preparedResultCacheValue = strategy.prepareForCache(true).apply( + result2 + ); + + Object fromResultCacheValue = objectMapper.readValue( + objectMapper.writeValueAsBytes(preparedResultCacheValue), + strategy.getCacheObjectClazz() + ); + + Result fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); + Assert.assertEquals(result2, fromResultCacheResult); - Assert.assertEquals(result, fromCacheResult); } @Test diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 1e2e441949d..df48cc0f21e 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -500,7 +500,7 @@ public class CachingClusteredClient implements QuerySegmentWalker return; } - final Function pullFromCacheFunction = strategy.pullFromCache(); + final Function 
pullFromCacheFunction = strategy.pullFromSegmentLevelCache(); final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); for (Pair cachedResultPair : cachedResults) { final byte[] cachedResult = cachedResultPair.rhs; @@ -600,7 +600,7 @@ public class CachingClusteredClient implements QuerySegmentWalker .withQuerySegmentSpec(segmentsOfServerSpec), responseContext ); - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return resultsBySegments .map(result -> { final BySegmentResultValueClass resultsOfSegment = result.getValue(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 54ecc5c35a9..79bcbfb0d17 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -102,7 +102,7 @@ public class CachingQueryRunner implements QueryRunner } if (useCache) { - final Function cacheFn = strategy.pullFromCache(); + final Function cacheFn = strategy.pullFromSegmentLevelCache(); final byte[] cachedResult = cache.get(key); if (cachedResult != null) { final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); @@ -142,7 +142,7 @@ public class CachingQueryRunner implements QueryRunner final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList()); if (populateCache) { - final Function cacheFn = strategy.prepareForCache(); + final Function cacheFn = strategy.prepareForSegmentLevelCache(); return Sequences.withEffect( Sequences.map( diff --git a/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java new file mode 100644 index 00000000000..f55d2e6bba6 --- /dev/null +++ b/server/src/main/java/io/druid/client/ResultLevelCacheUtil.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.StringUtils; +import io.druid.query.CacheStrategy; +import io.druid.query.Query; +import io.druid.query.QueryContexts; + +public class ResultLevelCacheUtil +{ + private static final Logger log = new Logger(ResultLevelCacheUtil.class); + + public static Cache.NamedKey computeResultLevelCacheKey( + String resultLevelCacheIdentifier + ) + { + return new Cache.NamedKey( + resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier) + ); + } + + public static void populate( + Cache cache, + Cache.NamedKey key, + byte[] resultBytes + ) + { + log.debug("Populating results into cache"); + cache.put(key, resultBytes); + } + + public static boolean useResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean populateResultLevelCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return 
populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + private static boolean useResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isUseResultLevelCache(query) + && strategy != null + && cacheConfig.isUseResultLevelCache() + && cacheConfig.isQueryCacheable(query); + } + + private static boolean populateResultLevelCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return QueryContexts.isPopulateResultLevelCache(query) + && strategy != null + && cacheConfig.isPopulateResultLevelCache() + && cacheConfig.isQueryCacheable(query); + } +} diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index f1cc030c09f..d73f9f387da 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -29,13 +29,20 @@ import java.util.List; public class CacheConfig { public static final String POPULATE_CACHE = "populateCache"; - + // The defaults defined here for cache related parameters are different from the QueryContext defaults due to legacy reasons. + // They should be made the same at some point in the future. 
@JsonProperty private boolean useCache = false; @JsonProperty private boolean populateCache = false; + @JsonProperty + private boolean useResultLevelCache = false; + + @JsonProperty + private boolean populateResultLevelCache = false; + @JsonProperty @Min(0) private int numBackgroundThreads = 0; @@ -44,6 +51,9 @@ public class CacheConfig @Min(0) private int cacheBulkMergeLimit = Integer.MAX_VALUE; + @JsonProperty + private int resultLevelCacheLimit = Integer.MAX_VALUE; + @JsonProperty private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); @@ -57,6 +67,16 @@ public class CacheConfig return useCache; } + public boolean isPopulateResultLevelCache() + { + return populateResultLevelCache; + } + + public boolean isUseResultLevelCache() + { + return useResultLevelCache; + } + public int getNumBackgroundThreads() { return numBackgroundThreads; @@ -67,6 +87,11 @@ public class CacheConfig return cacheBulkMergeLimit; } + public int getResultLevelCacheLimit() + { + return resultLevelCacheLimit; + } + public boolean isQueryCacheable(Query query) { return isQueryCacheable(query.getType()); diff --git a/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java new file mode 100644 index 00000000000..39a5a6de781 --- /dev/null +++ b/server/src/main/java/io/druid/query/ResultLevelCachingQueryRunner.java @@ -0,0 +1,302 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.client.ResultLevelCacheUtil;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.QueryResource;

import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

/**
 * Broker-side {@link QueryRunner} that layers result-level caching over a delegate runner.
 * <p>
 * Cache entry layout: [int32 etag byte-length][etag UTF-8 bytes][JSON-serialized result rows].
 * On each run, the cached etag (if any) is forwarded downstream via the If-None-Match header;
 * if the downstream responds with a matching etag, the cached result set is deserialized and
 * returned, otherwise the fresh results are streamed through and (optionally) re-cached.
 */
public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
{
  private static final Logger log = new Logger(ResultLevelCachingQueryRunner.class);

  private final QueryRunner<T> baseRunner;
  private final ObjectMapper objectMapper;
  private final Cache cache;
  private final CacheConfig cacheConfig;
  private final boolean useResultCache;
  private final boolean populateResultCache;
  // Mutated in run() to attach the If-None-Match header; this runner is built per-query and is
  // not safe for concurrent run() calls.
  private Query<T> query;
  private final CacheStrategy<T, Object, Query<T>> strategy;

  public ResultLevelCachingQueryRunner(
      QueryRunner<T> baseRunner,
      QueryToolChest queryToolChest,
      Query<T> query,
      ObjectMapper objectMapper,
      Cache cache,
      CacheConfig cacheConfig
  )
  {
    this.baseRunner = baseRunner;
    this.objectMapper = objectMapper;
    this.cache = cache;
    this.cacheConfig = cacheConfig;
    this.query = query;
    this.strategy = queryToolChest.getCacheStrategy(query);
    this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig);
    this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig);
  }

  @Override
  public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
  {
    if (useResultCache || populateResultCache) {
      final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query));
      final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
      String existingResultSetId = extractEtagFromResults(cachedResultSet);

      existingResultSetId = existingResultSetId == null ? "" : existingResultSetId;
      // Forward the cached etag so the downstream can answer "not modified" cheaply.
      query = query.withOverriddenContext(
          ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId));

      final Sequence<T> resultFromClient = baseRunner.run(QueryPlus.wrap(query), responseContext);
      final String newResultSetId = (String) responseContext.get(QueryResource.HEADER_ETAG);

      if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
        log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
        return deserializeResults(cachedResultSet, strategy, existingResultSetId);
      } else {
        @Nullable
        final ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator(
            cacheKeyStr,
            newResultSetId
        );
        if (resultLevelCachePopulator == null) {
          // Population disabled or the etag prefix could not be written: stream results through.
          return resultFromClient;
        }
        final Function<T, Object> cacheFn = strategy.prepareForCache(true);

        return Sequences.wrap(
            Sequences.map(
                resultFromClient,
                new Function<T, T>()
                {
                  @Override
                  public T apply(T input)
                  {
                    // Serialize each row into the populator's buffer as it streams by.
                    if (resultLevelCachePopulator.isShouldPopulate()) {
                      resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
                    }
                    return input;
                  }
                }
            ),
            new SequenceWrapper()
            {
              @Override
              public void after(boolean isDone, Throwable thrown)
              {
                Preconditions.checkNotNull(
                    resultLevelCachePopulator,
                    "ResultLevelCachePopulator cannot be null during cache population"
                );
                if (thrown != null) {
                  log.error(
                      thrown,
                      "Error while preparing for result level caching for query %s with error %s ",
                      query.getId(),
                      thrown.getMessage()
                  );
                } else if (resultLevelCachePopulator.isShouldPopulate()) {
                  // The resultset identifier and its length is cached along with the resultset
                  resultLevelCachePopulator.populateResults();
                  log.debug("Cache population complete for query %s", query.getId());
                }
                // Release the accumulated buffer regardless of outcome.
                resultLevelCachePopulator.cacheObjectStream = null;
              }
            }
        );
      }
    } else {
      return baseRunner.run(queryPlus, responseContext);
    }
  }

  /**
   * Returns the raw cached entry for this query's key, or null when reads are disabled or the
   * key is absent.
   */
  @Nullable
  private byte[] fetchResultsFromResultLevelCache(final String queryCacheKey)
  {
    if (useResultCache && queryCacheKey != null) {
      return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey));
    }
    return null;
  }

  /**
   * Extracts the etag stored at the head of a cached entry ([int32 length][etag bytes]),
   * or null when there is no cached entry.
   */
  @Nullable
  private String extractEtagFromResults(final byte[] cachedResult)
  {
    if (cachedResult == null) {
      return null;
    }
    log.debug("Fetching result level cache identifier for query: %s", query.getId());
    final int etagLength = ByteBuffer.wrap(cachedResult, 0, Integer.BYTES).getInt();
    return StringUtils.fromUtf8(Arrays.copyOfRange(cachedResult, Integer.BYTES, etagLength + Integer.BYTES));
  }

  /**
   * Deserializes the row payload of a cached entry (everything after the etag prefix) and maps
   * it back to result objects via the strategy's pull-from-cache function.
   */
  private Sequence<T> deserializeResults(
      final byte[] cachedResult, CacheStrategy strategy, String resultSetId
  )
  {
    if (cachedResult == null) {
      // FIX: previously this only logged and fell through, guaranteeing an NPE below.
      log.error("Cached result set is null");
      return Sequences.empty();
    }
    final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(true);
    final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
    // FIX: skip the UTF-8 BYTE length of the resultSetId (what was actually written), not its
    // UTF-16 char count — they differ for non-ASCII etags and would mis-align the parser.
    final int resultOffset = Integer.BYTES + StringUtils.toUtf8(resultSetId).length;
    final Sequence<Object> cachedSequence = Sequences.simple(() -> {
      try {
        return objectMapper.readValues(
            objectMapper.getFactory().createParser(
                cachedResult,
                resultOffset,
                cachedResult.length - resultOffset
            ),
            cacheObjectClazz
        );
      }
      catch (IOException e) {
        throw new RE(e, "Failed to retrieve results from cache for query ID [%s]", query.getId());
      }
    });

    return Sequences.map(cachedSequence, pullFromCacheFunction);
  }

  /**
   * Builds a populator primed with the [etag length][etag bytes] prefix, or null when population
   * is disabled, the downstream returned no etag, or writing the prefix failed.
   */
  @Nullable
  private ResultLevelCachePopulator createResultLevelCachePopulator(
      String cacheKeyStr,
      String resultSetId
  )
  {
    if (resultSetId != null && populateResultCache) {
      final ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
          cache,
          objectMapper,
          ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr),
          cacheConfig,
          true
      );
      try {
        // Save the resultSetId and its length.
        // FIX: record the UTF-8 BYTE length of the id, matching the bytes written next and the
        // offset arithmetic in extractEtagFromResults/deserializeResults.
        final byte[] resultSetIdBytes = StringUtils.toUtf8(resultSetId);
        resultLevelCachePopulator.cacheObjectStream.write(
            ByteBuffer.allocate(Integer.BYTES).putInt(resultSetIdBytes.length).array()
        );
        resultLevelCachePopulator.cacheObjectStream.write(resultSetIdBytes);
      }
      catch (IOException ioe) {
        log.error(ioe, "Failed to write cached values for query %s", query.getId());
        return null;
      }
      return resultLevelCachePopulator;
    } else {
      return null;
    }
  }

  /**
   * Accumulates serialized results for one query and writes them to the cache when the
   * sequence completes successfully and the size limit was not exceeded.
   */
  public class ResultLevelCachePopulator
  {
    private final Cache cache;
    private final ObjectMapper mapper;
    private final Cache.NamedKey key;
    private final CacheConfig cacheConfig;
    // Holds [etag length][etag bytes][serialized rows]; nulled once consumed or abandoned.
    private ByteArrayOutputStream cacheObjectStream = new ByteArrayOutputStream();
    private boolean shouldPopulate;

    private ResultLevelCachePopulator(
        Cache cache,
        ObjectMapper mapper,
        Cache.NamedKey key,
        CacheConfig cacheConfig,
        boolean shouldPopulate
    )
    {
      this.cache = cache;
      this.mapper = mapper;
      this.key = key;
      this.cacheConfig = cacheConfig;
      this.shouldPopulate = shouldPopulate;
    }

    public boolean isShouldPopulate()
    {
      return shouldPopulate;
    }

    // Serializes one result row into the buffer; disables population (and drops the buffer)
    // if the configured size limit is exceeded or serialization fails.
    private void cacheResultEntry(T resultEntry, Function<T, Object> cacheFn)
    {
      final int cacheLimit = cacheConfig.getResultLevelCacheLimit();
      try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
        gen.writeObject(cacheFn.apply(resultEntry));
        if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
          shouldPopulate = false;
          cacheObjectStream = null;
        }
      }
      catch (IOException ex) {
        log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
        shouldPopulate = false;
        cacheObjectStream = null;
      }
    }

    public void populateResults()
    {
      ResultLevelCacheUtil.populate(
          cache,
          key,
          cacheObjectStream.toByteArray()
      );
    }
  }
}
@@ -47,6 +50,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final RetryQueryRunnerConfig retryConfig; private final ObjectMapper objectMapper; private final ServerConfig serverConfig; + private final Cache cache; + private final CacheConfig cacheConfig; + @Inject public ClientQuerySegmentWalker( @@ -55,7 +61,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker QueryToolChestWarehouse warehouse, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, - ServerConfig serverConfig + ServerConfig serverConfig, + Cache cache, + CacheConfig cacheConfig ) { this.emitter = emitter; @@ -64,6 +72,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker this.retryConfig = retryConfig; this.objectMapper = objectMapper; this.serverConfig = serverConfig; + this.cache = cache; + this.cacheConfig = cacheConfig; } @Override @@ -81,6 +91,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) { QueryToolChest> toolChest = warehouse.getToolChest(query); + + // This does not adhere to the fluent workflow. 
See https://github.com/druid-io/druid/issues/5517 + return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest), + toolChest, + query, + objectMapper, + cache, + cacheConfig); + } + + private QueryRunner makeRunner( + Query query, + QueryRunner baseClientRunner, + QueryToolChest> toolChest + ) + { PostProcessingOperator postProcessing = objectMapper.convertValue( query.getContextValue("postProcessing"), new TypeReference>() @@ -105,6 +131,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker .emitCPUTimeMetric(emitter) .postProcess(postProcessing); } - - } diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 4ee3013eb7d..65ee95acdfa 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -314,7 +314,7 @@ public class CachingQueryRunnerTest byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); - Function fn = cacheStrategy.pullFromCache(); + Function fn = cacheStrategy.pullFromSegmentLevelCache(); List cacheResults = Lists.newArrayList( Iterators.transform( objectMapper.readValues( @@ -349,7 +349,7 @@ public class CachingQueryRunnerTest cache, objectMapper, cacheKey, - Iterables.transform(expectedResults, cacheStrategy.prepareForCache()) + Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache()) ); CachingQueryRunner runner = new CachingQueryRunner(