mirror of https://github.com/apache/druid.git
Merge pull request #458 from metamx/stream-cache
use and populate cache at compute node level

commit c05f169171
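
In short: the removed CachePopulatingQueryRunner drained the entire base sequence into an ArrayList (Sequences.toList) before writing it to the cache, never read from the cache, and skipped local MapCache instances entirely; the CachingQueryRunner that replaces it streams rows through, collecting cacheable copies as a side effect (Sequences.withEffect) and writing them once the sequence is exhausted, and it can also answer a query directly from a cached entry. As a rough illustration of that streaming-population pattern, here is a minimal stand-alone sketch using plain java.util iterators in place of the metamx Sequence type; the class and parameter names (CachePopulatingIterator, prepareForCache, populate) are illustrative only, not part of this commit:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.Function;

    // Illustrative stand-in for the Sequence-based runner: wraps a base iterator,
    // keeps a cacheable copy of each row as it streams past, and hands the
    // collected entries to `populate` exactly once, after the stream is drained
    // (the role Sequences.withEffect + CacheUtil.populate play in the real code).
    final class CachePopulatingIterator<T> implements Iterator<T>
    {
      private final Iterator<T> base;
      private final Function<T, Object> prepareForCache;
      private final Consumer<List<Object>> populate;
      private final List<Object> cacheResults = new ArrayList<>();
      private boolean populated = false;

      CachePopulatingIterator(
          Iterator<T> base,
          Function<T, Object> prepareForCache,
          Consumer<List<Object>> populate
      )
      {
        this.base = base;
        this.prepareForCache = prepareForCache;
        this.populate = populate;
      }

      @Override
      public boolean hasNext()
      {
        if (base.hasNext()) {
          return true;
        }
        if (!populated) {
          populated = true;               // write-once, after the stream is drained
          populate.accept(cacheResults);
        }
        return false;
      }

      @Override
      public T next()
      {
        T row = base.next();
        cacheResults.add(prepareForCache.apply(row));  // side effect per row
        return row;
      }
    }

Consuming the iterator to the end triggers exactly one populate call, which is what lets the new runner hand results back to the caller as they arrive instead of materializing them first, as the removed Sequences.toList approach did.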

pom.xml

@@ -39,7 +39,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <metamx.java-util.version>0.25.3</metamx.java-util.version>
+        <metamx.java-util.version>0.25.4</metamx.java-util.version>
         <apache.curator.version>2.4.0</apache.curator.version>
         <druid.api.version>0.1.11</druid.api.version>
     </properties>

CachePopulatingQueryRunner.java (deleted)

@@ -1,98 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2014 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;

import java.util.ArrayList;

public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
{

  private final String segmentIdentifier;
  private final SegmentDescriptor segmentDescriptor;
  private final QueryRunner<T> base;
  private final QueryToolChest toolChest;
  private final Cache cache;
  private final ObjectMapper mapper;
  private final CacheConfig cacheConfig;

  public CachePopulatingQueryRunner(
      String segmentIdentifier,
      SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
      Cache cache,
      QueryToolChest toolchest,
      QueryRunner<T> base,
      CacheConfig cacheConfig
  )
  {
    this.base = base;
    this.segmentIdentifier = segmentIdentifier;
    this.segmentDescriptor = segmentDescriptor;
    this.toolChest = toolchest;
    this.cache = cache;
    this.mapper = mapper;
    this.cacheConfig = cacheConfig;
  }

  @Override
  public Sequence<T> run(Query<T> query)
  {

    final CacheStrategy strategy = toolChest.getCacheStrategy(query);

    final boolean populateCache = query.getContextPopulateCache(true)
                                  && strategy != null
                                  && cacheConfig.isPopulateCache()
                                  // historical only populates distributed cache since the cache lookups are done at broker.
                                  && !(cache instanceof MapCache);
    if (populateCache) {
      Sequence<T> results = base.run(query);
      Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
          segmentIdentifier,
          segmentDescriptor,
          strategy.computeCacheKey(query)
      );
      ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
      CacheUtil.populate(
          cache,
          mapper,
          key,
          Lists.transform(resultAsList, strategy.prepareForCache())
      );
      return Sequences.simple(resultAsList);
    } else {
      return base.run(query);
    }
  }

}

CachingQueryRunner.java (new file)

@@ -0,0 +1,164 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2014 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class CachingQueryRunner<T> implements QueryRunner<T>
{

  private final String segmentIdentifier;
  private final SegmentDescriptor segmentDescriptor;
  private final QueryRunner<T> base;
  private final QueryToolChest toolChest;
  private final Cache cache;
  private final ObjectMapper mapper;
  private final CacheConfig cacheConfig;

  public CachingQueryRunner(
      String segmentIdentifier,
      SegmentDescriptor segmentDescriptor,
      ObjectMapper mapper,
      Cache cache,
      QueryToolChest toolchest,
      QueryRunner<T> base,
      CacheConfig cacheConfig
  )
  {
    this.base = base;
    this.segmentIdentifier = segmentIdentifier;
    this.segmentDescriptor = segmentDescriptor;
    this.toolChest = toolchest;
    this.cache = cache;
    this.mapper = mapper;
    this.cacheConfig = cacheConfig;
  }

  @Override
  public Sequence<T> run(Query<T> query)
  {
    final CacheStrategy strategy = toolChest.getCacheStrategy(query);

    final boolean populateCache = query.getContextPopulateCache(true)
                                  && strategy != null
                                  && cacheConfig.isPopulateCache();

    final boolean useCache = query.getContextUseCache(true)
                             && strategy != null
                             && cacheConfig.isPopulateCache();

    final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
        segmentIdentifier,
        segmentDescriptor,
        strategy.computeCacheKey(query)
    );

    if (useCache) {
      final Function cacheFn = strategy.pullFromCache();
      final byte[] cachedResult = cache.get(key);
      if (cachedResult != null) {
        final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();

        return Sequences.map(
            new BaseSequence<>(
                new BaseSequence.IteratorMaker<T, Iterator<T>>()
                {
                  @Override
                  public Iterator<T> make()
                  {
                    try {
                      if (cachedResult.length == 0) {
                        return Iterators.emptyIterator();
                      }

                      return mapper.readValues(
                          mapper.getFactory().createParser(cachedResult),
                          cacheObjectClazz
                      );
                    }
                    catch (IOException e) {
                      throw Throwables.propagate(e);
                    }
                  }

                  @Override
                  public void cleanup(Iterator<T> iterFromMake)
                  {
                  }
                }
            ),
            cacheFn
        );
      }
    }

    if (populateCache) {
      final Function cacheFn = strategy.prepareForCache();
      final List<Object> cacheResults = Lists.newLinkedList();

      return Sequences.withEffect(
          Sequences.map(
              base.run(query),
              new Function<T, T>()
              {
                @Override
                public T apply(T input)
                {
                  cacheResults.add(cacheFn.apply(input));
                  return input;
                }
              }
          ),
          new Runnable()
          {
            @Override
            public void run()
            {
              CacheUtil.populate(cache, mapper, key, cacheResults);
            }
          },
          MoreExecutors.sameThreadExecutor()
      );
    } else {
      return base.run(query);
    }
  }

}

ServerManager.java

@@ -29,7 +29,7 @@ import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
-import io.druid.client.CachePopulatingQueryRunner;
+import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.collections.CountingMap;

@@ -410,7 +410,7 @@ public class ServerManager implements QuerySegmentWalker
         new BySegmentQueryRunner<T>(
             adapter.getIdentifier(),
             adapter.getDataInterval().getStart(),
-            new CachePopulatingQueryRunner<T>(
+            new CachingQueryRunner<T>(
                 adapter.getIdentifier(),
                 segmentDescriptor,
                 objectMapper,

CachePopulatingQueryRunnerTest.java (deleted)

@@ -1,185 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.client;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.granularity.AllGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class CachePopulatingQueryRunnerTest
{

  private static final List<AggregatorFactory> AGGS = Arrays.asList(
      new CountAggregatorFactory("rows"),
      new LongSumAggregatorFactory("imps", "imps"),
      new LongSumAggregatorFactory("impers", "imps")
  );

  @Test
  public void testCachePopulatingQueryRunnerResourceClosing() throws Exception
  {
    Iterable<Result<TopNResultValue>> expectedRes = makeTopNResults(
        new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
        new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
        new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
        new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
        new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
    );
    final TopNQueryBuilder builder = new TopNQueryBuilder()
        .dataSource("ds")
        .dimension("top_dim")
        .metric("imps")
        .threshold(3)
        .intervals("2011-01-05/2011-01-10")
        .aggregators(AGGS)
        .granularity(AllGranularity.ALL);

    final AssertingClosable closable = new AssertingClosable();
    final Sequence resultSeq = new ResourceClosingSequence(
        Sequences.simple(expectedRes), closable
    )
    {
      @Override
      public Yielder toYielder(Object initValue, YieldingAccumulator accumulator)
      {
        Assert.assertFalse(closable.isClosed());
        return super.toYielder(
            initValue,
            accumulator
        );
      }
    };

    Cache cache = EasyMock.createMock(Cache.class);
    // cache populater ignores populating for local cache, so a dummy cache
    EasyMock.expect(cache.isLocal()).andReturn(false);
    CachePopulatingQueryRunner runner = new CachePopulatingQueryRunner(
        "segment",
        new SegmentDescriptor(new Interval("2011/2012"), "version", 0),
        new DefaultObjectMapper(),
        cache,
        new TopNQueryQueryToolChest(new TopNQueryConfig()),
        new QueryRunner()
        {
          @Override
          public Sequence run(Query query)
          {
            return resultSeq;
          }
        },
        new CacheConfig()
    );

    Sequence res = runner.run(builder.build());
    // base sequence is not closed yet
    Assert.assertTrue(closable.isClosed());
    ArrayList results = Sequences.toList(res, new ArrayList());
    Assert.assertTrue(closable.isClosed());
    Assert.assertEquals(expectedRes, results);
  }

  private Iterable<Result<TopNResultValue>> makeTopNResults(Object... objects)
  {
    List<Result<TopNResultValue>> retVal = Lists.newArrayList();
    int index = 0;
    while (index < objects.length) {
      DateTime timestamp = (DateTime) objects[index++];

      List<Map<String, Object>> values = Lists.newArrayList();
      while (index < objects.length && !(objects[index] instanceof DateTime)) {
        if (objects.length - index < 3) {
          throw new ISE(
              "expect 3 values for each entry in the top list, had %d values left.", objects.length - index
          );
        }
        final double imps = ((Number) objects[index + 2]).doubleValue();
        final double rows = ((Number) objects[index + 1]).doubleValue();
        values.add(
            ImmutableMap.of(
                "top_dim", objects[index],
                "rows", rows,
                "imps", imps,
                "impers", imps,
                "avg_imps_per_row", imps / rows
            )
        );
        index += 3;
      }

      retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
    }
    return retVal;
  }

  private static class AssertingClosable implements Closeable
  {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() throws IOException
    {
      Assert.assertFalse(closed.get());
      Assert.assertTrue(closed.compareAndSet(false, true));
    }

    public boolean isClosed()
    {
      return closed.get();
    }
  }

}

CachingQueryRunnerTest.java (new file)

@@ -0,0 +1,293 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.client;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.granularity.AllGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class CachingQueryRunnerTest
{

  private static final List<AggregatorFactory> AGGS = Arrays.asList(
      new CountAggregatorFactory("rows"),
      new LongSumAggregatorFactory("imps", "imps"),
      new LongSumAggregatorFactory("impers", "imps")
  );

  private static final Object[] objects = new Object[]{
      new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
      new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
      new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
      new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
      new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
  };

  @Test
  public void testCloseAndPopulate() throws Exception
  {
    Iterable<Result<TopNResultValue>> expectedRes = makeTopNResults(false, objects);
    final TopNQueryBuilder builder = new TopNQueryBuilder()
        .dataSource("ds")
        .dimension("top_dim")
        .metric("imps")
        .threshold(3)
        .intervals("2011-01-05/2011-01-10")
        .aggregators(AGGS)
        .granularity(AllGranularity.ALL);

    final AssertingClosable closable = new AssertingClosable();
    final Sequence resultSeq = new ResourceClosingSequence(
        Sequences.simple(expectedRes), closable
    )
    {
      @Override
      public Yielder toYielder(Object initValue, YieldingAccumulator accumulator)
      {
        Assert.assertFalse(closable.isClosed());
        return super.toYielder(
            initValue,
            accumulator
        );
      }
    };

    Cache cache = MapCache.create(1024 * 1024);

    String segmentIdentifier = "segment";
    SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);

    TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
    DefaultObjectMapper objectMapper = new DefaultObjectMapper();
    CachingQueryRunner runner = new CachingQueryRunner(
        segmentIdentifier,
        segmentDescriptor,
        objectMapper,
        cache,
        toolchest,
        new QueryRunner()
        {
          @Override
          public Sequence run(Query query)
          {
            return resultSeq;
          }
        },
        new CacheConfig()
    );

    TopNQuery query = builder.build();
    CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query);
    Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
        segmentIdentifier,
        segmentDescriptor,
        cacheStrategy.computeCacheKey(query)
    );

    Sequence res = runner.run(query);
    // base sequence is not closed yet
    Assert.assertFalse("sequence must not be closed", closable.isClosed());
    Assert.assertNull("cache must be empty", cache.get(cacheKey));

    ArrayList results = Sequences.toList(res, new ArrayList());
    Assert.assertTrue(closable.isClosed());
    Assert.assertEquals(expectedRes, results);

    Iterable<Result<TopNResultValue>> expectedCacheRes = makeTopNResults(true, objects);

    byte[] cacheValue = cache.get(cacheKey);
    Assert.assertNotNull(cacheValue);

    Function<Object, Result<TopNResultValue>> fn = cacheStrategy.pullFromCache();
    List<Result<TopNResultValue>> cacheResults = Lists.newArrayList(
        Iterators.transform(
            objectMapper.readValues(
                objectMapper.getFactory().createParser(cacheValue),
                cacheStrategy.getCacheObjectClazz()
            ),
            fn
        )
    );
    Assert.assertEquals(expectedCacheRes, cacheResults);
  }

  @Test
  public void testUseCache() throws Exception
  {
    DefaultObjectMapper objectMapper = new DefaultObjectMapper();
    Iterable<Result<TopNResultValue>> expectedResults = makeTopNResults(true, objects);
    String segmentIdentifier = "segment";
    SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
    TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());

    final TopNQueryBuilder builder = new TopNQueryBuilder()
        .dataSource("ds")
        .dimension("top_dim")
        .metric("imps")
        .threshold(3)
        .intervals("2011-01-05/2011-01-10")
        .aggregators(AGGS)
        .granularity(AllGranularity.ALL);

    final TopNQuery query = builder.build();

    CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> cacheStrategy = toolchest.getCacheStrategy(query);
    Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
        segmentIdentifier,
        segmentDescriptor,
        cacheStrategy.computeCacheKey(query)
    );

    Cache cache = MapCache.create(1024 * 1024);
    CacheUtil.populate(
        cache,
        objectMapper,
        cacheKey,
        Iterables.transform(expectedResults, cacheStrategy.prepareForCache())
    );

    CachingQueryRunner runner = new CachingQueryRunner(
        segmentIdentifier,
        segmentDescriptor,
        objectMapper,
        cache,
        toolchest,
        // return an empty sequence since results should get pulled from cache
        new QueryRunner()
        {
          @Override
          public Sequence run(Query query)
          {
            return Sequences.empty();
          }
        },
        new CacheConfig()
    );

    List<Object> results = Sequences.toList(runner.run(query), new ArrayList());
    Assert.assertEquals(expectedResults, results);
  }

  private Iterable<Result<TopNResultValue>> makeTopNResults(boolean cachedResults, Object... objects)
  {
    List<Result<TopNResultValue>> retVal = Lists.newArrayList();
    int index = 0;
    while (index < objects.length) {
      DateTime timestamp = (DateTime) objects[index++];

      List<Map<String, Object>> values = Lists.newArrayList();
      while (index < objects.length && !(objects[index] instanceof DateTime)) {
        if (objects.length - index < 3) {
          throw new ISE(
              "expect 3 values for each entry in the top list, had %d values left.", objects.length - index
          );
        }
        final double imps = ((Number) objects[index + 2]).doubleValue();
        final double rows = ((Number) objects[index + 1]).doubleValue();

        if (cachedResults) {
          values.add(
              ImmutableMap.of(
                  "top_dim", objects[index],
                  "rows", rows,
                  "imps", imps,
                  "impers", imps
              )
          );
        } else {
          values.add(
              ImmutableMap.of(
                  "top_dim", objects[index],
                  "rows", rows,
                  "imps", imps,
                  "impers", imps,
                  "avg_imps_per_row", imps / rows
              )
          );
        }
        index += 3;
      }

      retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
    }
    return retVal;
  }

  private static class AssertingClosable implements Closeable
  {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() throws IOException
    {
      Assert.assertFalse(closed.get());
      Assert.assertTrue(closed.compareAndSet(false, true));
    }

    public boolean isClosed()
    {
      return closed.get();
    }
  }

}