From b334d7973df39b4a3458df59eff863b29a04ae18 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Mon, 26 Jan 2015 09:43:24 -0800
Subject: [PATCH 1/2] fix background caching not preserving result order

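Background cache population used to submit plain Runnables that added each
computed cache entry to a shared, unordered ConcurrentLinkedQueue, so entries
could be collected in whatever order the background tasks happened to finish.
Each task is now a Callable<Object> whose future is queued in submission
order, and the populator receives the values as an ordered List (as the new
FutureCallback<List<Object>> signature below suggests, e.g. via Guava's
Futures.allAsList, whose result list preserves the order of the input futures
regardless of completion order).

A minimal sketch of that ordering guarantee, using Guava's
com.google.common.util.concurrent APIs; computeCacheValue and the executor
wiring here are illustrative stand-ins, not Druid code:

    // imports: com.google.common.util.concurrent.*, java.util.*, java.util.concurrent.*
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
    for (final String segment : Arrays.asList("seg1", "seg2", "seg3")) {
      cacheFutures.add(
          exec.submit(
              new Callable<Object>()
              {
                @Override
                public Object call()
                {
                  // computeCacheValue is a hypothetical helper; may finish in any order
                  return computeCacheValue(segment);
                }
              }
          )
      );
    }
    Futures.addCallback(
        Futures.allAsList(cacheFutures), // result list follows submission order
        new FutureCallback<List<Object>>()
        {
          @Override
          public void onSuccess(List<Object> cacheData)
          {
            // cacheData is ordered like cacheFutures, not by completion time
          }

          @Override
          public void onFailure(Throwable t)
          {
            // ignored in this sketch
          }
        }
    );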
---
 .../io/druid/client/CachingClusteredClient.java    | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 14da6225f9a..1b94b78edae 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -70,13 +70,13 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -321,7 +321,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query
         .withOverriddenContext(contextBuilder.build());
 
-    final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
     // Loop through each server, setting up the query and initiating it.
     // The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
     for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
@@ -401,7 +400,7 @@
                     String.format("%s_%s", value.getSegmentId(), value.getInterval())
                 );
 
-                final Collection<Object> cacheData = new ConcurrentLinkedQueue<>();
+                final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
 
                 return Sequences.withEffect(
                     Sequences.map(
@@ -416,12 +415,12 @@
                           // only compute cache data if populating cache
                           cacheFutures.add(
                               backgroundExecutorService.submit(
-                                  new Runnable()
+                                  new Callable<Object>()
                                   {
                                     @Override
-                                    public void run()
+                                    public Object call()
                                     {
-                                      cacheData.add(cacheFn.apply(input));
+                                      return cacheFn.apply(input);
                                     }
                                   }
                               )
@@ -449,12 +448,11 @@
                       new FutureCallback<List<Object>>()
                       {
                         @Override
-                        public void onSuccess(List<Object> objects)
+                        public void onSuccess(List<Object> cacheData)
                         {
                           cachePopulator.populate(cacheData);
                           // Help out GC by making sure all references are gone
                           cacheFutures.clear();
-                          cacheData.clear();
                         }
 
                         @Override

From 59c190732243b60edef87031698a419aaa2bedd3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Tue, 27 Jan 2015 12:51:35 -0800
Subject: [PATCH 2/2] add test for out-of-order background-caching execution

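The randomizing executor in this test wraps a same-thread executor in a
ForwardingListeningExecutorService that parks every submitted task on a deque
behind a SettableFuture instead of running it. Submitting a DrainTask sentinel
shuffles the parked tasks with a fixed seed (Random(0), so any failure is
reproducible) and only then runs them on the same-thread delegate, simulating
background cache-population tasks completing out of order. The
queryCompletedCallback drains the executor after every query run, so the cache
is fully populated, in scrambled execution order, before the next run's
assertions read from it.

Roughly, each query run boils down to the sequence below; task1 and task2 are
illustrative stand-ins for the per-segment caching tasks submitted by
CachingClusteredClient:

    randomizingExecutorService.submit(task1);  // task1 is a stand-in; parked on the deque
    randomizingExecutorService.submit(task2);  // task2 is a stand-in; parked on the deque
    randomizingExecutorService.submit(
        new DrainTask()
        {
          @Override
          public void run()
          {
            // no-op: submitting it merely triggers the shuffled drain
          }
        }
    ).get();  // runs task1/task2 in shuffled order, then completes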
---
 .../client/CachingClusteredClientTest.java | 160 +++++++++++++++++-
 1 file changed, 156 insertions(+), 4 deletions(-)

diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index e3b261c6ea4..f6193a62c7d 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -34,7 +35,13 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.FunctionalIterable;
@@ -130,12 +137,15 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executor;
@@ -200,6 +210,8 @@ public class CachingClusteredClientTest
   private static final String TOP_DIM = "a_dim";
   private final Random random;
   public CachingClusteredClient client;
+  private Runnable queryCompletedCallback;
+
   protected VersionedIntervalTimeline<String, ServerSelector> timeline;
   protected TimelineServerView serverView;
   protected Cache cache;
@@ -232,8 +244,7 @@
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createStrictMock(TimelineServerView.class);
     cache = MapCache.create(100000);
-
-    client = makeClient();
+    client = makeClient(MoreExecutors.sameThreadExecutor());
 
     servers = new DruidServer[]{
         new DruidServer("test1", "test1", 10, "historical", "bye", 0),
@@ -244,6 +255,142 @@
     };
   }
 
+  @Test
+  public void testOutOfOrderBackgroundCachePopulation() throws Exception
+  {
+    // This test is a bit whacky, but I couldn't find a better way to do it in the current framework.
+
+    // The purpose of this special executor is to randomize execution of tasks on purpose.
+    // Since we don't know the number of tasks to be executed, a special DrainTask is used
+    // to trigger the actual execution when we are ready to shuffle the order.
+    abstract class DrainTask implements Runnable {}
+    final ForwardingListeningExecutorService randomizingExecutorService = new ForwardingListeningExecutorService()
+    {
+      final ConcurrentLinkedDeque<Pair<SettableFuture, Object>> taskQueue = new ConcurrentLinkedDeque();
+      final ListeningExecutorService delegate = MoreExecutors.listeningDecorator(
+          // we need to run everything in the same thread to ensure all callbacks on futures in CachingClusteredClient
+          // are complete before moving on to the next query run.
+          MoreExecutors.sameThreadExecutor()
+      );
+
+      @Override
+      protected ListeningExecutorService delegate()
+      {
+        return delegate;
+      }
+
+      private <T> ListenableFuture<T> maybeSubmitTask(Object task, boolean wait)
+      {
+        if (wait) {
+          SettableFuture<T> future = SettableFuture.create();
+          taskQueue.addFirst(Pair.<SettableFuture, Object>of(future, task));
+          return future;
+        } else {
+          List<Pair<SettableFuture, Object>> tasks = Lists.newArrayList(taskQueue.iterator());
+          Collections.shuffle(tasks, new Random(0));
+
+          for (final Pair<SettableFuture, Object> pair : tasks) {
+            ListenableFuture future = pair.rhs instanceof Callable ?
+                                      delegate.submit((Callable) pair.rhs) :
+                                      delegate.submit((Runnable) pair.rhs);
+            Futures.addCallback(
+                future, new FutureCallback()
+                {
+                  @Override
+                  public void onSuccess(@Nullable Object result)
+                  {
+                    pair.lhs.set(result);
+                  }
+
+                  @Override
+                  public void onFailure(Throwable t)
+                  {
+                    pair.lhs.setException(t);
+                  }
+                }
+            );
+          }
+        }
+        return task instanceof Callable ?
+               delegate.submit((Callable) task) :
+               delegate.submit((Runnable) task);
+      }
+
+      @Override
+      public <T> ListenableFuture<T> submit(Callable<T> task)
+      {
+        return maybeSubmitTask(task, true);
+      }
+
+      @Override
+      public ListenableFuture<?> submit(Runnable task)
+      {
+        if (task instanceof DrainTask) {
+          return maybeSubmitTask(task, false);
+        } else {
+          return maybeSubmitTask(task, true);
+        }
+      }
+    };
+
+    client = makeClient(randomizingExecutorService);
+
+    // callback to be run every time a query run is complete, to ensure all background
+    // caching tasks are executed, and cache is populated before we move onto the next query
+    queryCompletedCallback = new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          randomizingExecutorService.submit(
+              new DrainTask()
+              {
+                @Override
+                public void run()
+                {
+                  // no-op
+                }
+              }
+          ).get();
+        }
+        catch (Exception e) {
+          Throwables.propagate(e);
+        }
+      }
+    };
+
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+                                                        .dataSource(DATA_SOURCE)
+                                                        .intervals(SEG_SPEC)
+                                                        .filters(DIM_FILTER)
+                                                        .granularity(GRANULARITY)
+                                                        .aggregators(AGGS)
+                                                        .postAggregators(POST_AGGS)
+                                                        .context(CONTEXT);
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
+
+    testQueryCaching(
+        runner,
+        builder.build(),
+        new Interval("2011-01-05/2011-01-10"),
+        makeTimeResults(
+            new DateTime("2011-01-05"), 85, 102,
+            new DateTime("2011-01-06"), 412, 521,
+            new DateTime("2011-01-07"), 122, 21894,
+            new DateTime("2011-01-08"), 5, 20,
+            new DateTime("2011-01-09"), 18, 521
+        ),
+        new Interval("2011-01-10/2011-01-13"),
+        makeTimeResults(
+            new DateTime("2011-01-10"), 85, 102,
+            new DateTime("2011-01-11"), 412, 521,
+            new DateTime("2011-01-12"), 122, 21894
+        )
+    );
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeseriesCaching() throws Exception
@@ -284,6 +431,8 @@
             new DateTime("2011-01-09T01"), 181, 52
         )
     );
+
+    HashMap context = new HashMap();
 
     TestHelper.assertExpectedResults(
         makeRenamedTimeResults(
@@ -1327,6 +1476,9 @@
                         context
                     )
                 );
+                if (queryCompletedCallback != null) {
+                  queryCompletedCallback.run();
+                }
               }
             }
           },
@@ -1833,7 +1985,7 @@
     EasyMock.reset(mocks);
   }
 
-  protected CachingClusteredClient makeClient()
+  protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
   {
 
     final Supplier<GroupByQueryConfig> groupByQueryConfigSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
@@ -1901,7 +2053,7 @@
         },
         cache,
         jsonMapper,
-        MoreExecutors.sameThreadExecutor(),
+        backgroundExecutorService,
         new CacheConfig()
         {
           @Override