Merge pull request #1066 from metamx/fix-background-caching-ordering

fix background caching not preserving result order
This commit is contained in:
Fangjin Yang 2015-01-28 06:09:17 +08:00
commit 2d96b62e6c
2 changed files with 162 additions and 12 deletions

View File

@ -70,13 +70,13 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -321,7 +321,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query
.withOverriddenContext(contextBuilder.build());
final Queue<ListenableFuture<?>> cacheFutures = new ConcurrentLinkedQueue<>();
// Loop through each server, setting up the query and initiating it.
// The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
@ -401,7 +400,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
final Collection<Object> cacheData = new ConcurrentLinkedQueue<>();
final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
return Sequences.<T>withEffect(
Sequences.<T, T>map(
@ -416,12 +415,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// only compute cache data if populating cache
cacheFutures.add(
backgroundExecutorService.submit(
new Runnable()
new Callable<Object>()
{
@Override
public void run()
public Object call()
{
cacheData.add(cacheFn.apply(input));
return cacheFn.apply(input);
}
}
)
@ -449,12 +448,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
new FutureCallback<List<Object>>()
{
@Override
public void onSuccess(List<Object> objects)
public void onSuccess(List<Object> cacheData)
{
cachePopulator.populate(cacheData);
// Help out GC by making sure all references are gone
cacheFutures.clear();
cacheData.clear();
}
@Override

View File

@ -26,6 +26,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -34,7 +35,13 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
@ -130,12 +137,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
/**
@ -200,6 +210,8 @@ public class CachingClusteredClientTest
private static final String TOP_DIM = "a_dim";
private final Random random;
public CachingClusteredClient client;
private Runnable queryCompletedCallback;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
@ -232,8 +244,7 @@ public class CachingClusteredClientTest
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
serverView = EasyMock.createStrictMock(TimelineServerView.class);
cache = MapCache.create(100000);
client = makeClient();
client = makeClient(MoreExecutors.sameThreadExecutor());
servers = new DruidServer[]{
new DruidServer("test1", "test1", 10, "historical", "bye", 0),
@ -244,6 +255,142 @@ public class CachingClusteredClientTest
};
}
@Test
public void testOutOfOrderBackgroundCachePopulation() throws Exception
{
  // This test is a bit whacky, but I couldn't find a better way to do it in the current framework.
  // The purpose of this special executor is to randomize execution of tasks on purpose.
  // Since we don't know the number of tasks to be executed, a special DrainTask is used
  // to trigger the actual execution when we are ready to shuffle the order.
  //
  // Overall flow: ordinary submissions are parked in a queue behind SettableFutures; when a
  // DrainTask arrives, the queued tasks are shuffled with a fixed seed and executed, proving
  // that CachingClusteredClient preserves result order even when background cache-population
  // tasks complete out of order.
  abstract class DrainTask implements Runnable {}
  final ForwardingListeningExecutorService randomizingExecutorService = new ForwardingListeningExecutorService()
  {
    // Parked tasks paired with the future handed back to the submitter; each future is
    // completed only once its task actually runs during the drain.
    // NOTE(review): raw ConcurrentLinkedDeque/SettableFuture — unchecked, kept as-is.
    final ConcurrentLinkedDeque<Pair<SettableFuture, Object>> taskQueue = new ConcurrentLinkedDeque();
    final ListeningExecutorService delegate = MoreExecutors.listeningDecorator(
        // we need to run everything in the same thread to ensure all callbacks on futures in CachingClusteredClient
        // are complete before moving on to the next query run.
        MoreExecutors.sameThreadExecutor()
    );

    @Override
    protected ListeningExecutorService delegate()
    {
      return delegate;
    }

    /**
     * If {@code wait} is true, parks {@code task} in the queue and returns a SettableFuture
     * that completes when the task is eventually drained. If {@code wait} is false (the
     * DrainTask case), shuffles and executes every parked task first, completing each parked
     * future with its task's outcome, then runs {@code task} itself on the same-thread delegate.
     */
    private <T> ListenableFuture<T> maybeSubmitTask(Object task, boolean wait)
    {
      if (wait) {
        SettableFuture<T> future = SettableFuture.create();
        taskQueue.addFirst(Pair.<SettableFuture, Object>of(future, task));
        return future;
      } else {
        // Snapshot the queue and shuffle deterministically (seed 0) so the "random"
        // completion order is reproducible across runs.
        List<Pair<SettableFuture, Object>> tasks = Lists.newArrayList(taskQueue.iterator());
        Collections.shuffle(tasks, new Random(0));
        for (final Pair<SettableFuture, Object> pair : tasks) {
          // Tasks may have been parked as either Callable (cache-data producers) or Runnable.
          ListenableFuture future = pair.rhs instanceof Callable ?
                                    delegate.submit((Callable) pair.rhs) :
                                    delegate.submit((Runnable) pair.rhs);
          Futures.addCallback(
              future, new FutureCallback()
              {
                @Override
                public void onSuccess(@Nullable Object result)
                {
                  // Relay the drained task's result to the future the submitter is holding.
                  pair.lhs.set(result);
                }

                @Override
                public void onFailure(Throwable t)
                {
                  pair.lhs.setException(t);
                }
              }
          );
        }
      }
      // Reached only on the wait == false path: run the drain trigger itself.
      return task instanceof Callable ?
             delegate.submit((Callable) task) :
             delegate.submit((Runnable) task);
    }

    @Override
    public <T> ListenableFuture<T> submit(Callable<T> task)
    {
      // All Callables (background cache-population work) are deferred until drain time.
      return maybeSubmitTask(task, true);
    }

    @Override
    public ListenableFuture<?> submit(Runnable task)
    {
      if (task instanceof DrainTask) {
        // The sentinel: flush the parked tasks in shuffled order.
        return maybeSubmitTask(task, false);
      } else {
        return maybeSubmitTask(task, true);
      }
    }
  };

  client = makeClient(randomizingExecutorService);

  // callback to be run every time a query run is complete, to ensure all background
  // caching tasks are executed, and cache is populated before we move onto the next query
  queryCompletedCallback = new Runnable()
  {
    @Override
    public void run()
    {
      try {
        // Submitting a DrainTask triggers the shuffled execution above; .get() blocks
        // until it (and therefore all parked cache tasks) has run.
        randomizingExecutorService.submit(
            new DrainTask()
            {
              @Override
              public void run()
              {
                // no-op
              }
            }
        ).get();
      }
      catch (Exception e) {
        // Guava's propagate() rethrows as an unchecked exception, failing the test.
        Throwables.propagate(e);
      }
    }
  };

  final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
                                                      .dataSource(DATA_SOURCE)
                                                      .intervals(SEG_SPEC)
                                                      .filters(DIM_FILTER)
                                                      .granularity(GRANULARITY)
                                                      .aggregators(AGGS)
                                                      .postAggregators(POST_AGGS)
                                                      .context(CONTEXT);

  QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));

  // Two intervals ensure multiple per-segment cache tasks exist to be shuffled;
  // testQueryCaching then verifies results come back in the correct order.
  testQueryCaching(
      runner,
      builder.build(),
      new Interval("2011-01-05/2011-01-10"),
      makeTimeResults(
          new DateTime("2011-01-05"), 85, 102,
          new DateTime("2011-01-06"), 412, 521,
          new DateTime("2011-01-07"), 122, 21894,
          new DateTime("2011-01-08"), 5, 20,
          new DateTime("2011-01-09"), 18, 521
      ),
      new Interval("2011-01-10/2011-01-13"),
      makeTimeResults(
          new DateTime("2011-01-10"), 85, 102,
          new DateTime("2011-01-11"), 412, 521,
          new DateTime("2011-01-12"), 122, 21894
      )
  );
}
@Test
@SuppressWarnings("unchecked")
public void testTimeseriesCaching() throws Exception
@ -284,6 +431,8 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09T01"), 181, 52
)
);
HashMap<String,List> context = new HashMap<String, List>();
TestHelper.assertExpectedResults(
makeRenamedTimeResults(
@ -1327,6 +1476,9 @@ public class CachingClusteredClientTest
context
)
);
if(queryCompletedCallback != null) {
queryCompletedCallback.run();
}
}
}
},
@ -1833,7 +1985,7 @@ public class CachingClusteredClientTest
EasyMock.reset(mocks);
}
protected CachingClusteredClient makeClient()
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
{
final Supplier<GroupByQueryConfig> groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
@ -1901,7 +2053,7 @@ public class CachingClusteredClientTest
},
cache,
jsonMapper,
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override