let broker populate cache lazily

This commit is contained in:
Xavier Léauté 2014-04-17 14:27:42 -07:00
parent e81479db44
commit 6a52112034
1 changed files with 32 additions and 13 deletions

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.Pair; import com.metamx.common.Pair;
@ -332,26 +333,30 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)), clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
new Function<Object, Sequence<T>>() new Function<Object, Sequence<T>>()
{ {
private final Function<T, Object> prepareForCache = strategy.prepareForCache(); private final Function<T, Object> cacheFn = strategy.prepareForCache();
@Override @Override
public Sequence<T> apply(Object input) public Sequence<T> apply(Object input)
{ {
Result<Object> result = (Result<Object>) input; Result<Object> result = (Result<Object>) input;
final BySegmentResultValueClass<T> value = (BySegmentResultValueClass<T>) result.getValue(); final BySegmentResultValueClass<T> value = (BySegmentResultValueClass<T>) result.getValue();
String segmentIdentifier = value.getSegmentId();
final Iterable<T> segmentResults = value.getResults();
CachePopulator cachePopulator = cachePopulatorMap.get( final List<Object> cacheData = Lists.newLinkedList();
String.format("%s_%s", segmentIdentifier, value.getInterval())
);
if (cachePopulator != null) {
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
}
return Sequences.simple( return Sequences.withEffect(
Iterables.transform( Sequences.map(
segmentResults, Sequences.map(
Sequences.simple(value.getResults()),
new Function<T, T>()
{
@Override
public T apply(T input)
{
cacheData.add(cacheFn.apply(input));
return input;
}
}
),
toolChest.makeMetricManipulatorFn( toolChest.makeMetricManipulatorFn(
rewrittenQuery, rewrittenQuery,
new MetricManipulationFn() new MetricManipulationFn()
@ -363,7 +368,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
} }
) )
) ),
new Runnable()
{
@Override
public void run()
{
CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
if (cachePopulator != null) {
cachePopulator.populate(cacheData);
}
}
},
MoreExecutors.sameThreadExecutor()
); );
} }
} }