1) Add caching for SegmentMetadataQuery

This commit is contained in:
Eric Tschetter 2013-01-29 13:04:57 -06:00
parent 8af3ae09d6
commit da914b835e
13 changed files with 145 additions and 21 deletions

View File

@ -54,6 +54,7 @@ import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.result.BySegmentResultValueClass; import com.metamx.druid.result.BySegmentResultValueClass;
import com.metamx.druid.result.Result; import com.metamx.druid.result.Result;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -110,7 +111,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query); final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Query<T>> strategy = toolChest.getCacheStrategy(query); final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap(); final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
@ -241,6 +242,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(); final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) { for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
final byte[] cachedResult = cachedResultPair.rhs; final byte[] cachedResult = cachedResultPair.rhs;
Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>( Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>(
@ -255,7 +257,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
return objectMapper.readValues( return objectMapper.readValues(
objectMapper.getJsonFactory().createJsonParser(cachedResult), Object.class objectMapper.getJsonFactory().createJsonParser(cachedResult),
cacheObjectClazz
); );
} }
catch (IOException e) { catch (IOException e) {

View File

@ -22,16 +22,19 @@ package com.metamx.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import org.codehaus.jackson.type.TypeReference;
/** /**
*/ */
public interface CacheStrategy<T, QueryType extends Query<T>> public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{ {
public byte[] computeCacheKey(QueryType query); public byte[] computeCacheKey(QueryType query);
public Function<T, Object> prepareForCache(); public TypeReference<CacheType> getCacheObjectClazz();
public Function<Object, T> pullFromCache(); public Function<T, CacheType> prepareForCache();
public Function<CacheType, T> pullFromCache();
public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences); public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
} }

View File

@ -44,7 +44,7 @@ public interface QueryToolChest<ResultType, QueryType extends Query<ResultType>>
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn); public Function<ResultType, ResultType> makeMetricManipulatorFn(QueryType query, MetricManipulationFn fn);
public TypeReference<ResultType> getResultTypeReference(); public TypeReference<ResultType> getResultTypeReference();
public CacheStrategy<ResultType, QueryType> getCacheStrategy(QueryType query); public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query);
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner); public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner);
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner); public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner);
} }

View File

@ -178,7 +178,7 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
} }
@Override @Override
public CacheStrategy<Row, GroupByQuery> getCacheStrategy(GroupByQuery query) public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
{ {
return null; return null;
} }

View File

@ -28,4 +28,10 @@ public class AllColumnIncluderator implements ColumnIncluderator
{ {
return true; return true;
} }
/**
 * Cache-key contribution of the "include every column" includerator.
 *
 * @return the shared {@code ALL_CACHE_PREFIX} array (the same instance on every call, not a copy)
 */
@Override
public byte[] getCacheKey()
{
  final byte[] allColumnsKey = ALL_CACHE_PREFIX;
  return allColumnsKey;
}
} }

View File

@ -32,5 +32,10 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
}) })
public interface ColumnIncluderator public interface ColumnIncluderator
{ {
// One-byte tags, one per ColumnIncluderator implementation, used when building cache keys.
// The three values are distinct so that keys produced by different implementations differ
// in their first byte.
public static final byte[] NONE_CACHE_PREFIX = new byte[]{0x0};
public static final byte[] ALL_CACHE_PREFIX = new byte[]{0x1};
public static final byte[] LIST_CACHE_PREFIX = new byte[]{0x2};
public boolean include(String columnName); public boolean include(String columnName);
public byte[] getCacheKey();
} }

View File

@ -19,10 +19,16 @@
package com.metamx.druid.query.metadata; package com.metamx.druid.query.metadata;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -53,4 +59,24 @@ public class ListColumnIncluderator implements ColumnIncluderator
{ {
return columns.contains(columnName); return columns.contains(columnName);
} }
/**
 * Builds the cache key for this explicit column list.
 *
 * Layout: {@code LIST_CACHE_PREFIX}, then for every column its UTF-8 bytes followed by a
 * single {@code 0xFF} terminator byte. The terminator keeps column boundaries visible in
 * the key: without it the lists {@code ["ab", "c"]} and {@code ["a", "bc"]} would serialize
 * to identical bytes and could be served each other's cached results. {@code 0xFF} is safe
 * as a terminator because it never occurs in UTF-8 encoded text.
 *
 * NOTE(review): the key follows the iteration order of {@code this.columns}; confirm that
 * collection iterates deterministically so equal column sets always yield equal keys.
 *
 * @return a freshly allocated byte array holding the key
 */
