Issue 932:transition to multi-threaded google appengine

This commit is contained in:
Adrian Cole 2012-05-17 00:18:08 -07:00
parent 96e272a91d
commit 7854d85f13
12 changed files with 506 additions and 295 deletions

View File

@ -0,0 +1,130 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.concurrent.config;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DescribedFuture<T> implements Future<T> {
protected final Future<T> delegate;
private final String description;
private StackTraceElement[] submissionTrace;
public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
this.delegate = delegate;
this.description = description;
this.submissionTrace = submissionTrace;
}
@Override
public boolean cancel(boolean arg0) {
return delegate.cancel(arg0);
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
return delegate.get();
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
@Override
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
try {
return delegate.get(arg0, arg1);
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (TimeoutException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
/** This method does the work to ensure _if_ a submission stack trace was provided,
* it is included in the exception. most errors are thrown from the frame of the
* Future.get call, with a cause that took place in the executor's thread.
* We extend the stack trace of that cause with the submission stack trace.
* (An alternative would be to put the stack trace as a root cause,
* at the bottom of the stack, or appended to all traces, or inserted
* after the second cause, etc ... but since we can't change the "Caused by:"
* method in Throwable the compromise made here seems best.)
*/
private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
if (submissionTrace==null) return e;
if (e.getCause()==null) {
ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
e.initCause(ee);
return e;
}
Throwable cause = e.getCause();
StackTraceElement[] causeTrace = cause.getStackTrace();
boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
causeIncludesSubmissionTrace = false;
}
}
if (!causeIncludesSubmissionTrace) {
cause.setStackTrace(merge(causeTrace, submissionTrace));
}
return e;
}
private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
System.arraycopy(t1, 0, t12, 0, t1.length);
System.arraycopy(t2, 0, t12, t1.length, t2.length);
return t12;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return description;
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.concurrent.config;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DescribingExecutorService implements ExecutorService {
protected final ExecutorService delegate;
public DescribingExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return new DescribedFuture<T>(delegate.submit(task), task.toString(), ExecutorServiceModule.getStackTraceHere());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Future<?> submit(Runnable task) {
return new DescribedFuture(delegate.submit(task), task.toString(), ExecutorServiceModule.getStackTraceHere());
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), ExecutorServiceModule.getStackTraceHere());
}
@Override
public void execute(Runnable arg0) {
delegate.execute(arg0);
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return delegate.toString();
}
}

View File

