mirror of https://github.com/apache/druid.git
Merge pull request #1850 from metamx/friendlyBardCache
Allow setting upper limit on the number of cache segments a broker will try to fetch.
This commit is contained in:
commit
e9533db987
|
@ -91,5 +91,6 @@ You can optionally only configure caching to be enabled on the broker by setting
|
|||
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|
||||
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|
||||
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|
||||
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|
|
||||
|
||||
See [cache configuration](caching.html) for how to configure cache settings.
|
||||
|
|
|
@ -62,6 +62,8 @@ import io.druid.timeline.DataSegment;
|
|||
import io.druid.timeline.TimelineLookup;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
@ -72,7 +74,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -206,7 +207,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
// Pull cached segments from cache and remove from set of segments to query
|
||||
final Map<Cache.NamedKey, byte[]> cachedValues;
|
||||
if (useCache) {
|
||||
cachedValues = cache.getBulk(cacheKeys.values());
|
||||
cachedValues = cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
|
||||
} else {
|
||||
cachedValues = ImmutableMap.of();
|
||||
}
|
||||
|
@ -237,7 +238,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
|
||||
|
||||
if (queryableDruidServer == null) {
|
||||
log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", segment.rhs, query.getDataSource()).emit();
|
||||
log.makeAlert(
|
||||
"No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
|
||||
segment.rhs,
|
||||
query.getDataSource()
|
||||
).emit();
|
||||
} else {
|
||||
final DruidServer server = queryableDruidServer.getServer();
|
||||
List<SegmentDescriptor> descriptors = serverSegments.get(server);
|
||||
|
|
|
@ -39,6 +39,10 @@ public class CacheConfig
|
|||
@Min(0)
|
||||
private int numBackgroundThreads = 0;
|
||||
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int cacheBulkMergeLimit = Integer.MAX_VALUE;
|
||||
|
||||
@JsonProperty
|
||||
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
|
||||
|
||||
|
@ -56,6 +60,11 @@ public class CacheConfig
|
|||
return numBackgroundThreads;
|
||||
}
|
||||
|
||||
public int getCacheBulkMergeLimit()
|
||||
{
|
||||
return cacheBulkMergeLimit;
|
||||
}
|
||||
|
||||
public boolean isQueryCacheable(Query query)
|
||||
{
|
||||
// O(n) impl, but I don't think we'll ever have a million query types here
|
||||
|
|
|
@ -69,6 +69,7 @@ import io.druid.query.Query;
|
|||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.TestQueryRunners;
|
||||
|
@ -207,6 +208,59 @@ public class CachingClusteredClientTest
|
|||
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
|
||||
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
|
||||
private static final String TOP_DIM = "a_dim";
|
||||
private static final Supplier<GroupByQueryConfig> GROUPBY_QUERY_CONFIG_SUPPLIER = Suppliers.ofInstance(new GroupByQueryConfig());
|
||||
private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(
|
||||
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
|
||||
.put(
|
||||
TimeseriesQuery.class,
|
||||
new TimeseriesQueryQueryToolChest(
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
TopNQuery.class, new TopNQueryQueryToolChest(
|
||||
new TopNQueryConfig(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
SearchQuery.class, new SearchQueryQueryToolChest(
|
||||
new SearchQueryConfig(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
SelectQuery.class,
|
||||
new SelectQueryQueryToolChest(
|
||||
jsonMapper,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
GroupByQuery.class,
|
||||
new GroupByQueryQueryToolChest(
|
||||
GROUPBY_QUERY_CONFIG_SUPPLIER,
|
||||
jsonMapper,
|
||||
new GroupByQueryEngine(
|
||||
GROUPBY_QUERY_CONFIG_SUPPLIER,
|
||||
new StupidPool<>(
|
||||
new Supplier<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer get()
|
||||
{
|
||||
return ByteBuffer.allocate(1024 * 1024);
|
||||
}
|
||||
}
|
||||
)
|
||||
),
|
||||
TestQueryRunners.pool,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
|
||||
.build()
|
||||
);
|
||||
private final Random random;
|
||||
public CachingClusteredClient client;
|
||||
private Runnable queryCompletedCallback;
|
||||
|
@ -221,7 +275,7 @@ public class CachingClusteredClientTest
|
|||
this.random = new Random(randomSeed);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
public static Iterable<Object[]> constructorFeeder() throws IOException
|
||||
{
|
||||
return Lists.transform(
|
||||
|
@ -241,7 +295,7 @@ public class CachingClusteredClientTest
|
|||
public void setUp() throws Exception
|
||||
{
|
||||
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
|
||||
serverView = EasyMock.createStrictMock(TimelineServerView.class);
|
||||
serverView = EasyMock.createNiceMock(TimelineServerView.class);
|
||||
cache = MapCache.create(100000);
|
||||
client = makeClient(MoreExecutors.sameThreadExecutor());
|
||||
|
||||
|
@ -469,6 +523,63 @@ public class CachingClusteredClientTest
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCachingOverBulkLimitEnforcesLimit() throws Exception
|
||||
{
|
||||
final int limit = 10;
|
||||
final Interval interval = new Interval("2011-01-01/2011-01-02");
|
||||
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
|
||||
.filters(DIM_FILTER)
|
||||
.granularity(GRANULARITY)
|
||||
.aggregators(AGGS)
|
||||
.postAggregators(POST_AGGS)
|
||||
.context(CONTEXT)
|
||||
.build();
|
||||
|
||||
final Map<String, Object> context = new HashMap<>();
|
||||
final Cache cache = EasyMock.createStrictMock(Cache.class);
|
||||
final Capture<Iterable<Cache.NamedKey>> cacheKeyCapture = EasyMock.newCapture();
|
||||
EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
|
||||
.andReturn(ImmutableMap.<Cache.NamedKey, byte[]>of())
|
||||
.once();
|
||||
EasyMock.replay(cache);
|
||||
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
|
||||
final DruidServer lastServer = servers[random.nextInt(servers.length)];
|
||||
final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
|
||||
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
|
||||
EasyMock.replay(dataSegment);
|
||||
final ServerSelector selector = new ServerSelector(
|
||||
dataSegment,
|
||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
|
||||
);
|
||||
selector.addServer(new QueryableDruidServer(lastServer, null));
|
||||
timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector));
|
||||
|
||||
client.run(query, context);
|
||||
|
||||
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
|
||||
Assert.assertTrue("Cache key below limit", ImmutableList.copyOf(cacheKeyCapture.getValue()).size() <= limit);
|
||||
|
||||
EasyMock.verify(cache);
|
||||
|
||||
EasyMock.reset(cache);
|
||||
cacheKeyCapture.reset();
|
||||
EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
|
||||
.andReturn(ImmutableMap.<Cache.NamedKey, byte[]>of())
|
||||
.once();
|
||||
EasyMock.replay(cache);
|
||||
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
|
||||
client.run(query, context);
|
||||
EasyMock.verify(cache);
|
||||
EasyMock.verify(dataSegment);
|
||||
Assert.assertTrue("Capture cache keys", cacheKeyCapture.hasCaptured());
|
||||
Assert.assertTrue("Cache Keys empty", ImmutableList.copyOf(cacheKeyCapture.getValue()).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeseriesMergingOutOfOrderPartitions() throws Exception
|
||||
{
|
||||
|
@ -2055,61 +2166,17 @@ public class CachingClusteredClientTest
|
|||
|
||||
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
|
||||
{
|
||||
final Supplier<GroupByQueryConfig> groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
|
||||
return makeClient(backgroundExecutorService, cache, 10);
|
||||
}
|
||||
|
||||
protected CachingClusteredClient makeClient(
|
||||
final ListeningExecutorService backgroundExecutorService,
|
||||
final Cache cache,
|
||||
final int mergeLimit
|
||||
)
|
||||
{
|
||||
return new CachingClusteredClient(
|
||||
new MapQueryToolChestWarehouse(
|
||||
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
|
||||
.put(
|
||||
TimeseriesQuery.class,
|
||||
new TimeseriesQueryQueryToolChest(
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
TopNQuery.class, new TopNQueryQueryToolChest(
|
||||
new TopNQueryConfig(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
SearchQuery.class, new SearchQueryQueryToolChest(
|
||||
new SearchQueryConfig(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
SelectQuery.class,
|
||||
new SelectQueryQueryToolChest(
|
||||
jsonMapper,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(
|
||||
GroupByQuery.class,
|
||||
new GroupByQueryQueryToolChest(
|
||||
groupByQueryConfigSupplier,
|
||||
jsonMapper,
|
||||
new GroupByQueryEngine(
|
||||
groupByQueryConfigSupplier,
|
||||
new StupidPool<>(
|
||||
new Supplier<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer get()
|
||||
{
|
||||
return ByteBuffer.allocate(1024 * 1024);
|
||||
}
|
||||
}
|
||||
)
|
||||
),
|
||||
TestQueryRunners.pool,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
)
|
||||
)
|
||||
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())
|
||||
.build()
|
||||
),
|
||||
WAREHOUSE,
|
||||
new TimelineServerView()
|
||||
{
|
||||
@Override
|
||||
|
@ -2157,6 +2224,12 @@ public class CachingClusteredClientTest
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCacheBulkMergeLimit()
|
||||
{
|
||||
return mergeLimit;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue