Enable result level cache for GroupByStrategyV2 on broker (#11595)

Cache is disabled for GroupByStrategyV2 on broker since the pr #3820 [groupBy v2: Results not fully merged when caching is enabled on the broker]. But we can enable the result-level cache on broker for GroupByStrategyV2 and keep the segment-level cache disabled.
This commit is contained in:
hqx871 2023-07-12 17:30:01 +08:00 committed by GitHub
parent d76903f10b
commit 7142b0c39e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 138 additions and 28 deletions

View File

@ -1992,7 +1992,7 @@ You can optionally only configure caching to be enabled on the Broker by setting
See [cache configuration](#cache-configuration) for how to configure cache settings.
> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, both of non-result level cache and result level cache do not work on Brokers.
> Note: Even if cache is enabled, for [groupBy v2](../querying/groupbyquery.md#strategies) queries, segment level cache do not work on Brokers.
> See [Differences between v1 and v2](../querying/groupbyquery.md#differences-between-v1-and-v2) and [Query caching](../querying/caching.md) for more information.
#### Segment Discovery

View File

@ -36,6 +36,23 @@ import java.util.List;
@ExtensionPoint
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
/**
* This method is deprecated and retained for backward incompatibility.
* Returns whether the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param ignoredQuery the query to be cached
* @param ignoredWillMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results
*
* @return true if the query is cacheable, otherwise false.
*/
@Deprecated
default boolean isCacheable(QueryType ignoredQuery, boolean ignoredWillMergeRunners)
{
return false;
}
/**
* Returns whether the given query is cacheable or not.
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
@ -43,10 +60,14 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results
* @param bySegment segment level or result level cache
*
* @return true if the query is cacheable, otherwise false.
*/
boolean isCacheable(QueryType query, boolean willMergeRunners);
default boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment)
{
return isCacheable(query, willMergeRunners);
}
/**
* Computes the per-segment cache key for the given query. Because this is a per-segment cache key, it should only

View File

@ -526,9 +526,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
private final List<DimensionSpec> dims = query.getDimensions();
@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners, boolean bySegment)
{
return strategySelector.strategize(query).isCacheable(willMergeRunners);
return strategySelector.strategize(query).isCacheable(willMergeRunners, bySegment);
}
@Override

View File

@ -60,10 +60,11 @@ public interface GroupByStrategy
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results. Can be used to distinguish if we are running on
* a broker or data node.
* @param bySegment segment level or result level cache
*
* @return true if this strategy is cacheable, otherwise false.
*/
boolean isCacheable(boolean willMergeRunners);
boolean isCacheable(boolean willMergeRunners, boolean bySegment);
/**
* Indicates if this query should undergo "mergeResults" or not. Checked by

View File

@ -83,7 +83,7 @@ public class GroupByStrategyV1 implements GroupByStrategy
}
@Override
public boolean isCacheable(boolean willMergeRunners)
public boolean isCacheable(boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -180,9 +180,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
@Override
public boolean isCacheable(boolean willMergeRunners)
public boolean isCacheable(boolean willMergeRunners, boolean bySegment)
{
return willMergeRunners;
//disable segment-level cache on borker,
//see PR https://github.com/apache/druid/issues/3820
return willMergeRunners || !bySegment;
}
@Override

View File

@ -185,7 +185,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
@Override
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -136,7 +136,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
: Collections.emptyList();
@Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
public boolean isCacheable(SearchQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -167,7 +167,7 @@ public class TimeBoundaryQueryQueryToolChest
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners)
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -281,7 +281,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -294,7 +294,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
);
@Override
public boolean isCacheable(TopNQuery query, boolean willMergeRunners)
public boolean isCacheable(TopNQuery query, boolean willMergeRunners, boolean bySegment)
{
return true;
}

View File

@ -20,10 +20,17 @@
package org.apache.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.DateTimes;
@ -31,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerTestHelper;
@ -62,6 +70,9 @@ import org.apache.druid.query.groupby.having.NotHavingSpec;
import org.apache.druid.query.groupby.having.OrHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
@ -74,6 +85,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -1136,4 +1148,68 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
}
@Test
public void testIsQueryCacheableOnGroupByStrategyV2()
{
final GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setGranularity(Granularities.DAY)
.setDimensions(new DefaultDimensionSpec("col", "dim"))
.setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS)
.build();
final DruidProcessingConfig processingConfig = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "processing-%s";
}
};
final GroupByQueryConfig queryConfig = new GroupByQueryConfig();
final Supplier<GroupByQueryConfig> queryConfigSupplier = Suppliers.ofInstance(queryConfig);
final Supplier<ByteBuffer> bufferSupplier =
() -> ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
bufferSupplier
);
final BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
queryConfigSupplier,
new GroupByStrategyV1(
queryConfigSupplier,
new GroupByQueryEngine(queryConfigSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
processingConfig,
queryConfigSupplier,
bufferPool,
mergeBufferPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(strategySelector);
CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = queryToolChest.getCacheStrategy(query);
Assert.assertTrue(
"result level cache on broker server for GroupByStrategyV2 should be enabled",
cacheStrategy.isCacheable(query, false, false)
);
Assert.assertFalse(
"segment level cache on broker server for GroupByStrategyV2 should be disabled",
cacheStrategy.isCacheable(query, false, true)
);
Assert.assertTrue(
"segment level cache on data server for GroupByStrategyV2 should be enabled",
cacheStrategy.isCacheable(query, true, true)
);
}
}

View File

@ -110,7 +110,7 @@ public class CacheUtil
{
return cacheConfig.isUseCache()
&& query.context().isUseCache()
&& isQueryCacheable(query, cacheStrategy, cacheConfig, serverType);
&& isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true);
}
/**
@ -128,7 +128,7 @@ public class CacheUtil
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, true)
&& query.context().isPopulateCache()
&& cacheConfig.isPopulateCache();
}
@ -148,7 +148,7 @@ public class CacheUtil
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false)
&& query.context().isUseResultLevelCache()
&& cacheConfig.isUseResultLevelCache();
}
@ -168,7 +168,7 @@ public class CacheUtil
ServerType serverType
)
{
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType)
return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType, false)
&& query.context().isPopulateResultLevelCache()
&& cacheConfig.isPopulateResultLevelCache();
}
@ -181,16 +181,18 @@ public class CacheUtil
* @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query
* @param cacheConfig current active cache config
* @param serverType BROKER or DATA
* @param bySegment segement level or result-level cache
*/
static <T> boolean isQueryCacheable(
final Query<T> query,
@Nullable final CacheStrategy<T, Object, Query<T>> cacheStrategy,
final CacheConfig cacheConfig,
final ServerType serverType
final ServerType serverType,
final boolean bySegment
)
{
return cacheStrategy != null
&& cacheStrategy.isCacheable(query, serverType.willMergeRunners())
&& cacheStrategy.isCacheable(query, serverType.willMergeRunners(), bySegment)
&& cacheConfig.isQueryCacheable(query)
&& query.getDataSource().isCacheable(serverType == ServerType.BROKER);
}

View File

@ -54,7 +54,8 @@ public class CacheUtilTest
timeseriesQuery,
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
CacheUtil.ServerType.BROKER,
false
)
);
}
@ -67,7 +68,8 @@ public class CacheUtilTest
timeseriesQuery,
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.DATA
CacheUtil.ServerType.DATA,
true
)
);
}
@ -80,7 +82,8 @@ public class CacheUtilTest
timeseriesQuery,
new DummyCacheStrategy<>(false, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
CacheUtil.ServerType.BROKER,
false
)
);
}
@ -93,7 +96,8 @@ public class CacheUtilTest
timeseriesQuery,
new DummyCacheStrategy<>(true, false),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.DATA
CacheUtil.ServerType.DATA,
true
)
);
}
@ -106,7 +110,8 @@ public class CacheUtilTest
timeseriesQuery,
new DummyCacheStrategy<>(true, false),
makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))),
CacheUtil.ServerType.BROKER
CacheUtil.ServerType.BROKER,
false
)
);
}
@ -119,7 +124,8 @@ public class CacheUtilTest
timeseriesQuery.withDataSource(new GlobalTableDataSource("global")),
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
CacheUtil.ServerType.BROKER,
false
)
);
}
@ -132,7 +138,8 @@ public class CacheUtilTest
timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")),
new DummyCacheStrategy<>(true, true),
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.DATA
CacheUtil.ServerType.DATA,
true
)
);
}
@ -145,7 +152,8 @@ public class CacheUtilTest
timeseriesQuery,
null,
makeCacheConfig(ImmutableMap.of()),
CacheUtil.ServerType.BROKER
CacheUtil.ServerType.BROKER,
false
)
);
}
@ -168,7 +176,7 @@ public class CacheUtilTest
}
@Override
public boolean isCacheable(QueryType query, boolean willMergeRunners)
public boolean isCacheable(QueryType query, boolean willMergeRunners, boolean bySegment)
{
return willMergeRunners ? cacheableOnDataServers : cacheableOnBrokers;
}