From e79e651d78ffcf31dbe2b89ad323aeec5d798c61 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 4 Apr 2011 21:10:11 -0700 Subject: [PATCH] reworked futures test classes --- .../FuturesComposePerformanceTest.java | 254 ++++++++++++++++++ .../concurrent/FutureIterablesTest.java | 198 ++++++++++++-- .../FuturesComposePerformanceTest.java | 215 --------------- .../concurrent/FuturesTestingUtils.java | 162 ----------- 4 files changed, 434 insertions(+), 395 deletions(-) create mode 100644 core/src/test/java/com/google/common/util/concurrent/FuturesComposePerformanceTest.java delete mode 100644 core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java delete mode 100644 core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java diff --git a/core/src/test/java/com/google/common/util/concurrent/FuturesComposePerformanceTest.java b/core/src/test/java/com/google/common/util/concurrent/FuturesComposePerformanceTest.java new file mode 100644 index 0000000000..ebccfda170 --- /dev/null +++ b/core/src/test/java/com/google/common/util/concurrent/FuturesComposePerformanceTest.java @@ -0,0 +1,254 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package com.google.common.util.concurrent; + +import static com.google.common.base.Throwables.propagate; +import static com.google.common.collect.Maps.newHashMap; +import static java.util.concurrent.Executors.newCachedThreadPool; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.testng.annotations.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * In google appengine, we can get a future without using an executorservice, using its async http + * fetch command. However, we still may need to do some conversions, or add listeners. In + * googleappengine, we cannot employ a *real* executorservice, but we can employ a same thread + * executor. This test identifies efficiencies that can be made by strengthening guava's handling of + * same thread execution. + * + *

+ * + * We simulate an i/o future by running a callable that simply sleeps. How this is created isn't + * important. + * + *

    + *
  1. {@code IO_DURATION} is the time that the source future spends doing work
  2. + *
  3. {@code LISTENER_DURATION} is the time that the attached listener or function
  4. + *
+ * + * The execution time of a composed task within a composite should not be more than {@code + * IO_DURATION} + {@code LISTENER_DURATION} + overhead when a threadpool is used. This is because + * the listener should be invoked as soon as the result is available. + *

+ * The execution time of a composed task within a composite should not be more than {@code + * IO_DURATION} + {@code LISTENER_DURATION} * {@code COUNT} + overhead when caller thread is used + * for handling the listeners. + *

