fixes for bySegment queries and caching

- prevent bySegment queries from getting cached
- avoid computing cache keys if not populating or using cache
- avoid computing cache data when not populating cache
This commit is contained in:
Xavier Léauté 2014-12-10 10:57:48 -08:00
parent c44242f0af
commit 7853e801d5
2 changed files with 27 additions and 16 deletions

View File

@ -151,6 +151,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
contextBuilder.put("priority", priority); contextBuilder.put("priority", priority);
if (populateCache) { if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
contextBuilder.put(CacheConfig.POPULATE_CACHE, false); contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true); contextBuilder.put("bySegment", true);
} }
@ -186,14 +187,17 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
final byte[] queryCacheKey; final byte[] queryCacheKey;
if (strategy != null) {
if ( (populateCache || useCache) // implies strategy != null
&& !isBySegment ) // explicit bySegment queries are never cached
{
queryCacheKey = strategy.computeCacheKey(query); queryCacheKey = strategy.computeCacheKey(query);
} else { } else {
queryCacheKey = null; queryCacheKey = null;
} }
if (queryCacheKey != null) { if (queryCacheKey != null) {
// cache keys must preserve segment ordering, in order for shards to always be combined in the same order // cachKeys map must preserve segment ordering, in order for shards to always be combined in the same order
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap(); Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) { for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey( final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
@ -223,6 +227,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
segments.remove(segment); segments.remove(segment);
cachedResults.add(Pair.of(segmentQueryInterval, cachedValue)); cachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
} else if (populateCache) { } else if (populateCache) {
// otherwise, if populating cache, add segment to list of segments to cache
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier(); final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
cachePopulatorMap.put( cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval), String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
@ -326,13 +331,15 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals(); final List<Interval> intervals = segmentSpec.getIntervals();
final Sequence<T> resultSeqToAdd; final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
if (!isBySegment) { if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
} else { } else {
// bySegment queries need to be de-serialized, see DirectDruidClient.run()
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery = (Query<Result<BySegmentResultValueClass<T>>>) query; final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery = (Query<Result<BySegmentResultValueClass<T>>>) query;
@ -386,6 +393,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input) public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input)
{ {
final BySegmentResultValueClass<T> value = input.getValue(); final BySegmentResultValueClass<T> value = input.getValue();
final CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
final List<Object> cacheData = Lists.newLinkedList(); final List<Object> cacheData = Lists.newLinkedList();
@ -398,18 +408,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override @Override
public T apply(final T input) public T apply(final T input)
{ {
cacheFutures.add( if(cachePopulator != null) {
backgroundExecutorService.submit( // only compute cache data if populating cache
new Runnable() cacheFutures.add(
{ backgroundExecutorService.submit(
@Override new Runnable()
public void run()
{ {
cacheData.add(cacheFn.apply(input)); @Override
public void run()
{
cacheData.add(cacheFn.apply(input));
}
} }
} )
) );
); }
return input; return input;
} }
} }
@ -426,9 +439,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override @Override
public void run() public void run()
{ {
CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
if (cachePopulator != null) { if (cachePopulator != null) {
try { try {
Futures.allAsList(cacheFutures).get(); Futures.allAsList(cacheFutures).get();

View File

@ -54,6 +54,7 @@ public class CacheConfig
{ {
return useCache; return useCache;
} }
public int getNumBackgroundThreads(){ public int getNumBackgroundThreads(){
return numBackgroundThreads; return numBackgroundThreads;
} }