@Override
public byte[] getCacheKey()
{
  final byte columnTerminator = (byte) 0xFF;

  // First pass: encode every column once and compute the exact buffer size.
  int size = LIST_CACHE_PREFIX.length;
  final List<byte[]> encodedColumns = Lists.newArrayListWithExpectedSize(this.columns.size());
  for (String column : this.columns) {
    final byte[] encoded = column.getBytes(Charsets.UTF_8);
    encodedColumns.add(encoded);
    size += encoded.length + 1; // +1 for the terminator written after each column
  }

  // Second pass: prefix first, then each column followed by its terminator.
  final ByteBuffer key = ByteBuffer.allocate(size).put(LIST_CACHE_PREFIX);
  for (byte[] encoded : encodedColumns) {
    key.put(encoded).put(columnTerminator);
  }

  // The buffer was allocated at exactly the needed size, so array() is the full key.
  return key.array();
}
} }

View File

@ -28,4 +28,10 @@ public class NoneColumnIncluderator implements ColumnIncluderator
{ {
return false; return false;
} }
/**
 * Cache-key contribution of the "include no columns" includerator.
 *
 * @return the shared {@code NONE_CACHE_PREFIX} array (the same instance on every call, not a copy)
 */
@Override
public byte[] getCacheKey()
{
  final byte[] noColumnsKey = NONE_CACHE_PREFIX;
  return noColumnsKey;
}
} }

View File

@ -27,7 +27,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.druid.Query; import com.metamx.druid.Query;
@ -44,8 +44,7 @@ import org.joda.time.Interval;
import org.joda.time.Minutes; import org.joda.time.Minutes;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator; import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -54,6 +53,7 @@ import java.util.Set;
public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery> public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{ {
private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){}; private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
@Override @Override
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner) public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
@ -165,9 +165,58 @@ public class SegmentMetadataQueryQueryToolChest implements QueryToolChest<Segmen
} }
@Override @Override
public CacheStrategy<SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query) public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(SegmentMetadataQuery query)
{ {
return null; return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
/**
 * Computes the cache key for a segment metadata query.
 *
 * The key is {@code SEGMENT_METADATA_CACHE_PREFIX} followed by the cache key of the
 * query's column includerator ({@code query.getToInclude().getCacheKey()}).
 *
 * @param query the query to derive the key from
 * @return the backing array of a buffer sized for one prefix byte plus the includerator key
 */
@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
  final byte[] includeKey = query.getToInclude().getCacheKey();

  // One byte for the query-type prefix, then the includerator's own key bytes.
  final ByteBuffer keyBuffer = ByteBuffer.allocate(1 + includeKey.length);
  keyBuffer.put(SEGMENT_METADATA_CACHE_PREFIX);
  keyBuffer.put(includeKey);
  return keyBuffer.array();
}
/**
 * Type token describing the objects this strategy stores in the cache.
 *
 * @return whatever the enclosing tool chest's {@code getResultTypeReference()} returns
 */
@Override
public TypeReference<SegmentAnalysis> getCacheObjectClazz()
{
  final TypeReference<SegmentAnalysis> cachedType = getResultTypeReference();
  return cachedType;
}
/**
 * Conversion applied to a result before it is written to the cache.
 *
 * @return a pass-through function: it returns its argument unchanged (including null)
 */
@Override
public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache()
{
  final Function<SegmentAnalysis, SegmentAnalysis> passThrough =
      new Function<SegmentAnalysis, SegmentAnalysis>()
      {
        @Override
        public SegmentAnalysis apply(@Nullable SegmentAnalysis analysis)
        {
          return analysis;
        }
      };
  return passThrough;
}
/**
 * Conversion applied to a cached object when it is read back.
 *
 * @return a pass-through function: it returns its argument unchanged (including null)
 */
@Override
public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache()
{
  final Function<SegmentAnalysis, SegmentAnalysis> passThrough =
      new Function<SegmentAnalysis, SegmentAnalysis>()
      {
        @Override
        public SegmentAnalysis apply(@Nullable SegmentAnalysis cached)
        {
          return cached;
        }
      };
  return passThrough;
}
/**
 * Combines several result sequences into one.
 *
 * @param seqOfSequences the sequences to combine
 * @return a {@code MergeSequence} over the inputs, constructed with the tool chest's {@code getOrdering()}
 */
@Override
public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysis>> seqOfSequences)
{
  final Sequence<SegmentAnalysis> merged = new MergeSequence<SegmentAnalysis>(getOrdering(), seqOfSequences);
  return merged;
}
};
} }
@Override @Override

View File

