Allow setting an upper limit on the number of segments for which a broker will try to fetch cached results.

Charles Allen 2015-10-23 11:34:35 -07:00
parent 72c408cf2d
commit dfce14ed17
4 changed files with 147 additions and 59 deletions


@@ -90,5 +90,6 @@ You can optionally only configure caching to be enabled on the broker by setting
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries spanning more segments than this limit will not attempt to fetch results from the cache at the broker level, leaving any cache fetches (and cache-result merging) to the historicals.|`Integer.MAX_VALUE`|
See [cache configuration](caching.html) for how to configure cache settings.
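As a rough sketch of how these properties could be combined (the values below are illustrative assumptions, not recommendations), a broker that should skip broker-level cache fetches for very large queries might be configured along these lines:

```properties
# Hypothetical broker runtime.properties snippet -- values are illustrative only.
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=false
# Skip broker-level cache fetches (and cache-result merging) for queries
# spanning more than 1000 segments, leaving that work to the historicals.
druid.broker.cache.cacheBulkMergeLimit=1000
```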


@@ -62,6 +62,8 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -72,7 +74,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.joda.time.Interval;
/**
*/
@@ -206,7 +207,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Pull cached segments from cache and remove from set of segments to query
final Map<Cache.NamedKey, byte[]> cachedValues;
if (useCache) {
cachedValues = cache.getBulk(cacheKeys.values());
cachedValues = cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
} else {
cachedValues = ImmutableMap.of();
}
@@ -237,7 +238,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
if (queryableDruidServer == null) {
log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", segment.rhs, query.getDataSource()).emit();
log.makeAlert(
"No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
segment.rhs,
query.getDataSource()
).emit();
} else {
final DruidServer server = queryableDruidServer.getServer();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
@@ -483,7 +488,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
return toolChest.mergeSequencesUnordered(
Sequences.simple(
sequencesByInterval
)
);
}
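To make the mechanism above concrete, here is a minimal self-contained sketch of what `Iterables.limit` does to the keys handed to `cache.getBulk` (plain Guava with stand-in string keys rather than Druid's `Cache.NamedKey`): at most `cacheBulkMergeLimit` segment keys are offered to the bulk cache lookup, and any remaining segments are simply queried from the historicals as before.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.List;

public class BulkMergeLimitSketch
{
  public static void main(String[] args)
  {
    // Pretend these are the per-segment cache keys for a query spanning five segments.
    List<String> segmentCacheKeys = ImmutableList.of("seg1", "seg2", "seg3", "seg4", "seg5");

    // Stand-in for cacheConfig.getCacheBulkMergeLimit() -- illustrative value.
    int cacheBulkMergeLimit = 3;

    // Same pattern as cache.getBulk(Iterables.limit(cacheKeys.values(), limit)):
    // only the first `limit` keys are ever offered to the bulk cache lookup.
    Iterable<String> keysToFetch = Iterables.limit(segmentCacheKeys, cacheBulkMergeLimit);
    System.out.println(ImmutableList.copyOf(keysToFetch)); // [seg1, seg2, seg3]

    // A limit of 0 disables broker-level bulk cache fetches entirely.
    System.out.println(ImmutableList.copyOf(Iterables.limit(segmentCacheKeys, 0))); // []
  }
}
```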


@@ -39,6 +39,10 @@ public class CacheConfig
@Min(0)
private int numBackgroundThreads = 0;
@JsonProperty
@Min(0)
private int cacheBulkMergeLimit = Integer.MAX_VALUE;
@JsonProperty
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
@@ -56,6 +60,11 @@ public class CacheConfig
return numBackgroundThreads;
}
public int getCacheBulkMergeLimit()
{
return cacheBulkMergeLimit;
}
public boolean isQueryCacheable(Query query)
{
// O(n) impl, but I don't think we'll ever have a million query types here
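As a sketch of how the new property binds and defaults (using a stripped-down stand-in class and plain Jackson rather than Druid's actual CacheConfig and its @Min(0) validation), an unset value leaves the limit at Integer.MAX_VALUE, so existing deployments keep their current behavior:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CacheConfigSketch
{
  // Stand-in for the relevant part of CacheConfig: defaults to Integer.MAX_VALUE,
  // i.e. effectively no limit on broker-level bulk cache fetches.
  @JsonProperty
  private int cacheBulkMergeLimit = Integer.MAX_VALUE;

  public int getCacheBulkMergeLimit()
  {
    return cacheBulkMergeLimit;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // No value supplied: the default (Integer.MAX_VALUE) is kept.
    CacheConfigSketch defaults = mapper.readValue("{}", CacheConfigSketch.class);
    System.out.println(defaults.getCacheBulkMergeLimit()); // 2147483647

    // Value supplied: bulk cache fetches are attempted for at most 10 segments.
    CacheConfigSketch limited =
        mapper.readValue("{\"cacheBulkMergeLimit\": 10}", CacheConfigSketch.class);
    System.out.println(limited.getCacheBulkMergeLimit()); // 10
  }
}
```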


@@ -69,6 +69,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TestQueryRunners;
@@ -207,6 +208,59 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim";
private static final Supplier<GroupByQueryConfig> GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig());
private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
TopNQuery.class, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
SearchQuery.class, new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
GroupByQuery.class,
new GroupByQueryQueryToolChest(
GROUPBY_QUERY_CONFIG_SUPPLIER,
jsonMapper,
new GroupByQueryEngine(
GROUPBY_QUERY_CONFIG_SUPPLIER,
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
),
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
.build()
);
private final Random random;
public CachingClusteredClient client;
private Runnable queryCompletedCallback;
@@ -221,7 +275,7 @@ public class CachingClusteredClientTest
this.random = new Random(randomSeed);
}
@Parameterized.Parameters
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return Lists.transform(
@@ -241,7 +295,7 @@ public class CachingClusteredClientTest
public void setUp() throws Exception
{
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
serverView = EasyMock.createStrictMock(TimelineServerView.class);
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
client = makeClient(MoreExecutors.sameThreadExecutor());
@@ -469,6 +523,63 @@ public class CachingClusteredClientTest
);
}
@Test
@SuppressWarnings("unchecked")
public void testCachingOverBulkLimitEnforcesLimit() throws Exception
{
final int limit = 10;
final Interval interval = new Interval("2011-01-01/2011-01-02");
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT)
.build();
final Map<String, Object> context = new HashMap<>();
final Cache cache = EasyMock.createStrictMock(Cache.class);
final Capture<Iterable<Cache.NamedKey>> cacheKeyCapture = EasyMock.newCapture();
EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
.andReturn(ImmutableMap.<Cache.NamedKey, byte[]>of())
.once();
EasyMock.replay(cache);
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
final DruidServer lastServer = servers[random.nextInt(servers.length)];
final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
EasyMock.replay(dataSegment);
final ServerSelector selector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServer(new QueryableDruidServer(lastServer, null));
timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector));
client.run(query, context);
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit);
EasyMock.verify(cache);
EasyMock.reset(cache);
cacheKeyCapture.reset();
EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
.andReturn(ImmutableMap.<Cache.NamedKey, byte[]>of())
.once();
EasyMock.replay(cache);
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
client.run(query, context);
EasyMock.verify(cache);
EasyMock.verify(dataSegment);
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
Assert.assertTrue("Cache Keys empty", ImmutableList.copyOf(cacheKeyCapture.getValue()).isEmpty());
}
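The capture-based assertions above follow a standard EasyMock pattern; here is a minimal standalone sketch of that pattern, with a made-up KeyConsumer interface standing in for the mocked Cache:

```java
import org.easymock.Capture;
import org.easymock.EasyMock;

import java.util.Arrays;

public class CaptureSketch
{
  // Tiny stand-in interface so the capture pattern is runnable on its own.
  public interface KeyConsumer
  {
    void consume(Iterable<String> keys);
  }

  public static void main(String[] args)
  {
    KeyConsumer consumer = EasyMock.createStrictMock(KeyConsumer.class);

    // Capture whatever iterable the code under test passes in, as the test above
    // does for the keys handed to cache.getBulk(...).
    Capture<Iterable<String>> captured = EasyMock.newCapture();
    consumer.consume(EasyMock.capture(captured));
    EasyMock.expectLastCall().once();
    EasyMock.replay(consumer);

    consumer.consume(Arrays.asList("seg1", "seg2"));

    EasyMock.verify(consumer);
    System.out.println(captured.getValue()); // [seg1, seg2]
  }
}
```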
@Test
public void testTimeseriesMergingOutOfOrderPartitions() throws Exception
{
@@ -2055,61 +2166,17 @@ public class CachingClusteredClientTest
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
{
final Supplier<GroupByQueryConfig> groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
return makeClient(backgroundExecutorService, cache, 10);
}
protected CachingClusteredClient makeClient(
final ListeningExecutorService backgroundExecutorService,
final Cache cache,
final int mergeLimit
)
{
return new CachingClusteredClient(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
TopNQuery.class, new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
SearchQuery.class, new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(
GroupByQuery.class,
new GroupByQueryQueryToolChest(
groupByQueryConfigSupplier,
jsonMapper,
new GroupByQueryEngine(
groupByQueryConfigSupplier,
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
),
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
.build()
),
WAREHOUSE,
new TimelineServerView()
{
@Override
@@ -2157,6 +2224,12 @@ public class CachingClusteredClientTest
{
return true;
}
@Override
public int getCacheBulkMergeLimit()
{
return mergeLimit;
}
}
);
}