CachePopulatingQueryRunner -> CachingQueryRunner

Populate and use cache at compute node level
This commit is contained in:
Xavier Léauté 2014-04-04 10:49:00 -07:00
parent 3e6ffbd5ac
commit 60b95dabad
3 changed files with 136 additions and 29 deletions

View File

@ -19,10 +19,14 @@
package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
@ -34,9 +38,11 @@ import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
public class CachingQueryRunner<T> implements QueryRunner<T>
{
private final String segmentIdentifier;
@ -47,9 +53,10 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
private final ObjectMapper mapper;
private final CacheConfig cacheConfig;
public CachePopulatingQueryRunner(
public CachingQueryRunner(
String segmentIdentifier,
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
SegmentDescriptor segmentDescriptor,
ObjectMapper mapper,
Cache cache,
QueryToolChest toolchest,
QueryRunner<T> base,
@ -73,13 +80,57 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null
&& cacheConfig.isPopulateCache();
if (populateCache) {
final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true"))
&& strategy != null
&& cacheConfig.isPopulateCache();
final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
if(useCache) {
final Function cacheFn = strategy.pullFromCache();
final byte[] cachedResult = cache.get(key);
if(cachedResult != null) {
final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();
return Sequences.map(
new BaseSequence<>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@Override
public Iterator<T> make()
{
try {
if (cachedResult.length == 0) {
return Iterators.emptyIterator();
}
return mapper.readValues(
mapper.getFactory().createParser(cachedResult),
cacheObjectClazz
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public void cleanup(Iterator<T> iterFromMake)
{
}
}
),
cacheFn
);
}
}
if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final List<Object> cacheResults = Lists.newLinkedList();

View File

@ -29,7 +29,7 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachePopulatingQueryRunner;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.collections.CountingMap;
@ -410,7 +410,7 @@ public class ServerManager implements QuerySegmentWalker
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
new CachePopulatingQueryRunner<T>(
new CachingQueryRunner<T>(
adapter.getIdentifier(),
segmentDescriptor,
objectMapper,
@ -424,4 +424,4 @@ public class ServerManager implements QuerySegmentWalker
segmentSpec
);
}
}
}

View File

@ -19,9 +19,9 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
@ -57,12 +57,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class CachePopulatingQueryRunnerTest
public class CachingQueryRunnerTest
{
private static final List<AggregatorFactory> AGGS = Arrays.asList(
@ -71,17 +70,17 @@ public class CachePopulatingQueryRunnerTest
new LongSumAggregatorFactory("impers", "imps")
);
@Test
public void testCachePopulatingQueryRunnerResourceClosing() throws Exception
{
Object[] objects = new Object[]{
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};
private static final Object[] objects = new Object[]{
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};
@Test
public void testCloseAndPopulate() throws Exception
{
Iterable<Result<TopNResultValue>> expectedRes = makeTopNResults(false,objects);
final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")
@ -114,10 +113,11 @@ public class CachePopulatingQueryRunnerTest
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
CachePopulatingQueryRunner runner = new CachePopulatingQueryRunner(
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
segmentDescriptor,
new DefaultObjectMapper(),
objectMapper,
cache,
toolchest,
new QueryRunner()
@ -152,8 +152,6 @@ public class CachePopulatingQueryRunnerTest
Iterable<Result<TopNResultValue>> expectedCacheRes = makeTopNResults(true, objects);
ObjectMapper objectMapper = new DefaultObjectMapper();
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);
@ -170,6 +168,64 @@ public class CachePopulatingQueryRunnerTest
Assert.assertEquals(expectedCacheRes, cacheResults);
}
@Test
public void testUseCache() throws Exception
{
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
Iterable<Result<TopNResultValue>> expectedResults = makeTopNResults(true, objects);
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")
.dimension("top_dim")
.metric("imps")
.threshold(3)
.intervals("2011-01-05/2011-01-10")
.aggregators(AGGS)
.granularity(AllGranularity.ALL);
final TopNQuery query = builder.build();
CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query);
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
cacheStrategy.computeCacheKey(query)
);
Cache cache = MapCache.create(1024 * 1024);
CacheUtil.populate(
cache,
objectMapper,
cacheKey,
Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
);
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
segmentDescriptor,
objectMapper,
cache,
toolchest,
// return an empty sequence since results should get pulled from cache
new QueryRunner()
{
@Override
public Sequence run(Query query)
{
return Sequences.empty();
}
},
new CacheConfig()
);
List<Object> results = Sequences.toList(runner.run(query), new ArrayList());
Assert.assertEquals(expectedResults, results);
}
private Iterable<Result<TopNResultValue>> makeTopNResults
(boolean cachedResults, Object... objects)
{