got rid of custom MoreExecutors.sameThreadExecutor

This commit is contained in:
Adrian Cole 2013-01-12 15:46:06 -08:00
parent f1819fe8b9
commit 547f574eea
14 changed files with 83 additions and 575 deletions

View File

@ -18,6 +18,7 @@
*/
package org.jclouds.s3.blobstore.functions;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.testng.Assert.assertEquals;
import org.jclouds.blobstore.domain.MutableStorageMetadata;
@ -25,7 +26,6 @@ import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.domain.StorageType;
import org.jclouds.blobstore.domain.internal.MutableStorageMetadataImpl;
import org.jclouds.blobstore.domain.internal.PageSetImpl;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
@ -52,9 +52,8 @@ public class BucketsToStorageMetadataTest {
.description("us-east-1").parent(provider).build();
public void test() {
BucketsToStorageMetadata fn = new BucketsToStorageMetadata(
MoreExecutors.sameThreadExecutor(),
sameThreadExecutor(),
new BucketToResourceMetadata(Functions.forMap(ImmutableMap.<String, Location> of("mycontainer", region))));
MutableStorageMetadata expected = new MutableStorageMetadataImpl();

View File

@ -18,12 +18,13 @@
*/
package org.jclouds.sqs.features;
import static org.jclouds.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.providers.AnonymousProviderMetadata.forClientMappedToAsyncClientOnEndpoint;
import static org.jclouds.sqs.reference.SQSParameters.ACTION;
import static org.testng.Assert.assertEquals;
import java.net.URI;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

View File

@ -18,22 +18,24 @@
*/
package org.jclouds.compute.callables;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.inject.name.Names.named;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.compute.config.ComputeServiceProperties.TIMEOUT_SCRIPT_COMPLETE;
import static org.jclouds.scriptbuilder.domain.Statements.exec;
import static org.testng.Assert.assertEquals;
import org.jclouds.Constants;
import org.jclouds.compute.config.ComputeServiceProperties;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.domain.NodeMetadata.Status;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.options.RunScriptOptions;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.domain.LoginCredentials;
import org.jclouds.scriptbuilder.InitScript;
@ -47,40 +49,29 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
/**
* @author Adrian Cole
*/
@Test(groups = "unit", singleThreaded = true, testName = "RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilCompleteTest")
public class RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilCompleteTest {
Injector injector = Guice.createInjector(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()),
new AbstractModule() {
protected void configure() {
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
bindConstant().annotatedWith(named(TIMEOUT_SCRIPT_COMPLETE)).to(100);
install(new FactoryModuleBuilder().build(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class));
}
});
EventBus eventBus = new EventBus();
BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory statusFactory = Guice.createInjector(
new ExecutorServiceModule(MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()),
new AbstractModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
bindConstant().annotatedWith(Names.named(ComputeServiceProperties.TIMEOUT_SCRIPT_COMPLETE))
.to(100);
install(new FactoryModuleBuilder()
.build(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class));
}
}).getInstance(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class);
BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory statusFactory = injector
.getInstance(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class);
// fail faster than normal
Timeouts timeouts = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(ComputeServiceProperties.TIMEOUT_SCRIPT_COMPLETE)).to(100l);
}
}).getInstance(Timeouts.class);
Timeouts timeouts = injector.getInstance(Timeouts.class);
@Test(expectedExceptions = IllegalStateException.class)
public void testWithoutInitThrowsIllegalStateException() {

View File

@ -30,6 +30,7 @@ import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.find;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.Constants.PROPERTY_API;
import static org.jclouds.Constants.PROPERTY_API_VERSION;
import static org.jclouds.Constants.PROPERTY_BUILD_VERSION;
@ -48,7 +49,6 @@ import java.util.Set;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.concurrent.config.ExecutorServiceModule;
@ -61,6 +61,7 @@ import org.jclouds.events.config.EventBusModule;
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.lifecycle.Closer;
import org.jclouds.lifecycle.config.LifeCycleModule;
import org.jclouds.logging.config.LoggingModule;
import org.jclouds.logging.jdk.config.JDKLoggingModule;
@ -71,6 +72,7 @@ import org.jclouds.providers.internal.UpdateProviderMetadataFromProperties;
import org.jclouds.rest.ConfiguresCredentialStore;
import org.jclouds.rest.ConfiguresRestClient;
import org.jclouds.rest.RestApiMetadata;
import org.jclouds.rest.RestContext;
import org.jclouds.rest.config.CredentialStoreModule;
import org.jclouds.rest.config.RestClientModule;
import org.jclouds.rest.config.RestModule;
@ -84,10 +86,10 @@ import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableMultimap.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.ImmutableMultimap.Builder;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ExecutionList;
import com.google.inject.Guice;
@ -490,8 +492,7 @@ public class ContextBuilder {
return input.getClass().isAnnotationPresent(SingleThreaded.class);
}
})) {
modules.add(new ExecutorServiceModule(MoreExecutors.sameThreadExecutor(), MoreExecutors
.sameThreadExecutor()));
modules.add(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
} else {
modules.add(new ExecutorServiceModule());
}

View File

@ -1,214 +0,0 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2007 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jclouds.concurrent;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
/**
* functions related to or replacing those in
* {@link com.google.common.util.concurrent.MoreExecutors}
*
* @author Adrian Cole
*/
@Beta
public class MoreExecutors {
/**
* Taken from @link com.google.common.util.concurrent.MoreExecutors} as it was hidden and
* therefore incapable of instanceof checks.
*
*
* Creates an executor service that runs each task in the thread that invokes {@code
* execute/submit}, as in {@link CallerRunsPolicy} This applies both to individually submitted
* tasks and to collections of tasks submitted via {@code invokeAll} or {@code invokeAny}. In the
* latter case, tasks will run serially on the calling thread. Tasks are run to completion before
* a {@code Future} is returned to the caller (unless the executor has been shutdown).
*
* <p>
* Although all tasks are immediately executed in the thread that submitted the task, this
* {@code ExecutorService} imposes a small locking overhead on each task submission in order to
* implement shutdown and termination behavior.
*
* <p>
* The implementation deviates from the {@code ExecutorService} specification with regards to the
* {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
* implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
* tasks. Second, the returned list will always be empty, as any submitted task is considered to
* have started execution. This applies also to tasks given to {@code invokeAll} or {@code
* invokeAny} which are pending serial execution, even the subset of the tasks that have not yet
* started execution. It is unclear from the {@code ExecutorService} specification if these
* should be included, and it's much easier to implement the interpretation that they not be.
* Finally, a call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls to
* {@code invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the
* tasks may already have been executed.
*/
public static ExecutorService sameThreadExecutor() {
return new SameThreadExecutorService();
}
// See sameThreadExecutor javadoc for behavioral notes.
@SingleThreaded
public static class SameThreadExecutorService extends AbstractExecutorService {
/**
* Lock used whenever accessing the state variables (runningTasks, shutdown,
* terminationCondition) of the executor
*/
private final Lock lock = new ReentrantLock();
/** Signaled after the executor is shutdown and running tasks are done */
private final Condition termination = lock.newCondition();
private SameThreadExecutorService() {
}
/*
* Conceptually, these two variables describe the executor being in one of three states: -
* Active: shutdown == false - Shutdown: runningTasks > 0 and shutdown == true - Terminated:
* runningTasks == 0 and shutdown == true
*/
private int runningTasks = 0;
private boolean shutdown = false;
@Override
public void execute(Runnable command) {
startTask();
try {
command.run();
} finally {
endTask();
}
}
@Override
public boolean isShutdown() {
lock.lock();
try {
return shutdown;
} finally {
lock.unlock();
}
}
@Override
public void shutdown() {
lock.lock();
try {
shutdown = true;
} finally {
lock.unlock();
}
}
// See sameThreadExecutor javadoc for unusual behavior of this method.
@Override
public List<Runnable> shutdownNow() {
shutdown();
return ImmutableList.of();
}
@Override
public boolean isTerminated() {
lock.lock();
try {
return shutdown && runningTasks == 0;
} finally {
lock.unlock();
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lock();
try {
for (;;) {
if (isTerminated()) {
return true;
} else if (nanos <= 0) {
return false;
} else {
nanos = termination.awaitNanos(nanos);
}
}
} finally {
lock.unlock();
}
}
/**
* Checks if the executor has been shut down and increments the running task count.
*
* @throws RejectedExecutionException
* if the executor has been previously shutdown
*/
private void startTask() {
lock.lock();
try {
if (isShutdown()) {
throw new RejectedExecutionException("Executor already shutdown");
}
runningTasks++;
} finally {
lock.unlock();
}
}
/**
* Decrements the running task count.
*/
private void endTask() {
lock.lock();
try {
runningTasks--;
if (isTerminated()) {
termination.signalAll();
}
} finally {
lock.unlock();
}
}
}
}

View File

@ -34,8 +34,6 @@ import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.lifecycle.Closer;
import org.jclouds.logging.Logger;
@ -83,8 +81,8 @@ public class ExecutorServiceModule extends AbstractModule {
@Inject
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ExecutorService userThreads,
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
this.userExecutorFromConstructor = addToStringOnSubmit(userThreads);
this.ioExecutorFromConstructor = addToStringOnSubmit(ioThreads);
}
private ExecutorService addToStringOnSubmit(ExecutorService executor) {
@ -94,18 +92,6 @@ public class ExecutorServiceModule extends AbstractModule {
return executor;
}
private ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
// we detect behavior based on the class
if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
&& executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
Logger.CONSOLE.warn(
"please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
.getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
return MoreExecutors.sameThreadExecutor();
}
return executor;
}
public ExecutorServiceModule() {
this(null, null);
}

View File

@ -18,13 +18,16 @@
*/
package org.jclouds.lifecycle.config;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.inject.matcher.Matchers.any;
import static java.util.Arrays.asList;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -34,14 +37,12 @@ import javax.annotation.PreDestroy;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.lifecycle.Closer;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Stage;
import com.google.inject.TypeLiteral;
import com.google.inject.spi.InjectionListener;
import com.google.inject.spi.TypeEncounter;
@ -103,10 +104,10 @@ public class LifeCycleModule extends AbstractModule {
protected void bindPostInjectionInvoke(final Closer closer, final ExecutionList list) {
bindListener(any(), new TypeListener() {
public <I> void hear(TypeLiteral<I> injectableType, TypeEncounter<I> encounter) {
Set<Method> methods = Sets.newHashSet();
Set<Method> methods = newHashSet();
Class<? super I> type = injectableType.getRawType();
while (type != null) {
methods.addAll(Arrays.asList(type.getDeclaredMethods()));
methods.addAll(asList(type.getDeclaredMethods()));
type = type.getSuperclass();
}
for (final Method method : methods) {
@ -150,12 +151,12 @@ public class LifeCycleModule extends AbstractModule {
method.invoke(injectee);
} catch (InvocationTargetException ie) {
Throwable e = ie.getTargetException();
throw Throwables.propagate(e);
throw propagate(e);
} catch (IllegalAccessException e) {
throw Throwables.propagate(e);
throw propagate(e);
}
}
}, MoreExecutors.sameThreadExecutor());
}, sameThreadExecutor());
}
});
}

