rework code pulling from cache to be more readable

This commit is contained in:
xvrl 2013-01-18 15:04:12 -08:00
parent a70ae15585
commit 86ca8967ca
1 changed files with 45 additions and 32 deletions

View File

@ -20,8 +20,6 @@
package com.metamx.druid.client;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@ -30,6 +28,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@ -66,6 +65,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
/**
@ -134,9 +134,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
return Sequences.empty();
}
Map<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> segments = Maps.newLinkedHashMap();
final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null;
// build set of segments to query
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
for (Interval interval : rewrittenQuery.getIntervals()) {
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
@ -148,43 +147,44 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
segments.put(
Pair.of(selector, descriptor),
queryCacheKey == null ? null :
computeSegmentCacheKey(selector.getSegment().getIdentifier(), descriptor, queryCacheKey)
);
segments.add(Pair.of(selector, descriptor));
}
}
}
Map<CacheBroker.NamedKey, byte[]> cachedValues = cacheBroker.getBulk(
Iterables.filter(segments.values(), Predicates.notNull())
);
final byte[] queryCacheKey;
if(strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> entry : segments.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
CacheBroker.NamedKey segmentCacheKey = entry.getValue();
// Pull cached segments from cache and remove from set of segments to query
if(useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> cacheKeys = Maps.newHashMap();
for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
}
final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs;
final Interval segmentQueryInterval = descriptor.getInterval();
Map<CacheBroker.NamedKey, byte[]> cachedValues = cacheBroker.getBulk(cacheKeys.values());
final byte[] cachedValue = segmentCacheKey == null ? null : cachedValues.get(segmentCacheKey);
for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
CacheBroker.NamedKey segmentCacheKey = entry.getValue();
if (useCache && cachedValue != null) {
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
} else {
final DruidServer server = selector.pick();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs;
final Interval segmentQueryInterval = descriptor.getInterval();
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
final byte[] cachedValue = cachedValues.get(segmentCacheKey);
if (cachedValue != null) {
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
// remove cached segment from set of segments to query
segments.remove(segment);
}
descriptors.add(descriptor);
if(segmentCacheKey != null) {
else {
final String segmentIdentifier = selector.getSegment().getIdentifier();
cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
@ -194,6 +194,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
}
// Compile list of all segments not pulled from cache
for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final DruidServer server = segment.lhs.pick();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
}
descriptors.add(segment.rhs);
}
return new LazySequence<T>(
new Supplier<Sequence<T>>()