diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 2df800f16b7..9f74731b7a9 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -91,5 +91,6 @@ You can optionally only configure caching to be enabled on the broker by setting |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false| |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false| |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| +|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`| See [cache configuration](caching.html) for how to configure cache settings. diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 87d2d388d06..187f35a872f 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -62,6 +62,8 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.partition.PartitionChunk; +import org.joda.time.Interval; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -72,7 +74,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import org.joda.time.Interval; /** */ @@ -206,7 +207,7 @@ public class CachingClusteredClient implements QueryRunner // Pull cached segments from cache and remove from set of segments to query final Map cachedValues; if (useCache) { - cachedValues = cache.getBulk(cacheKeys.values()); + cachedValues = cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit())); } else { cachedValues = ImmutableMap.of(); } @@ -237,7 +238,11 @@ public class CachingClusteredClient implements QueryRunner final QueryableDruidServer queryableDruidServer = segment.lhs.pick(); if (queryableDruidServer == null) { - log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", segment.rhs, query.getDataSource()).emit(); + log.makeAlert( + "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", + segment.rhs, + query.getDataSource() + ).emit(); } else { final DruidServer server = queryableDruidServer.getServer(); List descriptors = serverSegments.get(server); @@ -483,7 +488,7 @@ public class CachingClusteredClient implements QueryRunner return toolChest.mergeSequencesUnordered( Sequences.simple( - sequencesByInterval + sequencesByInterval ) ); } diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index 9d42a4d4b3e..2eb058d9848 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -39,6 +39,10 @@ public class CacheConfig @Min(0) private int numBackgroundThreads = 0; + @JsonProperty + @Min(0) + private int cacheBulkMergeLimit = Integer.MAX_VALUE; + @JsonProperty private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); @@ -56,6 +60,11 @@ public class CacheConfig return numBackgroundThreads; } + public int getCacheBulkMergeLimit() + { + return cacheBulkMergeLimit; + } + public boolean isQueryCacheable(Query query) { // O(n) impl, but I don't think we'll ever have a million query types here diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index a68bcc30cf7..3606e055d25 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -69,6 +69,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.query.TestQueryRunners; @@ -207,6 +208,59 @@ public class CachingClusteredClientTest private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles"); private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE); private static final String TOP_DIM = "a_dim"; + private static final Supplier GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig()); + private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse( + ImmutableMap., QueryToolChest>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ) + .put( + TopNQuery.class, new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ) + .put( + SearchQuery.class, new SearchQueryQueryToolChest( + new SearchQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ) + .put( + SelectQuery.class, + new SelectQueryQueryToolChest( + jsonMapper, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ) + .put( + GroupByQuery.class, + new GroupByQueryQueryToolChest( + GROUPBY_QUERY_CONFIG_SUPPLIER, + jsonMapper, + new GroupByQueryEngine( + GROUPBY_QUERY_CONFIG_SUPPLIER, + new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ), + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ) + .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest()) + .build() + ); private final Random random; public CachingClusteredClient client; private Runnable queryCompletedCallback; @@ -221,7 +275,7 @@ public class CachingClusteredClientTest this.random = new Random(randomSeed); } - @Parameterized.Parameters + @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() throws IOException { return Lists.transform( @@ -241,7 +295,7 @@ public class CachingClusteredClientTest public void setUp() throws Exception { timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - serverView = EasyMock.createStrictMock(TimelineServerView.class); + serverView = EasyMock.createNiceMock(TimelineServerView.class); cache = MapCache.create(100000); client = makeClient(MoreExecutors.sameThreadExecutor()); @@ -469,6 +523,63 @@ public class CachingClusteredClientTest ); } + + @Test + @SuppressWarnings("unchecked") + public void testCachingOverBulkLimitEnforcesLimit() throws Exception + { + final int limit = 10; + final Interval interval = new Interval("2011-01-01/2011-01-02"); + final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval))) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT) + .build(); + + final Map context = new HashMap<>(); + final Cache cache = EasyMock.createStrictMock(Cache.class); + final Capture> cacheKeyCapture = EasyMock.newCapture(); + EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture))) + .andReturn(ImmutableMap.of()) + .once(); + EasyMock.replay(cache); + client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit); + final DruidServer lastServer = servers[random.nextInt(servers.length)]; + final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class); + EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes(); + EasyMock.replay(dataSegment); + final ServerSelector selector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + ); + selector.addServer(new QueryableDruidServer(lastServer, null)); + timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector)); + + client.run(query, context); + + Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); + Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit); + + EasyMock.verify(cache); + + EasyMock.reset(cache); + cacheKeyCapture.reset(); + EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture))) + .andReturn(ImmutableMap.of()) + .once(); + EasyMock.replay(cache); + client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0); + client.run(query, context); + EasyMock.verify(cache); + EasyMock.verify(dataSegment); + Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured()); + Assert.assertTrue("Cache Keys empty", ImmutableList.copyOf(cacheKeyCapture.getValue()).isEmpty()); + } + @Test public void testTimeseriesMergingOutOfOrderPartitions() throws Exception { @@ -2055,61 +2166,17 @@ public class CachingClusteredClientTest protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService) { - final Supplier groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig()); + return makeClient(backgroundExecutorService, cache, 10); + } + protected CachingClusteredClient makeClient( + final ListeningExecutorService backgroundExecutorService, + final Cache cache, + final int mergeLimit + ) + { return new CachingClusteredClient( - new MapQueryToolChestWarehouse( - ImmutableMap., QueryToolChest>builder() - .put( - TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest( - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ) - .put( - TopNQuery.class, new TopNQueryQueryToolChest( - new TopNQueryConfig(), - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ) - .put( - SearchQuery.class, new SearchQueryQueryToolChest( - new SearchQueryConfig(), - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ) - .put( - SelectQuery.class, - new SelectQueryQueryToolChest( - jsonMapper, - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ) - .put( - GroupByQuery.class, - new GroupByQueryQueryToolChest( - groupByQueryConfigSupplier, - jsonMapper, - new GroupByQueryEngine( - groupByQueryConfigSupplier, - new StupidPool<>( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), - TestQueryRunners.pool, - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() - ) - ) - .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest()) - .build() - ), + WAREHOUSE, new TimelineServerView() { @Override @@ -2157,6 +2224,12 @@ public class CachingClusteredClientTest { return true; } + + @Override + public int getCacheBulkMergeLimit() + { + return mergeLimit; + } } ); }