+ * This test shows that Futures.compose eagerly issues a get() on the source future. code iterating + * over futures and assigning listeners will take the same amount of time as calling get() on each + * one, if using a within thread executor. This exposes an inefficiency which can make some use + * cases in google appengine impossible to achieve within the cutoff limits. + * + * @author Adrian Cole + */ +@Test(groups = "performance", sequential = true, testName = "FuturesComposePerformanceTest") +public class FuturesComposePerformanceTest { + private static final int FUDGE = 5; + private static final int COUNT = 100; + private static final int IO_DURATION = 50; + private static final int LISTENER_DURATION = 100; + + ExecutorService ioFunctionExecutor = newCachedThreadPool(); + + /** + * When we use threadpools for both the chain and invoking listener, user experience is + * consistent. + */ + public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener() + throws InterruptedException, ExecutionException { + long expectedMax = IO_DURATION + LISTENER_DURATION; + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT * 4 + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = userthreads; + ExecutorService listenerExecutor = userthreads; + + checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + /** + * When we use threadpools for the chain, but same thread for invoking listener, user experience + * is still consistent. + */ + public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener() + throws InterruptedException, ExecutionException { + long expectedMax = IO_DURATION + LISTENER_DURATION; + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = userthreads; + ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); + + checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + /** + * When using same thread for the chain, the futures are being called (get()) eagerly, resulting + * in the max duration being the sum of all i/o plus the cost of executing the listeners. In this + * case, listeners are executed in a different thread pool. + * + */ + public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsSumOfAllIOAndOneListener() + throws InterruptedException, ExecutionException { + long expectedMax = (IO_DURATION * COUNT) + LISTENER_DURATION; + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); + ExecutorService listenerExecutor = userthreads; + + checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + /** + * This case can be optimized for sure. The side effect of the eager get() is that all i/o must + * complete before *any* listeners are run. In this case, if you are inside google appengine and + * using same thread executors, worst experience is sum of all io duration plus the sum of all + * listener duration. An efficient implementation would call get() on the i/o future lazily. Such + * an impl would have a max duration of I/O + Listener * count. + */ + public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsSumOfAllIOAndAllListeners() + throws InterruptedException, ExecutionException { + + long expectedMax = (IO_DURATION * COUNT) + (LISTENER_DURATION * COUNT); + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); + ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); + + checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + private void checkThresholdsUsingFuturesCompose(long expectedMin, long expectedMax, long expectedOverhead, + ExecutorService chainExecutor, final ExecutorService listenerExecutor) { + long start = System.currentTimeMillis(); + Map> responses = newHashMap(); + for (int i = 0; i < COUNT; i++) + responses.put(i + "", Futures.compose(Futures.makeListenable(simultateIO(), chainExecutor), + new Function() { + + @Override + public Long apply(Long from) { + try { + Thread.sleep(LISTENER_DURATION); + } catch (InterruptedException e) { + propagate(e); + } + return System.currentTimeMillis(); + } + + }, listenerExecutor)); + checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); + } + + private Future simultateIO() { + return ioFunctionExecutor.submit(new Callable() { + + @Override + public Long call() throws Exception { + Thread.sleep(IO_DURATION); + return System.currentTimeMillis(); + } + + }); + } + + private static long getMaxIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(Future from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.max(Sets.newHashSet(collection)); + return time; + } + + private static long getMinIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(Future from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.min(Sets.newHashSet(collection)); + return time; + } + + private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start, + Map> responses) { + long time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", + expectedMax, time); + + time = getMinIn(responses) - start; + assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d", + expectedMin, time); + + time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", + expectedMax, time); + } +} diff --git a/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java b/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java index 5e98999f55..14b2e33b04 100644 --- a/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java +++ b/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java @@ -19,16 +19,15 @@ package org.jclouds.concurrent; +import static com.google.common.base.Throwables.propagate; +import static com.google.common.collect.Maps.newHashMap; import static java.util.concurrent.Executors.newCachedThreadPool; import static org.jclouds.concurrent.FutureIterables.awaitCompletion; -import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION; -import static org.jclouds.concurrent.FuturesTestingUtils.COUNT; -import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE; -import static org.jclouds.concurrent.FuturesTestingUtils.checkTimeThresholds; -import static org.jclouds.concurrent.FuturesTestingUtils.runCallables; import static org.testng.Assert.assertEquals; +import java.util.Collections; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -36,43 +35,206 @@ import java.util.concurrent.Future; import org.jclouds.logging.Logger; import org.testng.annotations.Test; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + /** * Tests behavior of FutureIterables * * @author Adrian Cole */ -@Test(enabled = false, groups = "performance", sequential = true) +@Test(groups = "performance", sequential = true, testName = "FutureIterablesTest") public class FutureIterablesTest { - @Test(enabled = false) + ExecutorService ioFunctionExecutor = newCachedThreadPool(); + public void testMakeListenableDoesntSerializeFutures() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION; - long expectedMin = CALLABLE_DURATION; + long expectedMax = IO_DURATION; + long expectedMin = IO_DURATION; long expectedOverhead = COUNT + FUDGE; - ExecutorService callableExecutor = newCachedThreadPool(); ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); long start = System.currentTimeMillis(); - Map> responses = runCallables(callableExecutor, chainExecutor); + Map> responses = runCallables(chainExecutor); checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); } - @Test(enabled = false) public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException, - ExecutionException { - long expectedMax = CALLABLE_DURATION; - long expectedMin = CALLABLE_DURATION; + ExecutionException { + long expectedMax = IO_DURATION; + long expectedMin = IO_DURATION; long expectedOverhead = COUNT + FUDGE; - ExecutorService callableExecutor = newCachedThreadPool(); ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); long start = System.currentTimeMillis(); - Map> responses = runCallables(callableExecutor, chainExecutor); + Map> responses = runCallables(chainExecutor); Map exceptions = awaitCompletion(responses, MoreExecutors.sameThreadExecutor(), null, - Logger.CONSOLE, "test same thread"); + Logger.CONSOLE, "test same thread"); assertEquals(exceptions.size(), 0); checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); } + public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener() throws InterruptedException, ExecutionException { + long expectedMax = IO_DURATION + LISTENER_DURATION; + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT * 4 + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = userthreads; + ExecutorService listenerExecutor = userthreads; + + checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener() throws InterruptedException, ExecutionException { + long expectedMax = IO_DURATION + LISTENER_DURATION; + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = userthreads; + ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); + + checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsIOAndSumOfAllListeners() throws InterruptedException, ExecutionException { + long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT); + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); + ExecutorService listenerExecutor = userthreads; + + checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsIOAndSumOfAllListeners() throws InterruptedException, ExecutionException { + + long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT); + long expectedMin = IO_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService userthreads = newCachedThreadPool(); + try { + ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); + ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); + + checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor); + } finally { + userthreads.shutdownNow(); + } + } + + public static final int FUDGE = 5; + public static final int COUNT = 100; + public static final int IO_DURATION = 50; + public static final int LISTENER_DURATION = 100; + + private void checkThresholdsUsingCompose(long expectedMin, long expectedMax, long expectedOverhead, + ExecutorService chainExecutor, final ExecutorService listenerExecutor) { + long start = System.currentTimeMillis(); + Map> responses = newHashMap(); + for (int i = 0; i < COUNT; i++) + responses.put(i + "", org.jclouds.concurrent.Futures.compose(org.jclouds.concurrent.Futures.makeListenable( + simultateIO(), chainExecutor), new Function() { + + @Override + public Long apply(Long from) { + try { + Thread.sleep(LISTENER_DURATION); + } catch (InterruptedException e) { + propagate(e); + } + return System.currentTimeMillis(); + } + + }, listenerExecutor)); + checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); + } + + private Map> runCallables(ExecutorService chainExecutor) { + Map> responses = newHashMap(); + for (int i = 0; i < COUNT; i++) + responses.put(i + "", org.jclouds.concurrent.Futures.makeListenable(simultateIO(), chainExecutor)); + return responses; + } + + private Future simultateIO() { + return ioFunctionExecutor.submit(new Callable() { + + @Override + public Long call() throws Exception { + Thread.sleep(IO_DURATION); + return System.currentTimeMillis(); + } + + }); + } + + public static long getMaxIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(Future from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.max(Sets.newHashSet(collection)); + return time; + } + + public static long getMinIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(Future from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.min(Sets.newHashSet(collection)); + return time; + } + + private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start, + Map> responses) { + long time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", + expectedMax, time); + + time = getMinIn(responses) - start; + assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d", + expectedMin, time); + + time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", + expectedMax, time); + } } diff --git a/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java b/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java deleted file mode 100644 index 4b2fd25b85..0000000000 --- a/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * - * Copyright (C) 2010 Cloud Conscious, LLC. - * - * ==================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ==================================================================== - */ - -package org.jclouds.concurrent; - -import static java.util.concurrent.Executors.newCachedThreadPool; -import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION; -import static org.jclouds.concurrent.FuturesTestingUtils.COUNT; -import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE; -import static org.jclouds.concurrent.FuturesTestingUtils.LISTENER_DURATION; -import static org.jclouds.concurrent.FuturesTestingUtils.checkThresholdsUsingConcurrentUtilsCompose; -import static org.jclouds.concurrent.FuturesTestingUtils.checkThresholdsUsingFuturesCompose; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; - -import org.testng.annotations.Test; - -/** - * - * - *

- * All of these tests simulate a future by invoking callables in a separate executor. The point of - * this test is to see what happens when we chain futures together. - * - *

    - *
  1. {@code CALLABLE_DURATION} is the time that the source future spends doing work
  2. - *
  3. {@code LISTENER_DURATION} is the time that the attached listener or function
  4. - *
- * - * The execution time of a composed task within a composite should not be more than {@code - * CALLABLE_DURATION} + {@code LISTENER_DURATION} + overhead when a threadpool is used. This is - * because the listener should be invoked as soon as the result is available. - *

- * The execution time of a composed task within a composite should not be more than {@code - * CALLABLE_DURATION} + {@code LISTENER_DURATION} * {@code COUNT} + overhead when caller thread is - * used for handling the listeners. - *

- * ConcurrentUtils overcomes a shortcoming found in Google Guava r06, where Futures.compose eagerly - * issues a get() on the source future. This has the effect of serializing the futures as you - * iterate. It overcomes this by tagging the ExecutorService we associate with sameThread execution - * and lazy convert values accordingly. - * - * @author Adrian Cole - */ -@Test(enabled = false, groups = "performance", sequential = true) -public class FuturesComposePerformanceTest { - ExecutorService callableExecutor = newCachedThreadPool(); - - /** - * When Futures.compose is - */ - @Test(enabled = false) - public void testFuturesCompose1() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT * 4 + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = userthreads; - ExecutorService listenerExecutor = userthreads; - - checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testFuturesCompose2() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = userthreads; - ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); - - checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testFuturesCompose3() throws InterruptedException, ExecutionException { - long expectedMax = (CALLABLE_DURATION * COUNT) + LISTENER_DURATION; - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); - ExecutorService listenerExecutor = userthreads; - - checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testFuturesCompose4() throws InterruptedException, ExecutionException { - - long expectedMax = (CALLABLE_DURATION * COUNT) + (LISTENER_DURATION * COUNT); - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); - ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); - - checkThresholdsUsingFuturesCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testConcurrentUtilsCompose1() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT * 4 + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = userthreads; - ExecutorService listenerExecutor = userthreads; - - checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testConcurrentUtilsCompose2() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = userthreads; - ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); - - checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testConcurrentUtilsCompose3() throws InterruptedException, ExecutionException { - long expectedMax = CALLABLE_DURATION + (LISTENER_DURATION * COUNT); - long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); - ExecutorService listenerExecutor = userthreads; - - checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - - @Test(enabled = false) - public void testConcurrentUtilsCompose4() throws InterruptedException, ExecutionException { - - long expectedMax = CALLABLE_DURATION + (LISTENER_DURATION * COUNT); - long expectedMin = CALLABLE_DURATION; - long expectedOverhead = COUNT + FUDGE; - - ExecutorService userthreads = newCachedThreadPool(); - try { - ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor(); - ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor(); - - checkThresholdsUsingConcurrentUtilsCompose(expectedMin, expectedMax, expectedOverhead, callableExecutor, - chainExecutor, listenerExecutor); - } finally { - userthreads.shutdownNow(); - } - } - -} diff --git a/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java b/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java deleted file mode 100644 index 2ca98a4f5a..0000000000 --- a/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * - * Copyright (C) 2010 Cloud Conscious, LLC. - * - * ==================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ==================================================================== - */ - -package org.jclouds.concurrent; - -import static com.google.common.base.Throwables.propagate; -import static com.google.common.collect.Maps.newHashMap; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - -/** - * Tests behavior of ConcurrentUtils - * - * @author Adrian Cole - */ -public class FuturesTestingUtils { - public static final int FUDGE = 5; - public static final int COUNT = 100; - public static final int CALLABLE_DURATION = 50; - public static final int LISTENER_DURATION = 100; - - public static void checkThresholdsUsingFuturesCompose(long expectedMin, long expectedMax, long expectedOverhead, - ExecutorService callableExecutor, ExecutorService chainExecutor, final ExecutorService listenerExecutor) { - long start = System.currentTimeMillis(); - Map> responses = newHashMap(); - for (int i = 0; i < COUNT; i++) - responses.put(i + "", Futures.compose(createFuture(callableExecutor, chainExecutor), - new Function() { - - @Override - public Long apply(Long from) { - try { - Thread.sleep(LISTENER_DURATION); - } catch (InterruptedException e) { - propagate(e); - } - return System.currentTimeMillis(); - } - - }, listenerExecutor)); - checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); - } - - public static void checkThresholdsUsingConcurrentUtilsCompose(long expectedMin, long expectedMax, - long expectedOverhead, ExecutorService callableExecutor, ExecutorService chainExecutor, - final ExecutorService listenerExecutor) { - long start = System.currentTimeMillis(); - Map> responses = newHashMap(); - for (int i = 0; i < COUNT; i++) - responses.put(i + "", org.jclouds.concurrent.Futures.compose(createFuture(callableExecutor, chainExecutor), new Function() { - - @Override - public Long apply(Long from) { - try { - Thread.sleep(LISTENER_DURATION); - } catch (InterruptedException e) { - propagate(e); - } - return System.currentTimeMillis(); - } - - }, listenerExecutor)); - checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); - } - - public static Map> runCallables(ExecutorService callableExecutor, ExecutorService chainExecutor) { - Map> responses = newHashMap(); - for (int i = 0; i < COUNT; i++) - responses.put(i + "", createFuture(callableExecutor, chainExecutor)); - return responses; - } - - private static ListenableFuture createFuture(ExecutorService callableExecutor, ExecutorService chainExecutor) { - return org.jclouds.concurrent.Futures.makeListenable(callableExecutor.submit(new Callable() { - - @Override - public Long call() throws Exception { - Thread.sleep(CALLABLE_DURATION); - return System.currentTimeMillis(); - } - - }), chainExecutor); - } - - public static long getMaxIn(Map> responses) { - Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { - - @Override - public Long apply(Future from) { - try { - return from.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - } - return null; - } - - }); - long time = Collections.max(Sets.newHashSet(collection)); - return time; - } - - public static long getMinIn(Map> responses) { - Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { - - @Override - public Long apply(Future from) { - try { - return from.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - } - return null; - } - - }); - long time = Collections.min(Sets.newHashSet(collection)); - return time; - } - - public static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start, - Map> responses) { - long time = getMaxIn(responses) - start; - assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", - expectedMax, time); - - time = getMinIn(responses) - start; - assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d", - expectedMin, time); - - time = getMaxIn(responses) - start; - assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d", - expectedMax, time); - } -}