fix background caching not preserving result order

This commit is contained in:
Xavier Léauté 2015-01-26 09:43:24 -08:00
parent 6993d84f02
commit b334d7973d
1 changed files with 6 additions and 8 deletions

View File

@ -70,13 +70,13 @@ import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -321,7 +321,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query
.withOverriddenContext(contextBuilder.build()); .withOverriddenContext(contextBuilder.build());
final Queue<ListenableFuture<?>> cacheFutures = new ConcurrentLinkedQueue<>();
// Loop through each server, setting up the query and initiating it. // Loop through each server, setting up the query and initiating it.
// The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter. // The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) { for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
@ -401,7 +400,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
String.format("%s_%s", value.getSegmentId(), value.getInterval()) String.format("%s_%s", value.getSegmentId(), value.getInterval())
); );
final Collection<Object> cacheData = new ConcurrentLinkedQueue<>(); final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
return Sequences.<T>withEffect( return Sequences.<T>withEffect(
Sequences.<T, T>map( Sequences.<T, T>map(
@ -416,12 +415,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// only compute cache data if populating cache // only compute cache data if populating cache
cacheFutures.add( cacheFutures.add(
backgroundExecutorService.submit( backgroundExecutorService.submit(
new Runnable() new Callable<Object>()
{ {
@Override @Override
public void run() public Object call()
{ {
cacheData.add(cacheFn.apply(input)); return cacheFn.apply(input);
} }
} }
) )
@ -449,12 +448,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
new FutureCallback<List<Object>>() new FutureCallback<List<Object>>()
{ {
@Override @Override
public void onSuccess(List<Object> objects) public void onSuccess(List<Object> cacheData)
{ {
cachePopulator.populate(cacheData); cachePopulator.populate(cacheData);
// Help out GC by making sure all references are gone // Help out GC by making sure all references are gone
cacheFutures.clear(); cacheFutures.clear();
cacheData.clear();
} }
@Override @Override