View File

@ -1,250 +0,0 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.concurrent;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Maps.newHashMap;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import static org.testng.Assert.assertEquals;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.jclouds.logging.Logger;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
/**
* Tests behavior of FutureIterables
*
* @author Adrian Cole
*/
@Test(groups = "performance", enabled = false, sequential = true, testName = "FutureIterablesPerformanceTest")
public class FutureIterablesPerformanceTest {
ExecutorService ioFunctionExecutor = newCachedThreadPool();
@Test(enabled = false)
public void testMakeListenableDoesntSerializeFutures() throws InterruptedException, ExecutionException {
long expectedMax = IO_DURATION;
long expectedMin = IO_DURATION;
long expectedOverhead = COUNT + FUDGE;
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
long start = System.currentTimeMillis();
Map<String, Future<Long>> responses = runCallables(chainExecutor);
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
}
@Test(enabled = false)
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures()
throws InterruptedException, ExecutionException, TimeoutException {
long expectedMax = IO_DURATION;
long expectedMin = IO_DURATION;
long expectedOverhead = COUNT + FUDGE;
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
long start = System.currentTimeMillis();
Map<String, Future<Long>> responses = runCallables(chainExecutor);
Map<String, Exception> exceptions = awaitCompletion(responses, MoreExecutors.sameThreadExecutor(), null,
Logger.CONSOLE, "test same thread");
assertEquals(exceptions.size(), 0);
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
}
@Test(enabled = false)
public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener()
throws InterruptedException, ExecutionException {
long expectedMax = IO_DURATION + LISTENER_DURATION;
long expectedMin = IO_DURATION + LISTENER_DURATION;
long expectedOverhead = COUNT * 4 + FUDGE;
ExecutorService userthreads = newCachedThreadPool();
try {
ExecutorService chainExecutor = userthreads;
ExecutorService listenerExecutor = userthreads;
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
} finally {
userthreads.shutdownNow();
}
}
@Test(enabled = false)
public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener()
throws InterruptedException, ExecutionException {
long expectedMax = IO_DURATION + LISTENER_DURATION;
long expectedMin = IO_DURATION + LISTENER_DURATION;
long expectedOverhead = COUNT + FUDGE;
ExecutorService userthreads = newCachedThreadPool();
try {
ExecutorService chainExecutor = userthreads;
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
} finally {
userthreads.shutdownNow();
}
}
@Test(enabled = false)
public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsIOAndSumOfAllListeners()
throws InterruptedException, ExecutionException {
long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT);
long expectedMin = IO_DURATION + LISTENER_DURATION;
long expectedOverhead = COUNT + FUDGE;
ExecutorService userthreads = newCachedThreadPool();
try {
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
ExecutorService listenerExecutor = userthreads;
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
} finally {
userthreads.shutdownNow();
}
}
@Test(enabled = false)
public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsIOAndSumOfAllListeners()
throws InterruptedException, ExecutionException {
long expectedMax = IO_DURATION + (LISTENER_DURATION * COUNT);
long expectedMin = IO_DURATION + LISTENER_DURATION;
long expectedOverhead = COUNT + FUDGE;
ExecutorService userthreads = newCachedThreadPool();
try {
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
checkThresholdsUsingCompose(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
} finally {
userthreads.shutdownNow();
}
}
public static final int FUDGE = 5;
public static final int COUNT = 100;
public static final int IO_DURATION = 50;
public static final int LISTENER_DURATION = 100;
private void checkThresholdsUsingCompose(long expectedMin, long expectedMax, long expectedOverhead,
ExecutorService chainExecutor, final ExecutorService listenerExecutor) {
long start = System.currentTimeMillis();
Map<String, Future<Long>> responses = newHashMap();
for (int i = 0; i < COUNT; i++)
responses.put(i + "", org.jclouds.concurrent.Futures.compose(org.jclouds.concurrent.Futures.makeListenable(
simultateIO(), chainExecutor), new Function<Long, Long>() {
@Override
public Long apply(Long from) {
try {
Thread.sleep(LISTENER_DURATION);
} catch (InterruptedException e) {
propagate(e);
}
return System.currentTimeMillis();
}
}, listenerExecutor));
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
}
private Map<String, Future<Long>> runCallables(ExecutorService chainExecutor) {
Map<String, Future<Long>> responses = newHashMap();
for (int i = 0; i < COUNT; i++)
responses.put(i + "", org.jclouds.concurrent.Futures.makeListenable(simultateIO(), chainExecutor));
return responses;
}
private Future<Long> simultateIO() {
return ioFunctionExecutor.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
Thread.sleep(IO_DURATION);
return System.currentTimeMillis();
}
});
}
public static long getMaxIn(Map<String, Future<Long>> responses) {
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
@Override
public Long apply(Future<Long> from) {
try {
return from.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
return null;
}
});
long time = Collections.max(Sets.newHashSet(collection));
return time;
}
public static long getMinIn(Map<String, Future<Long>> responses) {
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
@Override
public Long apply(Future<Long> from) {
try {
return from.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
return null;
}
});
long time = Collections.min(Sets.newHashSet(collection));
return time;
}
private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start,
Map<String, Future<Long>> responses) {
long time = getMaxIn(responses) - start;
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
expectedMax, time);
time = getMinIn(responses) - start;
assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d",
expectedMin, time);
time = getMaxIn(responses) - start;
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
expectedMax, time);
}
}

