diff --git a/core/src/test/java/org/jclouds/concurrent/ConcurrentUtilsTest.java b/core/src/test/java/org/jclouds/concurrent/ConcurrentUtilsTest.java new file mode 100644 index 0000000000..417bc04186 --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/ConcurrentUtilsTest.java @@ -0,0 +1,78 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.concurrent; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.jclouds.concurrent.ConcurrentUtils.awaitCompletion; +import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION; +import static org.jclouds.concurrent.FuturesTestingUtils.COUNT; +import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE; +import static org.jclouds.concurrent.FuturesTestingUtils.checkTimeThresholds; +import static org.jclouds.concurrent.FuturesTestingUtils.runCallables; +import static org.testng.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import org.jclouds.logging.Logger; +import org.testng.annotations.Test; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Tests behavior of ConcurrentUtils + * + * @author Adrian Cole + */ +@Test(groups = "unit", sequential = true, testName = "concurrent.ConcurrentUtilsTest") +public class ConcurrentUtilsTest { + + public void testMakeListenableDoesntSerializeFutures() throws InterruptedException, ExecutionException { + long expectedMax = CALLABLE_DURATION; + long expectedMin = CALLABLE_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService callableExecutor = newCachedThreadPool(); + ExecutorService chainExecutor = sameThreadExecutor(); + + long start = System.currentTimeMillis(); + Map> responses = runCallables(callableExecutor, chainExecutor); + checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); + } + + public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException, + ExecutionException { + long expectedMax = CALLABLE_DURATION; + long expectedMin = CALLABLE_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService callableExecutor = newCachedThreadPool(); + ExecutorService chainExecutor = sameThreadExecutor(); + + long start = System.currentTimeMillis(); + Map> responses = runCallables(callableExecutor, chainExecutor); + Map exceptions = awaitCompletion(responses, sameThreadExecutor(), null, Logger.CONSOLE, + "test same thread"); + assertEquals(exceptions.size(), 0); + checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); + } + +} diff --git a/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java b/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java new file mode 100644 index 0000000000..903d1a99ea --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/FuturesComposePerformanceTest.java @@ -0,0 +1,88 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.concurrent; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.jclouds.concurrent.FuturesTestingUtils.CALLABLE_DURATION; +import static org.jclouds.concurrent.FuturesTestingUtils.COUNT; +import static org.jclouds.concurrent.FuturesTestingUtils.FUDGE; +import static org.jclouds.concurrent.FuturesTestingUtils.LISTENER_DURATION; +import static org.jclouds.concurrent.FuturesTestingUtils.checkThresholds; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import org.testng.annotations.Test; + +/** + * Tests behavior of ConcurrentUtils + * + * @author Adrian Cole + */ +@Test(groups = "performance", sequential = true, testName = "concurrent.ConcurrentUtilsTest") +public class FuturesComposePerformanceTest { + ExecutorService callableExecutor = newCachedThreadPool(); + + public void test1() throws InterruptedException, ExecutionException { + long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; + long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT * 4 + FUDGE; + + ExecutorService chainExecutor = callableExecutor; + ExecutorService listenerExecutor = callableExecutor; + + checkThresholds(expectedMin, expectedMax, expectedOverhead, callableExecutor, chainExecutor, listenerExecutor); + } + + public void test2() throws InterruptedException, ExecutionException { + long expectedMax = CALLABLE_DURATION + LISTENER_DURATION; + long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService chainExecutor = callableExecutor; + ExecutorService listenerExecutor = sameThreadExecutor(); + + checkThresholds(expectedMin, expectedMax, expectedOverhead, callableExecutor, chainExecutor, listenerExecutor); + } + + public void test3() throws InterruptedException, ExecutionException { + long expectedMax = (CALLABLE_DURATION * COUNT) + LISTENER_DURATION; + long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService chainExecutor = sameThreadExecutor(); + ExecutorService listenerExecutor = callableExecutor; + + checkThresholds(expectedMin, expectedMax, expectedOverhead, callableExecutor, chainExecutor, listenerExecutor); + } + + public void test4() throws InterruptedException, ExecutionException { + + long expectedMax = (CALLABLE_DURATION * COUNT) + (LISTENER_DURATION * COUNT); + long expectedMin = CALLABLE_DURATION + LISTENER_DURATION; + long expectedOverhead = COUNT + FUDGE; + + ExecutorService chainExecutor = sameThreadExecutor(); + ExecutorService listenerExecutor = sameThreadExecutor(); + + checkThresholds(expectedMin, expectedMax, expectedOverhead, callableExecutor, chainExecutor, listenerExecutor); + } + +} diff --git a/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java b/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java new file mode 100644 index 0000000000..f55f591d62 --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/FuturesTestingUtils.java @@ -0,0 +1,141 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.concurrent; + +import static com.google.common.base.Throwables.propagate; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.util.concurrent.Futures.compose; +import static org.jclouds.concurrent.ConcurrentUtils.makeListenable; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Tests behavior of ConcurrentUtils + * + * @author Adrian Cole + */ +public class FuturesTestingUtils { + public static final int FUDGE = 5; + public static final int COUNT = 100; + public static final int CALLABLE_DURATION = 10; + public static final int LISTENER_DURATION = 10; + + public static void checkThresholds(long expectedMin, long expectedMax, long expectedOverhead, + ExecutorService callableExecutor, ExecutorService chainExecutor, final ExecutorService listenerExecutor) { + long start = System.currentTimeMillis(); + Map> responses = newHashMap(); + for (int i = 0; i < COUNT; i++) + responses.put(i + "", compose(createListenableFuture(callableExecutor, chainExecutor), + new Function() { + + @Override + public Long apply(Long from) { + try { + Thread.sleep(LISTENER_DURATION); + } catch (InterruptedException e) { + propagate(e); + } + return System.currentTimeMillis(); + } + + }, listenerExecutor)); + checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses); + } + + public static Map> runCallables(ExecutorService callableExecutor, + ExecutorService chainExecutor) { + Map> responses = newHashMap(); + for (int i = 0; i < COUNT; i++) + responses.put(i + "", createListenableFuture(callableExecutor, chainExecutor)); + return responses; + } + + private static ListenableFuture createListenableFuture(ExecutorService callableExecutor, + ExecutorService chainExecutor) { + return makeListenable(callableExecutor.submit(new Callable() { + + @Override + public Long call() throws Exception { + Thread.sleep(CALLABLE_DURATION); + return System.currentTimeMillis(); + } + + }), chainExecutor); + } + + public static long getMaxIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(ListenableFuture from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.max(Sets.newHashSet(collection)); + return time; + } + + public static long getMinIn(Map> responses) { + Iterable collection = Iterables.transform(responses.values(), new Function, Long>() { + + @Override + public Long apply(ListenableFuture from) { + try { + return from.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + return null; + } + + }); + long time = Collections.min(Sets.newHashSet(collection)); + return time; + } + + public static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start, + Map> responses) { + long time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expected %d, was %d", + expectedMax, time); + + time = getMinIn(responses) - start; + assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expected %d, was %d", + expectedMin, time); + + time = getMaxIn(responses) - start; + assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expected %d, was %d", + expectedMax, time); + } +}