Disable caching on brokers for groupBy v2 (#3950)

* Disable caching on brokers for groupBy v2

* Rename parameter

* address comments
This commit is contained in:
Jihoon Son 2017-02-22 02:49:49 +09:00 committed by Fangjin Yang
parent bc33b68b51
commit 128274c6f0
15 changed files with 159 additions and 21 deletions

View File

@ -22,10 +22,23 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import java.util.concurrent.ExecutorService;
/**
*/
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
/**
* Returns the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* called on the cached by-segment results
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);
/**
* Computes the cache key for the given query
*

View File

@ -351,6 +351,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<DimensionSpec> dims = query.getDimensions();
@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
{
return strategySelector.strategize(query).isCacheable(willMergeRunners);
}
@Override
public byte[] computeCacheKey(GroupByQuery query)

View File

@ -23,13 +23,26 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.data.input.Row;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.StorageAdapter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public interface GroupByStrategy
{
/**
* Indicates this strategy is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* called on the cached by-segment results
* @return true if this strategy is cacheable, otherwise false.
*/
boolean isCacheable(boolean willMergeRunners);
Sequence<Row> mergeResults(
QueryRunner<Row> baseRunner,
GroupByQuery query,

View File

@ -77,6 +77,12 @@ public class GroupByStrategyV1 implements GroupByStrategy
this.bufferPool = bufferPool;
}
@Override
public boolean isCacheable(boolean willMergeRunners)
{
return true;
}
@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,

View File

@ -112,6 +112,12 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
@Override
public boolean isCacheable(boolean willMergeRunners)
{
return willMergeRunners;
}
@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,

View File

@ -171,6 +171,12 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
@Override
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{

View File

@ -153,6 +153,12 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
:
Collections.<String>emptyList();
@Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(SearchQuery query)
{

View File

@ -159,6 +159,12 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
:
Collections.<String>emptyList();
@Override
public boolean isCacheable(SelectQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(SelectQuery query)
{

View File

@ -133,6 +133,12 @@ public class TimeBoundaryQueryQueryToolChest
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{

View File

@ -127,6 +127,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(TimeseriesQuery query)
{

View File

@ -302,6 +302,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.getMetricName(query.getDimensionSpec())
);
@Override
public boolean isCacheable(TopNQuery query, boolean willMergeRunners)
{
return true;
}
@Override
public byte[] computeCacheKey(TopNQuery query)
{

View File

@ -23,6 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
@ -77,4 +81,64 @@ public class CacheUtil
}
}
public static <T> boolean useCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
public static <T> boolean populateCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false);
}
public static <T> boolean useCacheOnDataNodes(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
}
public static <T> boolean populateCacheOnDataNodes(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true);
}
private static <T> boolean useCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
}
private static <T> boolean populateCache(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
CacheConfig cacheConfig
)
{
return BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
}
}

View File

@ -112,6 +112,13 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
this.cacheConfig = cacheConfig;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
log.warn(
"Even though groupBy caching is enabled, v2 groupBys will not be cached. "
+ "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching."
);
}
serverView.registerSegmentCallback(
Execs.singleThreaded("CCClient-ServerView-CB-%d"),
new ServerView.BaseSegmentCallback()
@ -137,17 +144,10 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final int priority = BaseQuery.getContextPriority(query, 0);

View File

@ -36,7 +36,6 @@ import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -87,16 +86,8 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig);
final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig);
final Cache.NamedKey key;
if (strategy != null && (useCache || populateCache)) {

View File

@ -69,7 +69,11 @@ public class CacheConfig
public boolean isQueryCacheable(Query query)
{
return isQueryCacheable(query.getType());
}
public boolean isQueryCacheable(String queryType) {
// O(n) impl, but I don't think we'll ever have a million query types here
return !unCacheable.contains(query.getType());
return !unCacheable.contains(queryType);
}
}