@ -82,6 +82,10 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
maxSearchLimit = PropUtils.getPropertyAsInt(props, "com.metamx.query.search.maxSearchLimit", 1000); maxSearchLimit = PropUtils.getPropertyAsInt(props, "com.metamx.query.search.maxSearchLimit", 1000);
} }
// Type token for plain java.lang.Object. The search cache strategy returns it from
// getCacheObjectClazz(), matching the Object cache type declared in its CacheStrategy signature.
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override @Override
public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner) public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner)
{ {
@ -143,9 +147,9 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
} }
@Override @Override
public CacheStrategy<Result<SearchResultValue>, SearchQuery> getCacheStrategy(SearchQuery query) public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(SearchQuery query)
{ {
return new CacheStrategy<Result<SearchResultValue>, SearchQuery>() return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
{ {
@Override @Override
public byte[] computeCacheKey(SearchQuery query) public byte[] computeCacheKey(SearchQuery query)
@ -183,6 +187,12 @@ public class SearchQueryQueryToolChest implements QueryToolChest<Result<SearchRe
return queryCacheKey.array(); return queryCacheKey.array();
} }
/**
 * Type token describing the objects this strategy stores in the cache.
 *
 * @return the shared {@code OBJECT_TYPE_REFERENCE} constant (cache entries are typed as Object)
 */
@Override
public TypeReference<Object> getCacheObjectClazz()
{
  final TypeReference<Object> cachedType = OBJECT_TYPE_REFERENCE;
  return cachedType;
}
@Override @Override
public Function<Result<SearchResultValue>, Object> prepareForCache() public Function<Result<SearchResultValue>, Object> prepareForCache()
{ {

View File

@ -53,6 +53,9 @@ public class TimeBoundaryQueryQueryToolChest
private static final TypeReference<Result<TimeBoundaryResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeBoundaryResultValue>>() private static final TypeReference<Result<TimeBoundaryResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeBoundaryResultValue>>()
{ {
}; };
// Type token for plain java.lang.Object. The time-boundary cache strategy returns it from
// getCacheObjectClazz(), matching the Object cache type declared in its CacheStrategy signature.
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override @Override
public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults( public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
@ -106,9 +109,9 @@ public class TimeBoundaryQueryQueryToolChest
} }
@Override @Override
public CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query) public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
{ {
return new CacheStrategy<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>() return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{ {
@Override @Override
public byte[] computeCacheKey(TimeBoundaryQuery query) public byte[] computeCacheKey(TimeBoundaryQuery query)
@ -119,6 +122,12 @@ public class TimeBoundaryQueryQueryToolChest
.array(); .array();
} }
/**
 * Type token describing the objects this strategy stores in the cache.
 *
 * @return the shared {@code OBJECT_TYPE_REFERENCE} constant (cache entries are typed as Object)
 */
@Override
public TypeReference<Object> getCacheObjectClazz()
{
  final TypeReference<Object> cachedType = OBJECT_TYPE_REFERENCE;
  return cachedType;
}
@Override @Override
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache() public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
{ {

View File

@ -68,6 +68,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeseriesResultValue>>() private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeseriesResultValue>>()
{ {
}; };
// Type token for plain java.lang.Object. The timeseries cache strategy returns it from
// getCacheObjectClazz(), matching the Object cache type declared in its CacheStrategy signature.
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override @Override
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner) public QueryRunner<Result<TimeseriesResultValue>> mergeResults(QueryRunner<Result<TimeseriesResultValue>> queryRunner)
@ -155,9 +158,9 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
} }
@Override @Override
public CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query) public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
{ {
return new CacheStrategy<Result<TimeseriesResultValue>, TimeseriesQuery>() return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{ {
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs(); private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs(); private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@ -179,6 +182,12 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
.array(); .array();
} }
/**
 * Type token describing the objects this strategy stores in the cache.
 *
 * @return the shared {@code OBJECT_TYPE_REFERENCE} constant (cache entries are typed as Object)
 */
@Override
public TypeReference<Object> getCacheObjectClazz()
{
  final TypeReference<Object> cachedType = OBJECT_TYPE_REFERENCE;
  return cachedType;
}
@Override @Override
public Function<Result<TimeseriesResultValue>, Object> prepareForCache() public Function<Result<TimeseriesResultValue>, Object> prepareForCache()
{ {
@ -259,6 +268,4 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
{ {
return Ordering.natural(); return Ordering.natural();
} }
} }

View File

@ -421,7 +421,7 @@ public class ServerManagerTest
} }
@Override @Override
public CacheStrategy<T, QueryType> getCacheStrategy(QueryType query) public <Typer> CacheStrategy<T, Typer, QueryType> getCacheStrategy(QueryType query)
{ {
return null; return null;
} }