mirror of https://github.com/apache/druid.git
Revert "Merge pull request #457 from metamx/stream-cache"
This reverts commit5fae4d9abc
, reversing changes made to43a0554179
.
This commit is contained in:
parent
133c624f69
commit
d6eebce998
|
@ -20,8 +20,6 @@
|
||||||
package io.druid.client;
|
package io.druid.client;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.common.guava.Accumulator;
|
import com.metamx.common.guava.Accumulator;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
|
@ -35,7 +33,6 @@ import io.druid.query.QueryRunner;
|
||||||
import io.druid.query.QueryToolChest;
|
import io.druid.query.QueryToolChest;
|
||||||
import io.druid.query.SegmentDescriptor;
|
import io.druid.query.SegmentDescriptor;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||||
|
@ -75,33 +72,26 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||||
|
|
||||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
||||||
&& strategy != null
|
&& strategy != null
|
||||||
&& cacheConfig.isPopulateCache();
|
&& cacheConfig.isPopulateCache()
|
||||||
|
// historical only populates distributed cache since the cache lookups are done at broker.
|
||||||
final Sequence<T> results = base.run(query);
|
&& !(cache instanceof MapCache);
|
||||||
|
|
||||||
if (populateCache) {
|
if (populateCache) {
|
||||||
final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
Sequence<T> results = base.run(query);
|
||||||
|
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
||||||
segmentIdentifier,
|
segmentIdentifier,
|
||||||
segmentDescriptor,
|
segmentDescriptor,
|
||||||
strategy.computeCacheKey(query)
|
strategy.computeCacheKey(query)
|
||||||
);
|
);
|
||||||
|
ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
|
||||||
final Function cacheFn = strategy.prepareForCache();
|
CacheUtil.populate(
|
||||||
return Sequences.map(
|
cache,
|
||||||
results,
|
mapper,
|
||||||
new Function<T, T>()
|
key,
|
||||||
{
|
Lists.transform(resultAsList, strategy.prepareForCache())
|
||||||
@Nullable
|
|
||||||
@Override
|
|
||||||
public T apply(@Nullable T input)
|
|
||||||
{
|
|
||||||
CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input)));
|
|
||||||
return input;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
|
return Sequences.simple(resultAsList);
|
||||||
} else {
|
} else {
|
||||||
return results;
|
return base.run(query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,7 @@ public class CachePopulatingQueryRunnerTest
|
||||||
|
|
||||||
Sequence res = runner.run(builder.build());
|
Sequence res = runner.run(builder.build());
|
||||||
// base sequence is not closed yet
|
// base sequence is not closed yet
|
||||||
Assert.assertFalse("sequence must not be closed", closable.isClosed());
|
Assert.assertTrue(closable.isClosed());
|
||||||
ArrayList results = Sequences.toList(res, new ArrayList());
|
ArrayList results = Sequences.toList(res, new ArrayList());
|
||||||
Assert.assertTrue(closable.isClosed());
|
Assert.assertTrue(closable.isClosed());
|
||||||
Assert.assertEquals(expectedRes, results);
|
Assert.assertEquals(expectedRes, results);
|
||||||
|
|
Loading…
Reference in New Issue