View File

@ -18,13 +18,16 @@
*/
package org.jclouds.concurrent;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -35,7 +38,6 @@ import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
/**
* Tests behavior of FutureIterables
@ -54,17 +56,17 @@ public class FutureIterablesTest {
@Override
public Future<String> apply(String input) {
counter.incrementAndGet();
return com.google.common.util.concurrent.Futures.immediateFailedFuture(new AuthorizationException());
return immediateFailedFuture(new AuthorizationException());
}
}, MoreExecutors.sameThreadExecutor(), null, Logger.CONSOLE, "");
}, sameThreadExecutor(), null, Logger.CONSOLE, "");
fail("Expected AuthorizationException");
} catch (AuthorizationException e) {
assertEquals(counter.get(), 2);
}
}
public void testNormalExceptionPropagatesAsTransformParallelExceptionAndTries5XPerElement() {
final AtomicInteger counter = new AtomicInteger();
@ -74,10 +76,10 @@ public class FutureIterablesTest {
@Override
public Future<String> apply(String input) {
counter.incrementAndGet();
return com.google.common.util.concurrent.Futures.immediateFailedFuture(new RuntimeException());
return immediateFailedFuture(new RuntimeException());
}
}, MoreExecutors.sameThreadExecutor(), null, Logger.CONSOLE, "");
}, sameThreadExecutor(), null, Logger.CONSOLE, "");
fail("Expected TransformParallelException");
} catch (TransformParallelException e) {
assertEquals(e.getFromToException().size(), 2);
@ -89,21 +91,21 @@ public class FutureIterablesTest {
public void testAwaitCompletionTimeout() throws Exception {
final long timeoutMs = 1000;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Map<Void, Future<?>> responses = Maps.newHashMap();
Map<Void, Future<?>> responses = newHashMap();
try {
responses.put(null, executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2 * timeoutMs);
} catch (InterruptedException ie) {
// triggered during shutdown
}
@Override
public void run() {
try {
Thread.sleep(2 * timeoutMs);
} catch (InterruptedException ie) {
// triggered during shutdown
}
}
}));
Map<Void, Exception> errors = FutureIterables.awaitCompletion(
responses, executorService, timeoutMs, Logger.CONSOLE,
/*prefix=*/ "");
Map<Void, Exception> errors = FutureIterables.awaitCompletion(responses, executorService, timeoutMs,
Logger.CONSOLE,
/* prefix= */"");
if (!errors.isEmpty()) {
throw errors.values().iterator().next();
}

