diff --git a/client/pom.xml b/client/pom.xml
index 55358d59bab..f5025092b8f 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.1.25-SNAPSHOT
+ 0.2.7-SNAPSHOT
diff --git a/client/src/main/java/com/metamx/druid/BaseQuery.java b/client/src/main/java/com/metamx/druid/BaseQuery.java
index 4538467c167..76448ed9f17 100644
--- a/client/src/main/java/com/metamx/druid/BaseQuery.java
+++ b/client/src/main/java/com/metamx/druid/BaseQuery.java
@@ -22,6 +22,7 @@ package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
+import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -72,7 +73,12 @@ public abstract class BaseQuery implements Query
@Override
public Sequence run(QuerySegmentWalker walker)
{
- return querySegmentSpec.lookup(this, walker).run(this);
+ return run(querySegmentSpec.lookup(this, walker));
+ }
+
+ public Sequence<T> run(QueryRunner<T> runner)
+ {
+ return runner.run(this);
}
@Override
diff --git a/client/src/main/java/com/metamx/druid/Query.java b/client/src/main/java/com/metamx/druid/Query.java
index bd1dc49702a..4c4e7f715b4 100644
--- a/client/src/main/java/com/metamx/druid/Query.java
+++ b/client/src/main/java/com/metamx/druid/Query.java
@@ -20,6 +20,7 @@
package com.metamx.druid;
import com.metamx.common.guava.Sequence;
+import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.search.SearchQuery;
@@ -57,6 +58,8 @@ public interface Query
public Sequence run(QuerySegmentWalker walker);
+ public Sequence<T> run(QueryRunner<T> runner);
+
public List getIntervals();
public Duration getDuration();
diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
index 47376dd2e68..163f1986a53 100644
--- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
+++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@@ -41,7 +42,6 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.cache.Cache;
-import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
@@ -54,6 +54,7 @@ import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.result.BySegmentResultValueClass;
import com.metamx.druid.result.Result;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -64,6 +65,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
/**
@@ -74,19 +76,19 @@ public class CachingClusteredClient implements QueryRunner
private final QueryToolChestWarehouse warehouse;
private final ServerView serverView;
- private final CacheBroker cacheBroker;
+ private final Cache cache;
private final ObjectMapper objectMapper;
public CachingClusteredClient(
QueryToolChestWarehouse warehouse,
ServerView serverView,
- CacheBroker cacheBroker,
+ Cache cache,
ObjectMapper objectMapper
)
{
this.warehouse = warehouse;
this.serverView = serverView;
- this.cacheBroker = cacheBroker;
+ this.cache = cache;
this.objectMapper = objectMapper;
serverView.registerSegmentCallback(
@@ -98,7 +100,7 @@ public class CachingClusteredClient implements QueryRunner
@Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
{
- CachingClusteredClient.this.cacheBroker.provideCache(segment.getIdentifier()).close();
+ CachingClusteredClient.this.cache.close(segment.getIdentifier());
return ServerView.CallbackAction.CONTINUE;
}
}
@@ -109,9 +111,10 @@ public class CachingClusteredClient implements QueryRunner
public Sequence run(final Query query)
{
final QueryToolChest> toolChest = warehouse.getToolChest(query);
- final CacheStrategy> strategy = toolChest.getCacheStrategy(query);
+ final CacheStrategy> strategy = toolChest.getCacheStrategy(query);
+
+ final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
- final Map> segs = Maps.newTreeMap();
final List> cachedResults = Lists.newArrayList();
final Map cachePopulatorMap = Maps.newHashMap();
@@ -131,10 +134,8 @@ public class CachingClusteredClient implements QueryRunner
return Sequences.empty();
}
- byte[] queryCacheKey = null;
- if (strategy != null) {
- queryCacheKey = strategy.computeCacheKey(query);
- }
+ // build set of segments to query
+ Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
for (Interval interval : rewrittenQuery.getIntervals()) {
List> serversLookup = timeline.lookup(interval);
@@ -146,55 +147,67 @@ public class CachingClusteredClient implements QueryRunner
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
- if (queryCacheKey == null) {
- final DruidServer server = selector.pick();
- List descriptors = segs.get(server);
-
- if (descriptors == null) {
- descriptors = Lists.newArrayList();
- segs.put(server, descriptors);
- }
-
- descriptors.add(descriptor);
- }
- else {
- final Interval segmentQueryInterval = holder.getInterval();
- final byte[] versionBytes = descriptor.getVersion().getBytes();
-
- final byte[] cacheKey = ByteBuffer
- .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
- .putLong(segmentQueryInterval.getStartMillis())
- .putLong(segmentQueryInterval.getEndMillis())
- .put(versionBytes)
- .putInt(descriptor.getPartitionNumber())
- .put(queryCacheKey)
- .array();
- final String segmentIdentifier = selector.getSegment().getIdentifier();
- final Cache cache = cacheBroker.provideCache(segmentIdentifier);
- final byte[] cachedValue = cache.get(cacheKey);
-
- if (useCache && cachedValue != null) {
- cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
- } else {
- final DruidServer server = selector.pick();
- List descriptors = segs.get(server);
-
- if (descriptors == null) {
- descriptors = Lists.newArrayList();
- segs.put(server, descriptors);
- }
-
- descriptors.add(descriptor);
- cachePopulatorMap.put(
- String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
- new CachePopulator(cache, objectMapper, cacheKey)
- );
- }
- }
+ segments.add(Pair.of(selector, descriptor));
}
}
}
+ final byte[] queryCacheKey;
+ if(strategy != null) {
+ queryCacheKey = strategy.computeCacheKey(query);
+ } else {
+ queryCacheKey = null;
+ }
+
+ // Pull cached segments from cache and remove from set of segments to query
+ if(useCache && queryCacheKey != null) {
+ Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
+ for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
+ cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
+ }
+
+ Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
+
+ for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
+ Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
+ Cache.NamedKey segmentCacheKey = entry.getValue();
+
+ final ServerSelector selector = segment.lhs;
+ final SegmentDescriptor descriptor = segment.rhs;
+ final Interval segmentQueryInterval = descriptor.getInterval();
+
+ final byte[] cachedValue = cachedValues.get(segmentCacheKey);
+
+ if (cachedValue != null) {
+ cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
+
+ // remove cached segment from set of segments to query
+ segments.remove(segment);
+ }
+ else {
+ final String segmentIdentifier = selector.getSegment().getIdentifier();
+ cachePopulatorMap.put(
+ String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
+ new CachePopulator(cache, objectMapper, segmentCacheKey)
+ );
+ }
+ }
+ }
+
+ // Compile list of all segments not pulled from cache
+ for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+ final DruidServer server = segment.lhs.pick();
+ List<SegmentDescriptor> descriptors = serverSegments.get(server);
+
+ if (descriptors == null) {
+ descriptors = Lists.newArrayList();
+ serverSegments.put(server, descriptors);
+ }
+
+ descriptors.add(segment.rhs);
+ }
+
+
return new LazySequence(
new Supplier>()
{
@@ -229,6 +242,7 @@ public class CachingClusteredClient implements QueryRunner
}
final Function