@ -18,20 +18,14 @@
*/
package org.jclouds.concurrent.config;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Resource;
import javax.inject.Inject;
@ -95,14 +89,14 @@ public class ExecutorServiceModule extends AbstractModule {
this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
}
static ExecutorService addToStringOnSubmit(ExecutorService executor) {
ExecutorService addToStringOnSubmit(ExecutorService executor) {
if (executor != null) {
return new DescribingExecutorService(executor);
}
return executor;
}
static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
// we detect behavior based on the class
if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
&& executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
@ -122,205 +116,6 @@ public class ExecutorServiceModule extends AbstractModule {
protected void configure() {
}
static class DescribingExecutorService implements ExecutorService {
protected final ExecutorService delegate;
public DescribingExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Future<?> submit(Runnable task) {
return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
}
@Override
public void execute(Runnable arg0) {
delegate.execute(arg0);
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return delegate.toString();
}
}
static class DescribedFuture<T> implements Future<T> {
protected final Future<T> delegate;
private final String description;
private StackTraceElement[] submissionTrace;
public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
this.delegate = delegate;
this.description = description;
this.submissionTrace = submissionTrace;
}
@Override
public boolean cancel(boolean arg0) {
return delegate.cancel(arg0);
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
return delegate.get();
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
@Override
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
try {
return delegate.get(arg0, arg1);
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (TimeoutException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
/** This method does the work to ensure _if_ a submission stack trace was provided,
* it is included in the exception. most errors are thrown from the frame of the
* Future.get call, with a cause that took place in the executor's thread.
* We extend the stack trace of that cause with the submission stack trace.
* (An alternative would be to put the stack trace as a root cause,
* at the bottom of the stack, or appended to all traces, or inserted
* after the second cause, etc ... but since we can't change the "Caused by:"
* method in Throwable the compromise made here seems best.)
*/
private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
if (submissionTrace==null) return e;
if (e.getCause()==null) {
ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
e.initCause(ee);
return e;
}
Throwable cause = e.getCause();
StackTraceElement[] causeTrace = cause.getStackTrace();
boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
causeIncludesSubmissionTrace = false;
}
}
if (!causeIncludesSubmissionTrace) {
cause.setStackTrace(merge(causeTrace, submissionTrace));
}
return e;
}
private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
System.arraycopy(t1, 0, t12, 0, t1.length);
System.arraycopy(t2, 0, t12, t1.length, t2.length);
return t12;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return description;
}
}
@Provides
@Singleton
@Named(Constants.PROPERTY_USER_THREADS)
@ -346,22 +141,25 @@ public class ExecutorServiceModule extends AbstractModule {
}
@VisibleForTesting
static ExecutorService newCachedThreadPoolNamed(String name) {
return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
Executors.defaultThreadFactory()).build());
ExecutorService newCachedThreadPoolNamed(String name) {
return Executors.newCachedThreadPool(namedThreadFactory(name));
}
@VisibleForTesting
static ExecutorService newThreadPoolNamed(String name, int maxCount) {
ExecutorService newThreadPoolNamed(String name, int maxCount) {
return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
}
@VisibleForTesting
static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
.setThreadFactory(Executors.defaultThreadFactory()).build());
ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
return newScalingThreadPool(1, maxCount, 60L * 1000, namedThreadFactory(name));
}
protected ThreadFactory namedThreadFactory(String name) {
return new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(Executors.defaultThreadFactory()).build();
}
/** returns the stack trace at the caller */
static StackTraceElement[] getStackTraceHere() {
// remove the first two items in the stack trace (because the first one refers to the call to

View File

@ -15,8 +15,6 @@ import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.concurrent.config.ExecutorServiceModule.DescribedFuture;
import org.jclouds.concurrent.config.ExecutorServiceModule.DescribingExecutorService;
import org.jclouds.lifecycle.Closer;
import com.google.common.annotations.VisibleForTesting;

View File

@ -31,7 +31,7 @@ import org.testng.annotations.Test;
import com.google.common.cache.CacheLoader;
@Test(groups = "unit", testName = "BackoffExponentiallyAndRetryOnThrowableCacheLoaderTest")
@Test(groups = "unit", singleThreaded = true, testName = "BackoffExponentiallyAndRetryOnThrowableCacheLoaderTest")
public class BackoffExponentiallyAndRetryOnThrowableCacheLoaderTest {
private CacheLoader<String, Boolean> mock;

View File

@ -32,7 +32,6 @@ import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.concurrent.Futures;
import org.jclouds.concurrent.SingleThreaded;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.HttpRequest;
@ -57,7 +56,6 @@ import com.google.common.util.concurrent.ListenableFuture;
*
* @author Adrian Cole
*/
@SingleThreaded
@Singleton
public class AsyncGaeHttpCommandExecutorService implements HttpCommandExecutorService {
private final ExecutorService service;

View File

@ -24,6 +24,10 @@ import org.jclouds.gae.AsyncGaeHttpCommandExecutorService;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Module;
/**
* Configures {@link AsyncGaeHttpCommandExecutorService}.
*
@ -33,7 +37,23 @@ import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
@ConfiguresExecutorService
@SingleThreaded
public class AsyncGoogleAppEngineConfigurationModule extends GoogleAppEngineConfigurationModule {
public AsyncGoogleAppEngineConfigurationModule() {
super();
}
public AsyncGoogleAppEngineConfigurationModule(Module executorServiceModule) {
super(executorServiceModule);
}
/**
* Used when you are creating multiple contexts in the same app.
* @param memoizedCurrentRequestExecutorService
* @see CurrentRequestExecutorServiceModule#memoizedCurrentRequestExecutorService
*/
public AsyncGoogleAppEngineConfigurationModule(Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
super(memoizedCurrentRequestExecutorService);
}
protected void bindHttpCommandExecutorService() {
bind(HttpCommandExecutorService.class).to(AsyncGaeHttpCommandExecutorService.class);
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.gae.config;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.concurrent.config.DescribingExecutorService;
import com.google.appengine.api.ThreadManager;
import com.google.apphosting.api.ApiProxy;
import com.google.common.annotations.Beta;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
/**
*
* @author Adrian Cole
*/
@Beta
@ConfiguresExecutorService
public class CurrentRequestExecutorServiceModule extends AbstractModule {
private final Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService;
public CurrentRequestExecutorServiceModule() {
this(memoizedCurrentRequestExecutorService());
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param memoizedCurrentRequestExecutorService
* @see #memoizedCurrentRequestExecutorService
*/
public CurrentRequestExecutorServiceModule(Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
this.memoizedCurrentRequestExecutorService = memoizedCurrentRequestExecutorService;
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param currentRequestExecutorService
* @see #currentRequestExecutorService
*/
public CurrentRequestExecutorServiceModule(ListeningExecutorService currentRequestExecutorService) {
this.memoizedCurrentRequestExecutorService = Suppliers.ofInstance(currentRequestExecutorService);
}
@Override
protected void configure() {
}
public static Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService() {
return Suppliers.memoize(new Supplier<ListeningExecutorService>() {
// important that these are lazy bound vs in configure, as GAE may not
// quite be initialized, yet!
@Override
public ListeningExecutorService get() {
return currentRequestExecutorService();
}
});
}
public static ListeningExecutorService currentRequestExecutorService() {
ThreadFactory factory = checkNotNull(ThreadManager.currentRequestThreadFactory(),
"ThreadManager.currentRequestThreadFactory()");
// GAE requests cannot exceed 10 threads per request
int maxThreads = 10;
long keepAlive = ApiProxy.getCurrentEnvironment().getRemainingMillis();
ExecutorService pool = newScalingThreadPool(0, maxThreads, keepAlive, factory);
return MoreExecutors.listeningDecorator(new DescribingExecutorService(pool));
}
@Provides
@Singleton
@Named(Constants.PROPERTY_USER_THREADS)
protected ExecutorService userThreads() {
return memoizedCurrentRequestExecutorService.get();
}
@Provides
@Singleton
@Named(Constants.PROPERTY_IO_WORKER_THREADS)
protected ExecutorService ioThreads() {
return memoizedCurrentRequestExecutorService.get();
}
}

View File

@ -30,6 +30,10 @@ import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
import com.google.appengine.api.urlfetch.URLFetchService;
import com.google.appengine.api.urlfetch.URLFetchServiceFactory;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.Provides;
/**
@ -40,15 +44,36 @@ import com.google.inject.Provides;
@ConfiguresHttpCommandExecutorService
@ConfiguresExecutorService
@SingleThreaded
public class GoogleAppEngineConfigurationModule extends ExecutorServiceModule {
public class GoogleAppEngineConfigurationModule extends AbstractModule {
private final Module executorServiceModule;
public GoogleAppEngineConfigurationModule() {
super(MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
this(new ExecutorServiceModule(MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()));
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param currentRequestExecutorService
* @see CurrentRequestExecutorServiceModule#currentRequestExecutorService
*/
public GoogleAppEngineConfigurationModule(Module executorServiceModule) {
this.executorServiceModule = executorServiceModule;
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param memoizedCurrentRequestExecutorService
* @see CurrentRequestExecutorServiceModule#memoizedCurrentRequestExecutorService
*/
public GoogleAppEngineConfigurationModule(Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
this.executorServiceModule = new CurrentRequestExecutorServiceModule(memoizedCurrentRequestExecutorService);
}
@Override
protected void configure() {
super.configure();
install(executorServiceModule);
bind(TransformingHttpCommandExecutorService.class).to(TransformingHttpCommandExecutorServiceImpl.class);
bindHttpCommandExecutorService();
}
@ -58,7 +83,7 @@ public class GoogleAppEngineConfigurationModule extends ExecutorServiceModule {
}
@Provides
URLFetchService provideURLFetchService() {
protected URLFetchService provideURLFetchService() {
return URLFetchServiceFactory.getURLFetchService();
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.gae.config;
import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.gae.AsyncGaeHttpCommandExecutorService;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
import com.google.common.annotations.Beta;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListeningExecutorService;
/**
* Configures {@link AsyncGaeHttpCommandExecutorService}.
*
* @author Adrian Cole
*/
@Beta
@ConfiguresHttpCommandExecutorService
@ConfiguresExecutorService
public class MultithreadedAsyncGoogleAppEngineConfigurationModule extends GoogleAppEngineConfigurationModule {
public MultithreadedAsyncGoogleAppEngineConfigurationModule() {
super(new CurrentRequestExecutorServiceModule());
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param currentRequestThreadFactory
* @see CurrentRequestExecutorServiceModule#currentRequestThreadFactory
*/
public MultithreadedAsyncGoogleAppEngineConfigurationModule(ListeningExecutorService currentRequestThreadFactory) {
super(new CurrentRequestExecutorServiceModule(currentRequestThreadFactory));
}
/**
* Used when you are creating multiple contexts in the same app.
*
* @param memoizedCurrentRequestExecutorService
* @see CurrentRequestExecutorServiceModule#memoizedCurrentRequestExecutorService
*/
public MultithreadedAsyncGoogleAppEngineConfigurationModule(
Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
super(memoizedCurrentRequestExecutorService);
}
protected void bindHttpCommandExecutorService() {
bind(HttpCommandExecutorService.class).to(AsyncGaeHttpCommandExecutorService.class);
}
}

View File

@ -205,7 +205,8 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
@BeforeMethod
void setupApiProxy() {
new LocalServiceTestHelper(new LocalURLFetchServiceTestConfig()).setUp();
LocalServiceTestHelper helper = new LocalServiceTestHelper(new LocalURLFetchServiceTestConfig());
helper.setUp();
}
@Override
@ -316,6 +317,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
}
protected Module createConnectionModule() {
setupApiProxy();
return new AsyncGoogleAppEngineConfigurationModule();
}

View File

@ -1,69 +0,0 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.gae.config;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import javax.ws.rs.core.UriBuilder;
import org.jclouds.Constants;
import org.jclouds.gae.GaeHttpCommandExecutorService;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.logging.Logger;
import org.jclouds.logging.Logger.LoggerFactory;
import org.jclouds.rest.internal.BaseRestApiMetadata;
import org.testng.annotations.Test;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import com.sun.jersey.api.uri.UriBuilderImpl;
/**
* Tests the ability to configure a {@link GoogleAppEngineConfigurationModule}
*
* @author Adrian Cole
*/
@Test
public class GoogleAppEngineConfigurationModuleTest {
public void testConfigureBindsClient() {
final Properties properties = BaseRestApiMetadata.defaultProperties();
Injector i = Guice.createInjector(new GoogleAppEngineConfigurationModule() {
@Override
protected void configure() {
Names.bindProperties(binder(), properties);
bind(Logger.LoggerFactory.class).toInstance(new LoggerFactory() {
public Logger getLogger(String category) {
return Logger.NULL;
}
});
bind(UriBuilder.class).to(UriBuilderImpl.class);
super.configure();
}
});
HttpCommandExecutorService client = i.getInstance(HttpCommandExecutorService.class);
i.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
// TODO check single threaded;
assert client instanceof GaeHttpCommandExecutorService;
}
}