View File

@ -18,6 +18,7 @@
*/
package org.jclouds.concurrent;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
@ -41,7 +42,7 @@ import com.google.common.util.concurrent.ExecutionList;
*/
@Test(groups = "unit")
public class FuturesTest {
ExecutorService executorService = MoreExecutors.sameThreadExecutor();
ExecutorService executorService = sameThreadExecutor();
@Test
public void testCallGetAndRunRunnableRunsListOnRuntimeException() throws InterruptedException, ExecutionException {

View File

@ -19,6 +19,11 @@
package org.jclouds.rest.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.inject.name.Names.named;
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
import static org.jclouds.Constants.PROPERTY_MAX_RETRIES;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.testng.Assert.assertEquals;
import java.io.IOException;
@ -43,10 +48,8 @@ import org.custommonkey.xmlunit.DifferenceConstants;
import org.custommonkey.xmlunit.DifferenceListener;
import org.custommonkey.xmlunit.NodeDetail;
import org.custommonkey.xmlunit.XMLUnit;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.date.internal.DateServiceDateCodecFactory;
@ -88,7 +91,6 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
/**
*
@ -193,7 +195,7 @@ public abstract class BaseRestApiExpectTest<S> {
@Inject
public ExpectHttpCommandExecutorService(Function<HttpRequest, HttpResponse> fn, HttpUtils utils,
ContentMetadataCodec contentMetadataCodec,
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor,
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor,
IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler,
DelegatingErrorHandler errorHandler, HttpWire wire) {
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
@ -228,10 +230,8 @@ public abstract class BaseRestApiExpectTest<S> {
@Override
public void configure() {
bind(ExecutorService.class).annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).toInstance(
MoreExecutors.sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).toInstance(
MoreExecutors.sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
bind(new TypeLiteral<Function<HttpRequest, HttpResponse>>() {
}).toInstance(fn);
bind(HttpCommandExecutorService.class).to(ExpectHttpCommandExecutorService.class);
@ -564,7 +564,7 @@ public abstract class BaseRestApiExpectTest<S> {
*/
protected Properties setupProperties() {
Properties props = new Properties();
props.put(Constants.PROPERTY_MAX_RETRIES, 1);
props.put(PROPERTY_MAX_RETRIES, 1);
return props;
}
}

View File

@ -19,8 +19,12 @@
package org.jclouds.rest.internal;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.hash.Hashing.md5;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.inject.name.Names.named;
import static org.easymock.EasyMock.createMock;
import static org.eclipse.jetty.http.HttpHeaders.TRANSFER_ENCODING;
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.io.ByteSources.asByteSource;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
@ -30,8 +34,6 @@ import java.util.Date;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import org.jclouds.Constants;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions;
import org.jclouds.http.HttpCommandExecutorService;
@ -40,7 +42,6 @@ import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
import org.jclouds.http.functions.ParseSax;
import org.jclouds.io.MutableContentMetadata;
import org.jclouds.javax.annotation.Nullable;
import com.google.common.reflect.Invokable;
import org.jclouds.rest.annotations.Fallback;
import org.jclouds.rest.annotations.XMLResponseParser;
import org.jclouds.util.Strings2;
@ -49,12 +50,12 @@ import org.testng.annotations.Test;
import com.google.common.collect.Multimap;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.reflect.Invokable;
import com.google.common.reflect.TypeParameter;
import com.google.common.reflect.TypeToken;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
/**
*
@ -81,10 +82,8 @@ public abstract class BaseRestApiTest {
@Override
protected void configure() {
bind(ExecutorService.class).annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).toInstance(
MoreExecutors.sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).toInstance(
MoreExecutors.sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
bind(ExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
bind(HttpCommandExecutorService.class).toInstance(mock);
}
}

View File

@ -18,7 +18,8 @@
*/
package org.jclouds.gae.config;
import org.jclouds.concurrent.MoreExecutors;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.concurrent.config.ExecutorServiceModule;
@ -49,7 +50,7 @@ public class GoogleAppEngineConfigurationModule extends AbstractModule {
private final Module executorServiceModule;
public GoogleAppEngineConfigurationModule() {
this(new ExecutorServiceModule(MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()));
this(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
}
/**

View File

@ -20,15 +20,11 @@
package org.jclouds.virtualbox;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.compute.options.RunScriptOptions.Builder.runAsRoot;
import static org.jclouds.scriptbuilder.domain.Statements.findPid;
import static org.jclouds.scriptbuilder.domain.Statements.kill;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.virtualbox.config.VirtualBoxConstants.VIRTUALBOX_IMAGE_PREFIX;
import static org.jclouds.virtualbox.config.VirtualBoxConstants.VIRTUALBOX_INSTALLATION_KEY_SEQUENCE;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -36,18 +32,14 @@ import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.compute.callables.RunScriptOnNode.Factory;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.Image;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.Template;
import org.jclouds.compute.internal.BaseComputeServiceContextLiveTest;
import org.jclouds.compute.strategy.PrioritizeCredentialsFromTemplate;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.config.ValueOfConfigurationKeyOrNull;
import org.jclouds.rest.annotations.BuildVersion;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.StatementList;
import org.jclouds.sshj.config.SshjSshClientModule;
import org.jclouds.virtualbox.config.VirtualBoxConstants;
import org.jclouds.virtualbox.domain.HardDisk;
@ -81,10 +73,8 @@ import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -138,7 +128,7 @@ public class BaseVirtualBoxClientLiveTest extends BaseComputeServiceContextLiveT
@Inject
protected LoadingCache<Image, Master> mastersCache;
private final ExecutorService singleThreadExec = MoreExecutors.sameThreadExecutor();
private final ExecutorService singleThreadExec = sameThreadExecutor();
private String masterName;
@Override