diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java index 6b00ef0e577..8e14947b406 100644 --- a/processing/src/main/java/io/druid/query/CacheStrategy.java +++ b/processing/src/main/java/io/druid/query/CacheStrategy.java @@ -22,10 +22,23 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; +import java.util.concurrent.ExecutorService; + /** */ public interface CacheStrategy> { + /** + * Returns the given query is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. + * + * @param query the query to be cached + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * called on the cached by-segment results + * @return true if the query is cacheable, otherwise false. + */ + boolean isCacheable(QueryType query, boolean willMergeRunners); + /** * Computes the cache key for the given query * diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 57979634bec..2058b829526 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -351,6 +351,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest aggs = query.getAggregatorSpecs(); private final List dims = query.getDimensions(); + @Override + public boolean isCacheable(GroupByQuery query, boolean willMergeRunners) + { + return strategySelector.strategize(query).isCacheable(willMergeRunners); + } @Override public byte[] computeCacheKey(GroupByQuery query) diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 4fb63a92e53..4a84f665ffb 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -23,13 +23,26 @@ import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.data.input.Row; import io.druid.java.util.common.guava.Sequence; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; import io.druid.query.groupby.GroupByQuery; import io.druid.segment.StorageAdapter; import java.util.Map; +import java.util.concurrent.ExecutorService; public interface GroupByStrategy { + + /** + * Indicates this strategy is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. + * + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * called on the cached by-segment results + * @return true if this strategy is cacheable, otherwise false. + */ + boolean isCacheable(boolean willMergeRunners); + Sequence mergeResults( QueryRunner baseRunner, GroupByQuery query, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 794fe68d7e6..cc85d3a5a09 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -77,6 +77,12 @@ public class GroupByStrategyV1 implements GroupByStrategy this.bufferPool = bufferPool; } + @Override + public boolean isCacheable(boolean willMergeRunners) + { + return true; + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 95df89b550a..08ff15b29fc 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -112,6 +112,12 @@ public class GroupByStrategyV2 implements GroupByStrategy } } + @Override + public boolean isCacheable(boolean willMergeRunners) + { + return willMergeRunners; + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 0cb17be8443..a9fa0be3c41 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -171,6 +171,12 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest() { + @Override + public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners) + { + return true; + } + @Override public byte[] computeCacheKey(SegmentMetadataQuery query) { diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 4a010322630..7943b84d6df 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -153,6 +153,12 @@ public class SearchQueryQueryToolChest extends QueryToolChestemptyList(); + @Override + public boolean isCacheable(SearchQuery query, boolean willMergeRunners) + { + return true; + } + @Override public byte[] computeCacheKey(SearchQuery query) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index f6591884696..0e4515a5534 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -159,6 +159,12 @@ public class SelectQueryQueryToolChest extends QueryToolChestemptyList(); + @Override + public boolean isCacheable(SelectQuery query, boolean willMergeRunners) + { + return true; + } + @Override public byte[] computeCacheKey(SelectQuery query) { diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 03da955bb2d..2a98bb205f8 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -133,6 +133,12 @@ public class TimeBoundaryQueryQueryToolChest { return new CacheStrategy, Object, TimeBoundaryQuery>() { + @Override + public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners) + { + return true; + } + @Override public byte[] computeCacheKey(TimeBoundaryQuery query) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index a399af557e7..edb2e151bb5 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -127,6 +127,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest aggs = query.getAggregatorSpecs(); + @Override + public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners) + { + return true; + } + @Override public byte[] computeCacheKey(TimeseriesQuery query) { diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 6f876de37e2..950bc88dd70 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -302,6 +302,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest boolean useCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean populateCacheOnBrokers( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + } + + public static boolean useCacheOnDataNodes( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + } + + public static boolean populateCacheOnDataNodes( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + } + + private static boolean useCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return BaseQuery.getContextUseCache(query, true) + && strategy != null + && cacheConfig.isUseCache() + && cacheConfig.isQueryCacheable(query); + } + + private static boolean populateCache( + Query query, + CacheStrategy> strategy, + CacheConfig cacheConfig + ) + { + return BaseQuery.getContextPopulateCache(query, true) + && strategy != null + && cacheConfig.isPopulateCache() + && cacheConfig.isQueryCacheable(query); + } + } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 960c4feb475..1083a48e4f0 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -112,6 +112,13 @@ public class CachingClusteredClient implements QueryRunner this.cacheConfig = cacheConfig; this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); + if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) { + log.warn( + "Even though groupBy caching is enabled, v2 groupBys will not be cached. " + + "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching." + ); + } + serverView.registerSegmentCallback( Execs.singleThreaded("CCClient-ServerView-CB-%d"), new ServerView.BaseSegmentCallback() @@ -137,17 +144,10 @@ public class CachingClusteredClient implements QueryRunner final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); - final boolean useCache = BaseQuery.getContextUseCache(query, true) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); - final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); + final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); + final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); final boolean isBySegment = BaseQuery.getContextBySegment(query, false); - final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); final int priority = BaseQuery.getContextPriority(query, 0); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 8ae49c71c7b..418098c9d55 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -36,7 +36,6 @@ import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -87,16 +86,8 @@ public class CachingQueryRunner implements QueryRunner public Sequence run(Query query, Map responseContext) { final CacheStrategy strategy = toolChest.getCacheStrategy(query); - - final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); - - final boolean useCache = BaseQuery.getContextUseCache(query, true) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); + final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); + final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); final Cache.NamedKey key; if (strategy != null && (useCache || populateCache)) { diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index d7eb9714955..07189e31ffc 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -69,7 +69,11 @@ public class CacheConfig public boolean isQueryCacheable(Query query) { + return isQueryCacheable(query.getType()); + } + + public boolean isQueryCacheable(String queryType) { // O(n) impl, but I don't think we'll ever have a million query types here - return !unCacheable.contains(query.getType()); + return !unCacheable.contains(queryType); } }