diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 5d640dc4e38..163f1986a53 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -54,6 +54,7 @@ import com.metamx.druid.query.segment.SegmentDescriptor; import com.metamx.druid.result.BySegmentResultValueClass; import com.metamx.druid.result.Result; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -110,7 +111,7 @@ public class CachingClusteredClient implements QueryRunner public Sequence run(final Query query) { final QueryToolChest> toolChest = warehouse.getToolChest(query); - final CacheStrategy> strategy = toolChest.getCacheStrategy(query); + final CacheStrategy> strategy = toolChest.getCacheStrategy(query); final Map> serverSegments = Maps.newTreeMap(); @@ -241,6 +242,7 @@ public class CachingClusteredClient implements QueryRunner } final Function pullFromCacheFunction = strategy.pullFromCache(); + final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); for (Pair cachedResultPair : cachedResults) { final byte[] cachedResult = cachedResultPair.rhs; Sequence cachedSequence = new BaseSequence>( @@ -255,7 +257,8 @@ public class CachingClusteredClient implements QueryRunner } return objectMapper.readValues( - objectMapper.getJsonFactory().createJsonParser(cachedResult), Object.class + objectMapper.getJsonFactory().createJsonParser(cachedResult), + cacheObjectClazz ); } catch (IOException e) { diff --git a/client/src/main/java/com/metamx/druid/query/CacheStrategy.java b/client/src/main/java/com/metamx/druid/query/CacheStrategy.java index abdbe4da259..f8f5098f6ca 100644 --- a/client/src/main/java/com/metamx/druid/query/CacheStrategy.java +++ b/client/src/main/java/com/metamx/druid/query/CacheStrategy.java @@ -22,16 +22,19 @@ package com.metamx.druid.query; import com.google.common.base.Function; import com.metamx.common.guava.Sequence; import com.metamx.druid.Query; +import org.codehaus.jackson.type.TypeReference; /** */ -public interface CacheStrategy> +public interface CacheStrategy> { public byte[] computeCacheKey(QueryType query); - public Function prepareForCache(); + public TypeReference getCacheObjectClazz(); - public Function pullFromCache(); + public Function prepareForCache(); + + public Function pullFromCache(); public Sequence mergeSequences(Sequence> seqOfSequences); } diff --git a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java index ebf77f64af4..bec2170ec92 100644 --- a/client/src/main/java/com/metamx/druid/query/QueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/QueryToolChest.java @@ -44,7 +44,7 @@ public interface QueryToolChest> public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); public Function makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn); public TypeReference getResultTypeReference(); - public CacheStrategy getCacheStrategy(QueryType query); + public CacheStrategy getCacheStrategy(QueryType query); public QueryRunner preMergeQueryDecoration(QueryRunner runner); public QueryRunner postMergeQueryDecoration(QueryRunner runner); } diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java index 6ce071f8068..9dcf6110322 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java @@ -178,7 +178,7 @@ public class GroupByQueryQueryToolChest implements QueryToolChest getCacheStrategy(GroupByQuery query) + public CacheStrategy getCacheStrategy(GroupByQuery query) { return null; } diff --git a/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java index 8687f213503..cd96b5d718a 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/AllColumnIncluderator.java @@ -28,4 +28,10 @@ public class AllColumnIncluderator implements ColumnIncluderator { return true; } + + @Override + public byte[] getCacheKey() + { + return ALL_CACHE_PREFIX; + } } diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java index 466167d48fd..90533c4eaca 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/ColumnIncluderator.java @@ -32,5 +32,10 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; }) public interface ColumnIncluderator { + public static final byte[] NONE_CACHE_PREFIX = new byte[]{0x0}; + public static final byte[] ALL_CACHE_PREFIX = new byte[]{0x1}; + public static final byte[] LIST_CACHE_PREFIX = new byte[]{0x2}; + public boolean include(String columnName); + public byte[] getCacheKey(); } diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java index 4048dcd420a..e74661d6822 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/ListColumnIncluderator.java @@ -19,10 +19,16 @@ package com.metamx.druid.query.metadata; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Set; @@ -53,4 +59,24 @@ public class ListColumnIncluderator implements ColumnIncluderator { return columns.contains(columnName); } + + @Override + public byte[] getCacheKey() + { + int size = 1; + List columns = Lists.newArrayListWithExpectedSize(this.columns.size()); + + for (String column : this.columns) { + final byte[] bytes = column.getBytes(Charsets.UTF_8); + columns.add(bytes); + size += bytes.length; + } + + final ByteBuffer bytes = ByteBuffer.allocate(size).put(LIST_CACHE_PREFIX); + for (byte[] column : columns) { + bytes.put(column); + } + + return bytes.array(); + } } diff --git a/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java b/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java index 56e6842b7ad..d1d66d26778 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/NoneColumnIncluderator.java @@ -28,4 +28,10 @@ public class NoneColumnIncluderator implements ColumnIncluderator { return false; } + + @Override + public byte[] getCacheKey() + { + return NONE_CACHE_PREFIX; + } } diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 447bba2e5c6..160c23cd958 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -27,7 +27,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.metamx.common.ISE; -import com.metamx.common.guava.Comparators; +import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.druid.Query; @@ -44,8 +44,7 @@ import org.joda.time.Interval; import org.joda.time.Minutes; import javax.annotation.Nullable; -import java.util.Comparator; -import java.util.Iterator; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +53,7 @@ import java.util.Set; public class SegmentMetadataQueryQueryToolChest implements QueryToolChest { private static final TypeReference TYPE_REFERENCE = new TypeReference(){}; + private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4}; @Override public QueryRunner mergeResults(final QueryRunner runner) @@ -165,9 +165,58 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest getCacheStrategy(SegmentMetadataQuery query) + public CacheStrategy getCacheStrategy(SegmentMetadataQuery query) { - return null; + return new CacheStrategy() + { + @Override + public byte[] computeCacheKey(SegmentMetadataQuery query) + { + byte[] includerBytes = query.getToInclude().getCacheKey(); + return ByteBuffer.allocate(1 + includerBytes.length) + .put(SEGMENT_METADATA_CACHE_PREFIX) + .put(includerBytes) + .array(); + } + + @Override + public TypeReference getCacheObjectClazz() + { + return getResultTypeReference(); + } + + @Override + public Function prepareForCache() + { + return new Function() + { + @Override + public SegmentAnalysis apply(@Nullable SegmentAnalysis input) + { + return input; + } + }; + } + + @Override + public Function pullFromCache() + { + return new Function() + { + @Override + public SegmentAnalysis apply(@Nullable SegmentAnalysis input) + { + return input; + } + }; + } + + @Override + public Sequence mergeSequences(Sequence> seqOfSequences) + { + return new MergeSequence(getOrdering(), seqOfSequences); + } + }; } @Override diff --git a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java index 95757fc60b1..ce3fcc86114 100644 --- a/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/search/SearchQueryQueryToolChest.java @@ -82,6 +82,10 @@ public class SearchQueryQueryToolChest implements QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() + { + }; + @Override public QueryRunner> mergeResults(QueryRunner> runner) { @@ -143,9 +147,9 @@ public class SearchQueryQueryToolChest implements QueryToolChest, SearchQuery> getCacheStrategy(SearchQuery query) + public CacheStrategy, Object, SearchQuery> getCacheStrategy(SearchQuery query) { - return new CacheStrategy, SearchQuery>() + return new CacheStrategy, Object, SearchQuery>() { @Override public byte[] computeCacheKey(SearchQuery query) @@ -183,6 +187,12 @@ public class SearchQueryQueryToolChest implements QueryToolChest getCacheObjectClazz() + { + return OBJECT_TYPE_REFERENCE; + } + @Override public Function, Object> prepareForCache() { diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 9d65ab9b47c..5ee6c321bbb 100644 --- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -53,6 +53,9 @@ public class TimeBoundaryQueryQueryToolChest private static final TypeReference> TYPE_REFERENCE = new TypeReference>() { }; + private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference() + { + }; @Override public QueryRunner> mergeResults( @@ -106,9 +109,9 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public CacheStrategy, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query) + public CacheStrategy, Object, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query) { - return new CacheStrategy, TimeBoundaryQuery>() + return new CacheStrategy, Object, TimeBoundaryQuery>() { @Override public byte[] computeCacheKey(TimeBoundaryQuery query) @@ -119,6 +122,12 @@ public class TimeBoundaryQueryQueryToolChest .array(); } + @Override + public TypeReference getCacheObjectClazz() + { + return OBJECT_TYPE_REFERENCE; + } + @Override public Function, Object> prepareForCache() { diff --git a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index d8ef9802dad..9c633507ec5 100644 --- a/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -68,6 +68,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest> TYPE_REFERENCE = new TypeReference>() { }; + private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference() + { + }; @Override public QueryRunner> mergeResults(QueryRunner> queryRunner) @@ -155,9 +158,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query) + public CacheStrategy, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query) { - return new CacheStrategy, TimeseriesQuery>() + return new CacheStrategy, Object, TimeseriesQuery>() { private final List aggs = query.getAggregatorSpecs(); private final List postAggs = query.getPostAggregatorSpecs(); @@ -179,6 +182,12 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest getCacheObjectClazz() + { + return OBJECT_TYPE_REFERENCE; + } + @Override public Function, Object> prepareForCache() { @@ -259,6 +268,4 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest getCacheStrategy(QueryType query) + public CacheStrategy getCacheStrategy(QueryType query) { return null; }