From 7142b0c39eca2d6f111db827d5e5b035cc4d9347 Mon Sep 17 00:00:00 2001 From: hqx871 Date: Wed, 12 Jul 2023 17:30:01 +0800 Subject: [PATCH] Enable result level cache for GroupByStrategyV2 on broker (#11595) Cache is disabled for GroupByStrategyV2 on broker since the pr #3820 [groupBy v2: Results not fully merged when caching is enabled on the broker]. But we can enable the result-level cache on broker for GroupByStrategyV2 and keep the segment-level cache disabled. --- docs/configuration/index.md | 2 +- .../org/apache/druid/query/CacheStrategy.java | 23 +++++- .../groupby/GroupByQueryQueryToolChest.java | 4 +- .../groupby/strategy/GroupByStrategy.java | 3 +- .../groupby/strategy/GroupByStrategyV1.java | 2 +- .../groupby/strategy/GroupByStrategyV2.java | 6 +- .../SegmentMetadataQueryQueryToolChest.java | 2 +- .../search/SearchQueryQueryToolChest.java | 2 +- .../TimeBoundaryQueryQueryToolChest.java | 2 +- .../TimeseriesQueryQueryToolChest.java | 2 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../GroupByQueryQueryToolChestTest.java | 76 +++++++++++++++++++ .../org/apache/druid/client/CacheUtil.java | 14 ++-- .../apache/druid/client/CacheUtilTest.java | 26 ++++--- 14 files changed, 138 insertions(+), 28 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f0cf9c60112..def717a7d94 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1992,7 +1992,7 @@ You can optionally only configure caching to be enabled on the Broker by setting See [cache configuration](#cache-configuration) for how to configure cache settings. -> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, both of non-result level cache and result level cache do not work on Brokers. +> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, segment level cache do not work on Brokers. > See [Differences between v1 and v2](../querying/groupbyquery.md#differences-between-v1-and-v2) and [Query caching](../querying/caching.md) for more information. #### Segment Discovery diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index bd21034186c..d883e7555e9 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -36,6 +36,23 @@ import java.util.List; @ExtensionPoint public interface CacheStrategy> { + /** + * This method is deprecated and retained for backward incompatibility. + * Returns whether the given query is cacheable or not. + * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. + * + * @param ignoredQuery the query to be cached + * @param ignoredWillMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be + * called on the cached by-segment results + * + * @return true if the query is cacheable, otherwise false. + */ + @Deprecated + default boolean isCacheable(QueryType ignoredQuery, boolean ignoredWillMergeRunners) + { + return false; + } + /** * Returns whether the given query is cacheable or not. * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. @@ -43,10 +60,14 @@ public interface CacheStrategy> * @param query the query to be cached * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results + * @param bySegment segment level or result level cache * * @return true if the query is cacheable, otherwise false. */ - boolean isCacheable(QueryType query, boolean willMergeRunners); + default boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment) + { + return isCacheable(query, willMergeRunners); + } /** * Computes the per-segment cache key for the given query. Because this is a per-segment cache key, it should only diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index d13998c8340..ac1eec04025 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -526,9 +526,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest dims = query.getDimensions(); @Override - public boolean isCacheable(GroupByQuery query, boolean willMergeRunners) + public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment) { - return strategySelector.strategize(query).isCacheable(willMergeRunners); + return strategySelector.strategize(query).isCacheable(willMergeRunners, bySegment); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index 87e6a3dcac1..230e824cab1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -60,10 +60,11 @@ public interface GroupByStrategy * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results. Can be used to distinguish if we are running on * a broker or data node. + * @param bySegment segment level or result level cache * * @return true if this strategy is cacheable, otherwise false. */ - boolean isCacheable(boolean willMergeRunners); + boolean isCacheable(boolean willMergeRunners, boolean bySegment); /** * Indicates if this query should undergo "mergeResults" or not. Checked by diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index 8c119d59dd4..c82d29d1692 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -83,7 +83,7 @@ public class GroupByStrategyV1 implements GroupByStrategy } @Override - public boolean isCacheable(boolean willMergeRunners) + public boolean isCacheable(boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index cda33f459ed..df599952845 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -180,9 +180,11 @@ public class GroupByStrategyV2 implements GroupByStrategy } @Override - public boolean isCacheable(boolean willMergeRunners) + public boolean isCacheable(boolean willMergeRunners, boolean bySegment) { - return willMergeRunners; + //disable segment-level cache on borker, + //see PR https://github.com/apache/druid/issues/3820 + return willMergeRunners || !bySegment; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 45cc18ff5a3..9041db0c6a2 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -185,7 +185,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest() { @Override - public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners) + public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index da59cf9c7f5..b390cd83a58 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -136,7 +136,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Object, TimeBoundaryQuery>() { @Override - public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners) + public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 54a17d267d1..c16fe29c14d 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -281,7 +281,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest aggs = query.getAggregatorSpecs(); @Override - public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners) + public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners, boolean bySegment) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 0109c9828e9..18847fd60b5 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -294,7 +294,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest queryConfigSupplier = Suppliers.ofInstance(queryConfig); + final Supplier bufferSupplier = + () -> ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes()); + + final NonBlockingPool bufferPool = new StupidPool<>( + "GroupByQueryEngine-bufferPool", + bufferSupplier + ); + final BlockingPool mergeBufferPool = new DefaultBlockingPool<>( + bufferSupplier, + processingConfig.getNumMergeBuffers() + ); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + queryConfigSupplier, + new GroupByStrategyV1( + queryConfigSupplier, + new GroupByQueryEngine(queryConfigSupplier, bufferPool), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new GroupByStrategyV2( + processingConfig, + queryConfigSupplier, + bufferPool, + mergeBufferPool, + TestHelper.makeJsonMapper(), + new ObjectMapper(new SmileFactory()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(strategySelector); + CacheStrategy cacheStrategy = queryToolChest.getCacheStrategy(query); + Assert.assertTrue( + "result level cache on broker server for GroupByStrategyV2 should be enabled", + cacheStrategy.isCacheable(query, false, false) + ); + Assert.assertFalse( + "segment level cache on broker server for GroupByStrategyV2 should be disabled", + cacheStrategy.isCacheable(query, false, true) + ); + Assert.assertTrue( + "segment level cache on data server for GroupByStrategyV2 should be enabled", + cacheStrategy.isCacheable(query, true, true) + ); + } } diff --git a/server/src/main/java/org/apache/druid/client/CacheUtil.java b/server/src/main/java/org/apache/druid/client/CacheUtil.java index 42cfe171c7b..a8d65e4effa 100644 --- a/server/src/main/java/org/apache/druid/client/CacheUtil.java +++ b/server/src/main/java/org/apache/druid/client/CacheUtil.java @@ -110,7 +110,7 @@ public class CacheUtil { return cacheConfig.isUseCache() && query.context().isUseCache() - && isQueryCacheable(query, cacheStrategy, cacheConfig, serverType); + && isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true); } /** @@ -128,7 +128,7 @@ public class CacheUtil ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true) && query.context().isPopulateCache() && cacheConfig.isPopulateCache(); } @@ -148,7 +148,7 @@ public class CacheUtil ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false) && query.context().isUseResultLevelCache() && cacheConfig.isUseResultLevelCache(); } @@ -168,7 +168,7 @@ public class CacheUtil ServerType serverType ) { - return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false) && query.context().isPopulateResultLevelCache() && cacheConfig.isPopulateResultLevelCache(); } @@ -181,16 +181,18 @@ public class CacheUtil * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query * @param cacheConfig current active cache config * @param serverType BROKER or DATA + * @param bySegment segement level or result-level cache */ static boolean isQueryCacheable( final Query query, @Nullable final CacheStrategy> cacheStrategy, final CacheConfig cacheConfig, - final ServerType serverType + final ServerType serverType, + final boolean bySegment ) { return cacheStrategy != null - && cacheStrategy.isCacheable(query, serverType.willMergeRunners()) + && cacheStrategy.isCacheable(query, serverType.willMergeRunners(), bySegment) && cacheConfig.isQueryCacheable(query) && query.getDataSource().isCacheable(serverType == ServerType.BROKER); } diff --git a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java index dc155398ab0..9d886765ddc 100644 --- a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java +++ b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java @@ -54,7 +54,8 @@ public class CacheUtilTest timeseriesQuery, new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -67,7 +68,8 @@ public class CacheUtilTest timeseriesQuery, new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -80,7 +82,8 @@ public class CacheUtilTest timeseriesQuery, new DummyCacheStrategy<>(false, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -93,7 +96,8 @@ public class CacheUtilTest timeseriesQuery, new DummyCacheStrategy<>(true, false), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -106,7 +110,8 @@ public class CacheUtilTest timeseriesQuery, new DummyCacheStrategy<>(true, false), makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -119,7 +124,8 @@ public class CacheUtilTest timeseriesQuery.withDataSource(new GlobalTableDataSource("global")), new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -132,7 +138,8 @@ public class CacheUtilTest timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")), new DummyCacheStrategy<>(true, true), makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.DATA + CacheUtil.ServerType.DATA, + true ) ); } @@ -145,7 +152,8 @@ public class CacheUtilTest timeseriesQuery, null, makeCacheConfig(ImmutableMap.of()), - CacheUtil.ServerType.BROKER + CacheUtil.ServerType.BROKER, + false ) ); } @@ -168,7 +176,7 @@ public class CacheUtilTest } @Override - public boolean isCacheable(QueryType query, boolean willMergeRunners) + public boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment) { return willMergeRunners ? cacheableOnDataServers : cacheableOnBrokers; }