diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 99f0031e4f7..f01e18ad9e9 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -151,6 +151,7 @@ public class CachingClusteredClient implements QueryRunner contextBuilder.put("priority", priority); if (populateCache) { + // prevent down-stream nodes from caching results as well if we are populating the cache contextBuilder.put(CacheConfig.POPULATE_CACHE, false); contextBuilder.put("bySegment", true); } @@ -186,14 +187,17 @@ public class CachingClusteredClient implements QueryRunner } final byte[] queryCacheKey; - if (strategy != null) { + + if ( (populateCache || useCache) // implies strategy != null + && !isBySegment ) // explicit bySegment queries are never cached + { queryCacheKey = strategy.computeCacheKey(query); } else { queryCacheKey = null; } if (queryCacheKey != null) { - // cache keys must preserve segment ordering, in order for shards to always be combined in the same order + // cachKeys map must preserve segment ordering, in order for shards to always be combined in the same order Map, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap(); for (Pair segment : segments) { final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey( @@ -223,6 +227,7 @@ public class CachingClusteredClient implements QueryRunner segments.remove(segment); cachedResults.add(Pair.of(segmentQueryInterval, cachedValue)); } else if (populateCache) { + // otherwise, if populating cache, add segment to list of segments to cache final String segmentIdentifier = segment.lhs.getSegment().getIdentifier(); cachePopulatorMap.put( String.format("%s_%s", segmentIdentifier, segmentQueryInterval), @@ -326,13 +331,15 @@ public class CachingClusteredClient implements QueryRunner } final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); - List intervals = segmentSpec.getIntervals(); + final List intervals = segmentSpec.getIntervals(); final Sequence resultSeqToAdd; if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable if (!isBySegment) { resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); } else { + // bySegment queries need to be de-serialized, see DirectDruidClient.run() + @SuppressWarnings("unchecked") final Query>> bySegmentQuery = (Query>>) query; @@ -386,6 +393,9 @@ public class CachingClusteredClient implements QueryRunner public Sequence apply(Result> input) { final BySegmentResultValueClass value = input.getValue(); + final CachePopulator cachePopulator = cachePopulatorMap.get( + String.format("%s_%s", value.getSegmentId(), value.getInterval()) + ); final List cacheData = Lists.newLinkedList(); @@ -398,18 +408,21 @@ public class CachingClusteredClient implements QueryRunner @Override public T apply(final T input) { - cacheFutures.add( - backgroundExecutorService.submit( - new Runnable() - { - @Override - public void run() + if(cachePopulator != null) { + // only compute cache data if populating cache + cacheFutures.add( + backgroundExecutorService.submit( + new Runnable() { - cacheData.add(cacheFn.apply(input)); + @Override + public void run() + { + cacheData.add(cacheFn.apply(input)); + } } - } - ) - ); + ) + ); + } return input; } } @@ -426,9 +439,6 @@ public class CachingClusteredClient implements QueryRunner @Override public void run() { - CachePopulator cachePopulator = cachePopulatorMap.get( - String.format("%s_%s", value.getSegmentId(), value.getInterval()) - ); if (cachePopulator != null) { try { Futures.allAsList(cacheFutures).get(); diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index 637d8f36ff0..9321327de2b 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -54,6 +54,7 @@ public class CacheConfig { return useCache; } + public int getNumBackgroundThreads(){ return numBackgroundThreads; }