fix populateCache flag not working

This commit is contained in:
Xavier Léauté 2014-03-04 13:24:56 -08:00
parent 0feddc3831
commit 5b4c04510f
1 changed files with 24 additions and 16 deletions

View File

@ -176,32 +176,37 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
queryCacheKey = null; queryCacheKey = null;
} }
// Pull cached segments from cache and remove from set of segments to query if (queryCacheKey != null) {
if (useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap(); Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
for (Pair<ServerSelector, SegmentDescriptor> e : segments) { for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey)); final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
segment.lhs.getSegment().getIdentifier(),
segment.rhs,
queryCacheKey
);
cacheKeys.put(segment, segmentCacheKey);
} }
Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values()); // Pull cached segments from cache and remove from set of segments to query
final Map<Cache.NamedKey, byte[]> cachedValues;
if (useCache) {
cachedValues = cache.getBulk(cacheKeys.values());
} else {
cachedValues = ImmutableMap.of();
}
for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) { for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey(); Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
Cache.NamedKey segmentCacheKey = entry.getValue(); Cache.NamedKey segmentCacheKey = entry.getValue();
final Interval segmentQueryInterval = segment.rhs.getInterval();
final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs;
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] cachedValue = cachedValues.get(segmentCacheKey); final byte[] cachedValue = cachedValues.get(segmentCacheKey);
if (cachedValue != null) { if (cachedValue != null) {
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
// remove cached segment from set of segments to query // remove cached segment from set of segments to query
segments.remove(segment); segments.remove(segment);
} else { cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
final String segmentIdentifier = selector.getSegment().getIdentifier(); } else if (populateCache) {
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
cachePopulatorMap.put( cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval), String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, segmentCacheKey) new CachePopulator(cache, objectMapper, segmentCacheKey)
@ -331,9 +336,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
String segmentIdentifier = value.getSegmentId(); String segmentIdentifier = value.getSegmentId();
final Iterable<T> segmentResults = value.getResults(); final Iterable<T> segmentResults = value.getResults();
cachePopulatorMap.get( CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", segmentIdentifier, value.getInterval()) String.format("%s_%s", segmentIdentifier, value.getInterval())
).populate(Iterables.transform(segmentResults, prepareForCache)); );
if(cachePopulator != null) {
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
}
return Sequences.simple( return Sequences.simple(
Iterables.transform( Iterables.transform(