mirror of https://github.com/apache/jclouds.git
JCLOUDS-153 remove IO Executor and usage of it.
This commit is contained in:
parent
d3d92d06da
commit
0a74e923d2
|
@ -57,7 +57,7 @@ public class BaseOpenStackMockTest<A extends Closeable> {
|
|||
public static final String accessRackspace = "{\"access\":{\"token\":{\"id\":\"b84f4a37-5126-4603-9521-ccd0665fbde1\",\"expires\":\"2013-04-13T16:49:57.000-05:00\",\"tenant\":{\"id\":\"123123\",\"name\":\"123123\"}},\"serviceCatalog\":[{\"endpoints\":[{\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"}],\"name\":\"cloudMonitoring\",\"type\":\"rax:monitor\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"publicURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\"},{\"region\":\"ORD\",\"tenantId\":\"MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"publicURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\"}],\"name\":\"cloudFilesCDN\",\"type\":\"rax:object-cdn\"},{\"endpoints\":[{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"},{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"}],\"name\":\"cloudLoadBalancers\",\"type\":\"rax:load-balancer\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"},{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"}],\"name\":\"cloudDatabases\",\"type\":\"rax:database\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"publicURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"internalURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\"},{\"region\":\"ORD\",\"tenantId\":\"MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"publicURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\",\"internalURL\":\"URL/v1/MossoCloudFS_5bcf396e-39dd-45ff-93a1-712b9aba90a9\"}],\"name\":\"cloudFiles\",\"type\":\"object-store\"},{\"endpoints\":[{\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\",\"versionInfo\":\"URL/v1.0\",\"versionList\":\"URL/\",\"versionId\":\"1.0\"}],\"name\":\"cloudServers\",\"type\":\"compute\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v2/123123\",\"versionInfo\":\"URL/v2\",\"versionList\":\"URL/\",\"versionId\":\"2\"},{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v2/123123\",\"versionInfo\":\"URL/v2\",\"versionList\":\"URL/\",\"versionId\":\"2\"}],\"name\":\"cloudServersOpenStack\",\"type\":\"compute\"},{\"endpoints\":[{\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"}],\"name\":\"cloudDNS\",\"type\":\"rax:dns\"},{\"endpoints\":[{\"tenantId\":\"123123\",\"publicURL\":\"URL/v1.0/123123\"}],\"name\":\"cloudBackup\",\"type\":\"rax:backup\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\"},{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\"}],\"name\":\"cloudBlockStorage\",\"type\":\"volume\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\",\"internalURL\":\"URL/v1/123123\"},{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\",\"internalURL\":\"URL/v1/123123\"}],\"name\":\"marconi\",\"type\":\"queuing\"},{\"endpoints\":[{\"region\":\"DFW\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\",\"internalURL\":\"URL/v1/123123\"},{\"region\":\"ORD\",\"tenantId\":\"123123\",\"publicURL\":\"URL/v1/123123\",\"internalURL\":\"URL/v1/123123\"}],\"name\":\"autoscale\",\"type\":\"rax:autoscale\"}],\"user\":{\"id\":\"1234\",\"roles\":[{\"id\":\"3\",\"description\":\"User Admin Role.\",\"name\":\"identity:user-admin\"}],\"name\":\"jclouds-joe\",\"RAX-AUTH:defaultRegion\":\"DFW\"}}}";
|
||||
|
||||
private final Set<Module> modules = ImmutableSet.<Module> of(
|
||||
new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
|
||||
new ExecutorServiceModule(sameThreadExecutor()));
|
||||
|
||||
/**
|
||||
* Pattern for replacing the URL token with the correct local address.
|
||||
|
|
|
@ -46,7 +46,7 @@ import com.squareup.okhttp.mockwebserver.RecordedRequest;
|
|||
public class S3ClientMockTest {
|
||||
|
||||
private static final Set<Module> modules = ImmutableSet.<Module> of(new OkHttpCommandExecutorServiceModule(),
|
||||
new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
|
||||
new ExecutorServiceModule(sameThreadExecutor()));
|
||||
|
||||
static S3Client getS3Client(URL server) {
|
||||
Properties overrides = new Properties();
|
||||
|
|
|
@ -91,7 +91,7 @@ public class PermissionApiLiveTest extends BaseSQSApiLiveTest {
|
|||
|
||||
private AnonymousAttributesApi getAnonymousAttributesApi(URI queue) {
|
||||
return ContextBuilder.newBuilder(forApiOnEndpoint(AnonymousAttributesApi.class, queue.toASCIIString()))
|
||||
.modules(ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor())))
|
||||
.modules(ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor())))
|
||||
.buildApi(AnonymousAttributesApi.class);
|
||||
}
|
||||
|
||||
|
|
|
@ -89,17 +89,17 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
@Named(Constants.PROPERTY_REQUEST_TIMEOUT)
|
||||
protected Long maxTime;
|
||||
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
private final ListeningExecutorService executor;
|
||||
|
||||
protected final SwiftBlobStore blobstore;
|
||||
protected final PayloadSlicer slicer;
|
||||
|
||||
@Inject
|
||||
public ParallelMultipartUploadStrategy(SwiftBlobStore blobstore, PayloadSlicer slicer,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executor) {
|
||||
this.blobstore = checkNotNull(blobstore, "blobstore");
|
||||
this.slicer = checkNotNull(slicer, "slicer");
|
||||
this.ioExecutor = checkNotNull(ioExecutor, "ioExecutor");
|
||||
this.executor = checkNotNull(executor, "executor");
|
||||
}
|
||||
|
||||
|
||||
|
@ -125,7 +125,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
|
||||
final Blob blobPart = blobstore.blobBuilder(blobPartName).payload(chunkedPart).
|
||||
contentDisposition(blobPartName).build();
|
||||
final ListenableFuture<String> futureETag = ioExecutor.submit(new Callable<String>() {
|
||||
final ListenableFuture<String> futureETag = executor.submit(new Callable<String>() {
|
||||
@Override public String call() throws Exception {
|
||||
return client.putObject(container, blob2Object.apply(blobPart));
|
||||
}
|
||||
|
@ -155,13 +155,13 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}, ioExecutor);
|
||||
}, executor);
|
||||
futureParts.put(part, futureETag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> execute(final String container, final Blob blob, final PutOptions options, final BlobToObject blob2Object) {
|
||||
return ioExecutor.submit(new Callable<String>() {
|
||||
return executor.submit(new Callable<String>() {
|
||||
@Override
|
||||
public String call() throws Exception {
|
||||
String key = blob.getMetadata().getName();
|
||||
|
@ -248,7 +248,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
throw rtex;
|
||||
}
|
||||
} else {
|
||||
ListenableFuture<String> futureETag = ioExecutor.submit(new Callable<String>() {
|
||||
ListenableFuture<String> futureETag = executor.submit(new Callable<String>() {
|
||||
@Override public String call() throws Exception {
|
||||
return blobstore.putBlob(container, blob, options);
|
||||
}
|
||||
|
|
|
@ -97,8 +97,8 @@ public class SequentialMultipartUploadStrategyMockTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static final Set<Module> modules = ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor(),
|
||||
sameThreadExecutor()));
|
||||
private static final Set<Module> modules = ImmutableSet.<Module> of(
|
||||
new ExecutorServiceModule(sameThreadExecutor()));
|
||||
|
||||
static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) {
|
||||
Properties overrides = new Properties();
|
||||
|
|
|
@ -66,9 +66,8 @@ public class StubComputeServiceAdapter implements JCloudsNativeComputeServiceAda
|
|||
private final Supplier<Location> location;
|
||||
private final ConcurrentMap<String, NodeMetadata> nodes;
|
||||
private final Multimap<String, SecurityGroup> groupsForNodes;
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
private final ListeningExecutorService executor;
|
||||
private final Provider<Integer> idProvider;
|
||||
private final Provider<Integer> groupIdProvider;
|
||||
private final String publicIpPrefix;
|
||||
private final String privateIpPrefix;
|
||||
private final String passwordPrefix;
|
||||
|
@ -78,14 +77,13 @@ public class StubComputeServiceAdapter implements JCloudsNativeComputeServiceAda
|
|||
|
||||
@Inject
|
||||
public StubComputeServiceAdapter(ConcurrentMap<String, NodeMetadata> nodes,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor, Supplier<Location> location,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executor, Supplier<Location> location,
|
||||
@Named("NODE_ID") Provider<Integer> idProvider, @Named("PUBLIC_IP_PREFIX") String publicIpPrefix,
|
||||
@Named("PRIVATE_IP_PREFIX") String privateIpPrefix, @Named("PASSWORD_PREFIX") String passwordPrefix,
|
||||
JustProvider locationSupplier, Map<OsFamily, Map<String, String>> osToVersionMap,
|
||||
Multimap<String, SecurityGroup> groupsForNodes, @Named("GROUP_ID") Provider<Integer> groupIdProvider,
|
||||
Optional<SecurityGroupExtension> securityGroupExtension) {
|
||||
Multimap<String, SecurityGroup> groupsForNodes, Optional<SecurityGroupExtension> securityGroupExtension) {
|
||||
this.nodes = nodes;
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.executor = executor;
|
||||
this.location = location;
|
||||
this.idProvider = idProvider;
|
||||
this.publicIpPrefix = publicIpPrefix;
|
||||
|
@ -94,7 +92,6 @@ public class StubComputeServiceAdapter implements JCloudsNativeComputeServiceAda
|
|||
this.locationSupplier = locationSupplier;
|
||||
this.osToVersionMap = osToVersionMap;
|
||||
this.groupsForNodes = groupsForNodes;
|
||||
this.groupIdProvider = groupIdProvider;
|
||||
this.securityGroupExtension = securityGroupExtension;
|
||||
}
|
||||
|
||||
|
@ -106,7 +103,7 @@ public class StubComputeServiceAdapter implements JCloudsNativeComputeServiceAda
|
|||
if (millis == 0l)
|
||||
setStateOnNode(status, node);
|
||||
else
|
||||
ioExecutor.execute(new Runnable() {
|
||||
executor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -219,8 +216,8 @@ public class StubComputeServiceAdapter implements JCloudsNativeComputeServiceAda
|
|||
setStateOnNodeAfterDelay(Status.PENDING, node, 0);
|
||||
setStateOnNodeAfterDelay(Status.TERMINATED, node, 50);
|
||||
groupsForNodes.removeAll(id);
|
||||
|
||||
ioExecutor.execute(new Runnable() {
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -28,12 +28,10 @@ import javax.inject.Inject;
|
|||
import javax.inject.Named;
|
||||
import javax.inject.Provider;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.compute.domain.SecurityGroup;
|
||||
import org.jclouds.compute.domain.SecurityGroupBuilder;
|
||||
import org.jclouds.compute.extensions.SecurityGroupExtension;
|
||||
import org.jclouds.domain.Location;
|
||||
import org.jclouds.location.suppliers.all.JustProvider;
|
||||
import org.jclouds.net.domain.IpPermission;
|
||||
import org.jclouds.net.domain.IpProtocol;
|
||||
|
||||
|
@ -42,7 +40,6 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* An extension to compute service to allow for the manipulation of {@link SecurityGroup}s. Implementation
|
||||
|
@ -52,23 +49,17 @@ public class StubSecurityGroupExtension implements SecurityGroupExtension {
|
|||
|
||||
private final Supplier<Location> location;
|
||||
private final Provider<Integer> groupIdProvider;
|
||||
private final Supplier<Set<? extends Location>> locationSupplier;
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
private final ConcurrentMap<String, SecurityGroup> groups;
|
||||
private final Multimap<String, SecurityGroup> groupsForNodes;
|
||||
|
||||
@Inject
|
||||
public StubSecurityGroupExtension(ConcurrentMap<String, SecurityGroup> groups,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
Supplier<Location> location,
|
||||
@Named("GROUP_ID") Provider<Integer> groupIdProvider,
|
||||
JustProvider locationSupplier,
|
||||
Multimap<String, SecurityGroup> groupsForNodes) {
|
||||
this.groups = groups;
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.location = location;
|
||||
this.groupIdProvider = groupIdProvider;
|
||||
this.locationSupplier = locationSupplier;
|
||||
this.groupsForNodes = groupsForNodes;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ import static org.easymock.EasyMock.createMock;
|
|||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.jclouds.compute.config.ComputeServiceProperties.TIMEOUT_SCRIPT_COMPLETE;
|
||||
import static org.jclouds.scriptbuilder.domain.Statements.exec;
|
||||
|
@ -52,11 +51,10 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
|
|||
|
||||
@Test(groups = "unit", singleThreaded = true, testName = "RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilCompleteTest")
|
||||
public class RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilCompleteTest {
|
||||
Injector injector = Guice.createInjector(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()),
|
||||
Injector injector = Guice.createInjector(new ExecutorServiceModule(sameThreadExecutor()),
|
||||
new AbstractModule() {
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(TIMEOUT_SCRIPT_COMPLETE)).to(100);
|
||||
install(new FactoryModuleBuilder().build(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class));
|
||||
}
|
||||
|
|
|
@ -34,7 +34,10 @@ public final class Constants {
|
|||
* Integer property. default (20)
|
||||
* <p/>
|
||||
* Amount of threads servicing the I/O of http connections.
|
||||
*
|
||||
* @deprecated No longer used. Will be removed in jclouds v2
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String PROPERTY_IO_WORKER_THREADS = "jclouds.io-worker-threads";
|
||||
|
||||
/**
|
||||
|
|
|
@ -540,7 +540,7 @@ public class ContextBuilder {
|
|||
return input.getClass().isAnnotationPresent(SingleThreaded.class);
|
||||
}
|
||||
})) {
|
||||
modules.add(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
|
||||
modules.add(new ExecutorServiceModule(sameThreadExecutor()));
|
||||
} else {
|
||||
modules.add(new ExecutorServiceModule());
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.jclouds.apis.internal;
|
|||
import static com.google.common.base.Objects.equal;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.jclouds.Constants.PROPERTY_CONNECTION_TIMEOUT;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_ISO3166_CODES;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
|
||||
|
@ -66,7 +65,6 @@ public abstract class BaseApiMetadata implements ApiMetadata {
|
|||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
|
||||
props.setProperty(PROPERTY_SO_TIMEOUT, 60000 + "");
|
||||
props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + "");
|
||||
props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + "");
|
||||
// Successfully tested 50 user threads with BlobStore.clearContainer.
|
||||
props.setProperty(PROPERTY_USER_THREADS, numUserThreads + "");
|
||||
props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + "");
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
|
||||
|
||||
|
@ -74,22 +73,37 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
}
|
||||
|
||||
final ListeningExecutorService userExecutorFromConstructor;
|
||||
final ListeningExecutorService ioExecutorFromConstructor;
|
||||
|
||||
public ExecutorServiceModule() {
|
||||
this.userExecutorFromConstructor = null;
|
||||
this.ioExecutorFromConstructor = null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @deprecated {@code ioExecutor} is no longer used. This constructor will be removed in jclouds v2.
|
||||
* Use {@link #ExecutorServiceModule(ExecutorService)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ExecutorService userExecutor,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor) {
|
||||
this(listeningDecorator(userExecutor), listeningDecorator(ioExecutor));
|
||||
ExecutorService ioExecutor) {
|
||||
this(userExecutor);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @deprecated {@code ioExecutor} is no longer used. This constructor will be removed in jclouds v2.
|
||||
* Use {@link #ExecutorServiceModule(ListeningExecutorService)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor) {
|
||||
ListeningExecutorService ioExecutor) {
|
||||
this(userExecutor);
|
||||
}
|
||||
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ExecutorService userExecutor) {
|
||||
this(listeningDecorator(userExecutor));
|
||||
}
|
||||
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
|
||||
this.userExecutorFromConstructor = WithSubmissionTrace.wrap(userExecutor);
|
||||
this.ioExecutorFromConstructor = WithSubmissionTrace.wrap(ioExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,16 +125,6 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
return shutdownOnClose(WithSubmissionTrace.wrap(newThreadPoolNamed("user thread %d", count)), closer);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ListeningExecutorService provideListeningIOExecutorService(@Named(PROPERTY_IO_WORKER_THREADS) int count,
|
||||
Closer closer) { // NO_UCD
|
||||
if (ioExecutorFromConstructor != null)
|
||||
return ioExecutorFromConstructor;
|
||||
return shutdownOnClose(WithSubmissionTrace.wrap(newThreadPoolNamed("i/o thread %d", count)), closer);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
|
@ -128,13 +132,6 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
return in;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ExecutorService provideIOExecutorService(@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService in) { // NO_UCD
|
||||
return in;
|
||||
}
|
||||
|
||||
static <T extends ListeningExecutorService> T shutdownOnClose(final T service, Closer closer) {
|
||||
closer.addToClose(new ShutdownExecutorOnClose(service));
|
||||
return service;
|
||||
|
|
|
@ -16,27 +16,14 @@
|
|||
*/
|
||||
package org.jclouds.http;
|
||||
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* Capable of invoking http commands.
|
||||
*/
|
||||
public interface HttpCommandExecutorService {
|
||||
public interface HttpCommandExecutorService {
|
||||
|
||||
/**
|
||||
* Returns a potentially deferred {@code HttpResponse} from a server responding to the
|
||||
* {@code command}. The output {@code ListenableFuture} need not be
|
||||
* {@linkplain Future#isDone done}, making {@code HttpCommandExecutorService}
|
||||
* suitable for asynchronous derivations.
|
||||
*
|
||||
*/
|
||||
ListenableFuture<HttpResponse> submit(HttpCommand command);
|
||||
|
||||
/**
|
||||
* Returns a {@code HttpResponse} from the server which responded to the
|
||||
* {@code command}.
|
||||
*/
|
||||
HttpResponse invoke(HttpCommand command);
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
|
|||
import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Inject;
|
||||
|
@ -43,9 +42,6 @@ import org.jclouds.http.handlers.DelegatingRetryHandler;
|
|||
import org.jclouds.io.ContentMetadataCodec;
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
|
||||
protected final HttpUtils utils;
|
||||
protected final ContentMetadataCodec contentMetadataCodec;
|
||||
|
@ -53,7 +49,6 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
protected final DelegatingRetryHandler retryHandler;
|
||||
protected final IOExceptionRetryHandler ioRetryHandler;
|
||||
protected final DelegatingErrorHandler errorHandler;
|
||||
protected final ListeningExecutorService ioExecutor;
|
||||
|
||||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
@ -65,7 +60,6 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
|
||||
@Inject
|
||||
protected BaseHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire) {
|
||||
this.utils = checkNotNull(utils, "utils");
|
||||
|
@ -73,7 +67,6 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
this.retryHandler = checkNotNull(retryHandler, "retryHandler");
|
||||
this.ioRetryHandler = checkNotNull(ioRetryHandler, "ioRetryHandler");
|
||||
this.errorHandler = checkNotNull(errorHandler, "errorHandler");
|
||||
this.ioExecutor = checkNotNull(ioExecutor, "ioExecutor");
|
||||
this.wire = checkNotNull(wire, "wire");
|
||||
}
|
||||
|
||||
|
@ -137,37 +130,6 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
return shouldContinue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
HttpRequest request = command.getCurrentRequest();
|
||||
checkRequestHasContentLengthOrChunkedEncoding(request,
|
||||
"if the request has a payload, it must be set to chunked encoding or specify a content length: " + request);
|
||||
return ioExecutor.submit(new HttpResponseCallable(command));
|
||||
}
|
||||
|
||||
public class HttpResponseCallable implements Callable<HttpResponse> {
|
||||
private final HttpCommand command;
|
||||
|
||||
public HttpResponseCallable(HttpCommand command) {
|
||||
this.command = command;
|
||||
}
|
||||
|
||||
public HttpResponse call() throws Exception {
|
||||
try {
|
||||
return invoke(command);
|
||||
} finally {
|
||||
if (command.getException() != null)
|
||||
throw command.getException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return command.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected abstract Q convert(HttpRequest request) throws IOException, InterruptedException;
|
||||
|
||||
protected abstract HttpResponse invoke(Q nativeRequest) throws IOException, InterruptedException;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.jclouds.http.internal;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Throwables.propagate;
|
||||
|
@ -44,7 +45,6 @@ import javax.net.ssl.HostnameVerifier;
|
|||
import javax.net.ssl.HttpsURLConnection;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.JcloudsVersion;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
|
@ -63,7 +63,6 @@ import com.google.common.collect.ImmutableMultimap.Builder;
|
|||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.google.common.io.CountingOutputStream;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
|
@ -83,12 +82,11 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
|
|||
|
||||
@Inject
|
||||
public JavaUrlHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider, Function<URI, Proxy> proxyForURI)
|
||||
throws SecurityException, NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
if (utils.getMaxConnections() > 0)
|
||||
System.setProperty("http.maxConnections", String.valueOf(checkNotNull(utils, "utils").getMaxConnections()));
|
||||
this.untrustedSSLContextProvider = checkNotNull(untrustedSSLContextProvider, "untrustedSSLContextProvider");
|
||||
|
|
|
@ -20,7 +20,6 @@ import static com.google.common.base.Throwables.propagate;
|
|||
import static com.google.common.collect.Iterables.filter;
|
||||
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
|
||||
import static com.google.inject.matcher.Matchers.any;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.jclouds.reflect.Reflection2.methods;
|
||||
|
@ -70,9 +69,6 @@ public class LifeCycleModule extends AbstractModule {
|
|||
@Inject
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
ListeningExecutorService userExecutor;
|
||||
@Inject
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ListeningExecutorService ioExecutor;
|
||||
// ScheduledExecutor is defined in an optional module
|
||||
@Inject(optional = true)
|
||||
@Named(PROPERTY_SCHEDULER_THREADS)
|
||||
|
@ -81,8 +77,6 @@ public class LifeCycleModule extends AbstractModule {
|
|||
public void close() throws IOException {
|
||||
assert userExecutor != null;
|
||||
userExecutor.shutdownNow();
|
||||
assert ioExecutor != null;
|
||||
ioExecutor.shutdownNow();
|
||||
// ScheduledExecutor is defined in an optional module
|
||||
if (scheduledExecutor != null)
|
||||
scheduledExecutor.shutdownNow();
|
||||
|
|
|
@ -23,7 +23,6 @@ import static org.easymock.EasyMock.createMock;
|
|||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNotEquals;
|
||||
|
@ -55,7 +54,6 @@ public class ExecutorServiceModuleTest {
|
|||
ExecutorServiceModule module = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
|
@ -63,18 +61,14 @@ public class ExecutorServiceModuleTest {
|
|||
|
||||
injector = Guice.createInjector(module);
|
||||
assertNull(module.userExecutorFromConstructor);
|
||||
assertNull(module.ioExecutorFromConstructor);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
private void close() throws IOException {
|
||||
ListeningExecutorService user = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
ListeningExecutorService io = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
injector.getInstance(Closer.class).close();
|
||||
assertTrue(user.isShutdown());
|
||||
assertTrue(io.isShutdown());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -95,23 +89,18 @@ public class ExecutorServiceModuleTest {
|
|||
|
||||
@Test(timeOut = 5000)
|
||||
public void testExceptionInSubmitRunnableIncludesSubmissionTrace() throws Exception {
|
||||
ListeningExecutorService user = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
ListeningExecutorService exec = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
ListeningExecutorService io = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
for (ListeningExecutorService exec : ImmutableList.of(user, io)) {
|
||||
String submission = null;
|
||||
try {
|
||||
// this is sensitive to formatting as we are looking for the stack traces to match. if you wrap the below
|
||||
// line again, you'll need to change incrementInitialElement to 3 line numbers instead of 2.
|
||||
submission = getStackTraceAsString(incrementInitialElement(new RuntimeException(), 2)).replaceFirst(format(".*%s", LINE_SEPARATOR),
|
||||
"");
|
||||
exec.submit(runnableThrowsRTE()).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertTraceHasSubmission(getStackTraceAsString(e), submission);
|
||||
assertTraceHasSubmission(getStackTraceAsString(e.getCause()), submission);
|
||||
}
|
||||
String submission = null;
|
||||
try {
|
||||
// this is sensitive to formatting as we are looking for the stack traces to match. if you wrap the below
|
||||
// line again, you'll need to change incrementInitialElement to 3 line numbers instead of 2.
|
||||
submission = getStackTraceAsString(incrementInitialElement(new RuntimeException(), 2)).replaceFirst(format(".*%s", LINE_SEPARATOR),
|
||||
"");
|
||||
exec.submit(runnableThrowsRTE()).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertTraceHasSubmission(getStackTraceAsString(e), submission);
|
||||
assertTraceHasSubmission(getStackTraceAsString(e.getCause()), submission);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,6 @@ public class EventBusModuleTest {
|
|||
ExecutorServiceModule userExecutorModule = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.jclouds.http;
|
||||
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
@ -41,8 +40,6 @@ public class JavaUrlHttpCommandExecutorServiceIntegrationTest extends BaseHttpCo
|
|||
protected void addOverrideProperties(Properties props) {
|
||||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 50 + "");
|
||||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
|
||||
// IO workers not used in this executor
|
||||
props.setProperty(PROPERTY_IO_WORKER_THREADS, 0 + "");
|
||||
props.setProperty(PROPERTY_USER_THREADS, 5 + "");
|
||||
}
|
||||
|
||||
|
|
|
@ -27,9 +27,7 @@ import javax.inject.Singleton;
|
|||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.http.HttpCommand;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpUtils;
|
||||
import org.jclouds.http.IOExceptionRetryHandler;
|
||||
import org.jclouds.http.handlers.DelegatingErrorHandler;
|
||||
|
@ -41,8 +39,6 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.TypeLiteral;
|
||||
|
@ -87,20 +83,12 @@ public class TrackingJavaUrlHttpCommandExecutorService extends JavaUrlHttpComman
|
|||
|
||||
@Inject
|
||||
public TrackingJavaUrlHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider, Function<URI, Proxy> proxyForURI,
|
||||
List<HttpCommand> commandsInvoked) throws SecurityException, NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
untrustedSSLContextProvider, proxyForURI);
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
untrustedSSLContextProvider, proxyForURI);
|
||||
this.commandsInvoked = commandsInvoked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
commandsInvoked.add(command);
|
||||
return super.submit(command);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.jclouds.lifecycle.config;
|
||||
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -48,13 +47,11 @@ public class LifeCycleModuleTest {
|
|||
void testBindsExecutor() {
|
||||
Injector i = createInjector();
|
||||
assert i.getInstance(Key.get(ListeningExecutorService.class, named(PROPERTY_USER_THREADS))) != null;
|
||||
assert i.getInstance(Key.get(ListeningExecutorService.class, named(PROPERTY_IO_WORKER_THREADS))) != null;
|
||||
}
|
||||
|
||||
private Injector createInjector() {
|
||||
Injector i = Guice.createInjector(new AbstractModule() {
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
|
||||
}
|
||||
}, new LifeCycleModule(), new ExecutorServiceModule());
|
||||
|
@ -90,14 +87,10 @@ public class LifeCycleModuleTest {
|
|||
ListeningExecutorService userExecutor = i.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
assert !userExecutor.isShutdown();
|
||||
ListeningExecutorService ioExecutor = i.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
assert !ioExecutor.isShutdown();
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
assert closer.getState() == Closer.State.AVAILABLE;
|
||||
closer.close();
|
||||
assert userExecutor.isShutdown();
|
||||
assert ioExecutor.isShutdown();
|
||||
assert closer.getState() == Closer.State.DONE;
|
||||
}
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ public class ClosableApiTest {
|
|||
|
||||
DelegatingApi api = ContextBuilder.newBuilder(provider)
|
||||
.modules(ImmutableSet.<Module> builder()
|
||||
.add(new ExecutorServiceModule(executor, executor))
|
||||
.add(new ExecutorServiceModule(executor))
|
||||
.build())
|
||||
.buildApi(DelegatingApi.class);
|
||||
api.close();
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.jclouds.rest.internal;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_RETRIES;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
@ -191,11 +190,9 @@ public abstract class BaseRestApiExpectTest<S> {
|
|||
|
||||
@Inject
|
||||
public ExpectHttpCommandExecutorService(Function<HttpRequest, HttpResponse> fn, HttpUtils utils,
|
||||
ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire) {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
ContentMetadataCodec contentMetadataCodec, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingRetryHandler retryHandler, DelegatingErrorHandler errorHandler, HttpWire wire) {
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
this.fn = checkNotNull(fn, "fn");
|
||||
}
|
||||
|
||||
|
@ -228,7 +225,6 @@ public abstract class BaseRestApiExpectTest<S> {
|
|||
@Override
|
||||
public void configure() {
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(new TypeLiteral<Function<HttpRequest, HttpResponse>>() {
|
||||
}).toInstance(fn);
|
||||
bind(HttpCommandExecutorService.class).to(ExpectHttpCommandExecutorService.class);
|
||||
|
|
|
@ -21,7 +21,6 @@ import static com.google.common.net.HttpHeaders.TRANSFER_ENCODING;
|
|||
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNull;
|
||||
|
@ -79,7 +78,6 @@ public abstract class BaseRestApiTest {
|
|||
@Override
|
||||
protected void configure() {
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(HttpCommandExecutorService.class).toInstance(mock);
|
||||
}
|
||||
|
||||
|
|
|
@ -244,12 +244,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
int callCounter = 0;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
if (callCounter == 1)
|
||||
|
@ -279,19 +273,12 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateWithPathParamIsLazyLoadedAndRequestIncludesEndpointVersionAndPath()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(),
|
||||
"GET http://howdyboys/testing/testing/thepathparam/client/1/foo HTTP/1.1");
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
@ -310,18 +297,11 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateWithHeaderParamIsLazyLoadedAndRequestIncludesEndpointVersionAndHeader()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getFirstHeaderOrNull("header"), "theheaderparam");
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
@ -338,12 +318,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateWithoutProducesAndConsumes()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(
|
||||
|
@ -352,7 +326,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
assertTrue(command.getCurrentRequest().getHeaders().get("Accept").contains(APPLICATION_JSON));
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
@ -368,12 +341,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateWithProducesAndConsumesOnMethodIsLazyLoaded()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(
|
||||
|
@ -398,12 +365,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateWithProducesAndConsumesOnClassIsLazyLoaded()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(
|
||||
|
@ -412,7 +373,6 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
assertTrue(command.getCurrentRequest().getHeaders().get("Accept").contains(APPLICATION_XML));
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
@ -428,18 +388,11 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateIsLazyLoadedAndRequestIncludesEndpointVersionAndPathOptionalPresent()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
@ -458,18 +411,11 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
public void testDelegateIsLazyLoadedAndRequestIncludesEndpointVersionAndPath() throws InterruptedException,
|
||||
ExecutionException {
|
||||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
throw new AssertionError("jclouds no longer uses the submit method");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse invoke(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return HttpResponse.builder().build();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
try {
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.jclouds.http.apachehc;
|
||||
|
||||
import static com.google.common.hash.Hashing.md5;
|
||||
import static com.google.common.io.BaseEncoding.base64;
|
||||
import static org.jclouds.http.HttpUtils.filterOutContentHeaders;
|
||||
|
@ -22,14 +23,13 @@ import static org.jclouds.http.HttpUtils.filterOutContentHeaders;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpUtils;
|
||||
|
@ -38,18 +38,16 @@ import org.jclouds.http.handlers.DelegatingErrorHandler;
|
|||
import org.jclouds.http.handlers.DelegatingRetryHandler;
|
||||
import org.jclouds.http.internal.BaseHttpCommandExecutorService;
|
||||
import org.jclouds.http.internal.HttpWire;
|
||||
import org.jclouds.io.ContentMetadataCodec;
|
||||
import org.jclouds.io.ByteStreams2;
|
||||
import org.jclouds.io.ContentMetadataCodec;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.Payloads;
|
||||
|
||||
import com.google.common.collect.LinkedHashMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Simple implementation of a {@link HttpFutureCommandClient}, Apache Components HttpClient 4.x.
|
||||
* Simple implementation of a {@link HttpCommandExecutorService}, Apache Components HttpClient 4.x.
|
||||
*/
|
||||
public class ApacheHCHttpCommandExecutorService extends BaseHttpCommandExecutorService<HttpUriRequest> {
|
||||
private final HttpClient client;
|
||||
|
@ -57,10 +55,9 @@ public class ApacheHCHttpCommandExecutorService extends BaseHttpCommandExecutorS
|
|||
|
||||
@Inject
|
||||
ApacheHCHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, HttpClient client) {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
this.client = client;
|
||||
this.apacheHCUtils = new ApacheHCUtils(contentMetadataCodec);
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.jclouds.http.apachehc;
|
||||
|
||||
import static org.jclouds.Constants.PROPERTY_CONNECTION_TIMEOUT;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
|
||||
import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT;
|
||||
|
@ -49,7 +48,6 @@ public class ApacheHCHttpCommandExecutorServiceTestDisabled extends BaseHttpComm
|
|||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
|
||||
props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 100 + "");
|
||||
props.setProperty(PROPERTY_SO_TIMEOUT, 100 + "");
|
||||
props.setProperty(PROPERTY_IO_WORKER_THREADS, 3 + "");
|
||||
props.setProperty(PROPERTY_USER_THREADS, 0 + "");
|
||||
}
|
||||
|
||||
|
|
|
@ -30,10 +30,20 @@ import com.google.common.util.concurrent.ListeningExecutorService;
|
|||
@ConfiguresExecutorService
|
||||
public class EnterpriseConfigurationModule extends ExecutorServiceModule {
|
||||
|
||||
/**
|
||||
* @deprecated {@code ioExecutor} is no longer used. This constructor will be removed in jclouds v2.
|
||||
* Use {@link #EnterpriseConfigurationModule(ListeningExecutorService)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public EnterpriseConfigurationModule(ListeningExecutorService userExecutor, ListeningExecutorService ioExecutor) {
|
||||
super(userExecutor, ioExecutor);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public EnterpriseConfigurationModule(ListeningExecutorService userExecutor) {
|
||||
super(userExecutor);
|
||||
}
|
||||
|
||||
public EnterpriseConfigurationModule() {
|
||||
super();
|
||||
}
|
||||
|
|
|
@ -1,175 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jclouds.gae;
|
||||
|
||||
import static com.google.common.base.Throwables.propagate;
|
||||
import static com.google.common.util.concurrent.Futures.transform;
|
||||
import static com.google.common.util.concurrent.JdkFutureAdapters.listenInPoolThread;
|
||||
import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
|
||||
import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.JcloudsVersion;
|
||||
import org.jclouds.http.HttpCommand;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
import org.jclouds.http.HttpRequestFilter;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpResponseException;
|
||||
import org.jclouds.http.HttpUtils;
|
||||
import org.jclouds.http.IOExceptionRetryHandler;
|
||||
import org.jclouds.http.handlers.DelegatingErrorHandler;
|
||||
import org.jclouds.http.handlers.DelegatingRetryHandler;
|
||||
import org.jclouds.http.internal.BaseHttpCommandExecutorService;
|
||||
import org.jclouds.http.internal.HttpWire;
|
||||
import org.jclouds.io.ContentMetadataCodec;
|
||||
import org.jclouds.util.Throwables2;
|
||||
|
||||
import com.google.appengine.api.urlfetch.HTTPRequest;
|
||||
import com.google.appengine.api.urlfetch.HTTPResponse;
|
||||
import com.google.appengine.api.urlfetch.URLFetchService;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* Google App Engine version of {@link HttpCommandExecutorService} using their
|
||||
* fetchAsync call
|
||||
*/
|
||||
@Singleton
|
||||
public class AsyncGaeHttpCommandExecutorService extends BaseHttpCommandExecutorService<HTTPRequest> {
|
||||
// TODO: look up gae version
|
||||
public static final String USER_AGENT = String.format("jclouds/%s urlfetch/%s", JcloudsVersion.get(), "1.6.5");
|
||||
|
||||
private final URLFetchService urlFetchService;
|
||||
private final ConvertToGaeRequest convertToGaeRequest;
|
||||
private final ConvertToJcloudsResponse convertToJcloudsResponse;
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
|
||||
@Inject
|
||||
public AsyncGaeHttpCommandExecutorService(URLFetchService urlFetchService, HttpUtils utils,
|
||||
ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, ConvertToGaeRequest convertToGaeRequest,
|
||||
ConvertToJcloudsResponse convertToJcloudsResponse) {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.urlFetchService = urlFetchService;
|
||||
this.convertToGaeRequest = convertToGaeRequest;
|
||||
this.convertToJcloudsResponse = convertToJcloudsResponse;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected HttpResponse convert(HTTPResponse gaeResponse) {
|
||||
return convertToJcloudsResponse.apply(gaeResponse);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected HTTPRequest convert(HttpRequest request) throws IOException {
|
||||
return convertToGaeRequest.apply(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* nothing to clean up.
|
||||
*/
|
||||
@Override
|
||||
protected void cleanup(HTTPRequest nativeRequest) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpResponse invoke(HTTPRequest request) throws IOException {
|
||||
return convert(urlFetchService.fetch(request));
|
||||
}
|
||||
|
||||
public HTTPRequest filterLogAndConvertRe(HttpRequest request) {
|
||||
for (HttpRequestFilter filter : request.getFilters()) {
|
||||
request = filter.filter(request);
|
||||
}
|
||||
checkRequestHasContentLengthOrChunkedEncoding(request,
|
||||
"After filtering, the request has neither chunked encoding nor content length: " + request);
|
||||
logger.debug("Sending request %s: %s", request.hashCode(), request.getRequestLine());
|
||||
wirePayloadIfEnabled(wire, request);
|
||||
HTTPRequest nativeRequest = convertToGaeRequest.apply(request);
|
||||
utils.logRequest(headerLog, request, ">>");
|
||||
return nativeRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<HttpResponse> submit(final HttpCommand command) {
|
||||
HTTPRequest nativeRequest = filterLogAndConvertRe(command.getCurrentRequest());
|
||||
ListenableFuture<HttpResponse> response = transform(
|
||||
listenInPoolThread(urlFetchService.fetchAsync(nativeRequest)), convertToJcloudsResponse);
|
||||
|
||||
return transform(response, new Function<HttpResponse, HttpResponse>() {
|
||||
public HttpResponse apply(HttpResponse response) {
|
||||
return receiveResponse(command, response);
|
||||
}
|
||||
|
||||
}, ioExecutor);
|
||||
}
|
||||
|
||||
private HttpResponse receiveResponse(HttpCommand command, HttpResponse response) {
|
||||
try {
|
||||
logger.debug("Receiving response %s: %s", command.getCurrentRequest().hashCode(), response.getStatusLine());
|
||||
utils.logResponse(headerLog, response, "<<");
|
||||
if (response.getPayload() != null && wire.enabled())
|
||||
wire.input(response);
|
||||
int statusCode = response.getStatusCode();
|
||||
if (statusCode >= 300) {
|
||||
if (shouldContinue(command, response))
|
||||
return submit(command).get();
|
||||
else
|
||||
return response;
|
||||
}
|
||||
return response;
|
||||
} catch (Exception e) {
|
||||
IOException ioe = Throwables2.getFirstThrowableOfType(e, IOException.class);
|
||||
if (ioe != null && ioRetryHandler.shouldRetryRequest(command, ioe)) {
|
||||
try {
|
||||
return submit(command).get();
|
||||
} catch (Exception e1) {
|
||||
command.setException(e1);
|
||||
return response;
|
||||
}
|
||||
} else {
|
||||
command.setException(new HttpResponseException(e.getMessage() + " connecting to "
|
||||
+ command.getCurrentRequest().getRequestLine(), command, null, e));
|
||||
return response;
|
||||
}
|
||||
} finally {
|
||||
if (command.getException() != null)
|
||||
propagate(command.getException());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldContinue(HttpCommand command, HttpResponse response) {
|
||||
boolean shouldContinue = false;
|
||||
if (retryHandler.shouldRetryRequest(command, response)) {
|
||||
shouldContinue = true;
|
||||
} else {
|
||||
errorHandler.handleError(command, response);
|
||||
}
|
||||
return shouldContinue;
|
||||
}
|
||||
}
|
|
@ -19,10 +19,8 @@ package org.jclouds.gae;
|
|||
import java.io.IOException;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.JcloudsVersion;
|
||||
import org.jclouds.concurrent.SingleThreaded;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
|
@ -39,7 +37,6 @@ import com.google.appengine.api.urlfetch.HTTPRequest;
|
|||
import com.google.appengine.api.urlfetch.HTTPResponse;
|
||||
import com.google.appengine.api.urlfetch.URLFetchService;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* Google App Engine version of {@link HttpCommandExecutorService}
|
||||
|
@ -57,11 +54,10 @@ public class GaeHttpCommandExecutorService extends BaseHttpCommandExecutorServic
|
|||
@Inject
|
||||
public GaeHttpCommandExecutorService(URLFetchService urlFetchService, HttpUtils utils,
|
||||
ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, ConvertToGaeRequest convertToGaeRequest,
|
||||
ConvertToJcloudsResponse convertToJcloudsResponse) {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
this.urlFetchService = urlFetchService;
|
||||
this.convertToGaeRequest = convertToGaeRequest;
|
||||
this.convertToJcloudsResponse = convertToJcloudsResponse;
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jclouds.gae.config;
|
||||
|
||||
import org.jclouds.concurrent.SingleThreaded;
|
||||
import org.jclouds.concurrent.config.ConfiguresExecutorService;
|
||||
import org.jclouds.gae.AsyncGaeHttpCommandExecutorService;
|
||||
import org.jclouds.http.HttpCommandExecutorService;
|
||||
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Module;
|
||||
|
||||
/**
|
||||
* Configures {@link AsyncGaeHttpCommandExecutorService}.
|
||||
*/
|
||||
@ConfiguresHttpCommandExecutorService
|
||||
@ConfiguresExecutorService
|
||||
@SingleThreaded
|
||||
public class AsyncGoogleAppEngineConfigurationModule extends GoogleAppEngineConfigurationModule {
|
||||
public AsyncGoogleAppEngineConfigurationModule() {
|
||||
super();
|
||||
}
|
||||
|
||||
public AsyncGoogleAppEngineConfigurationModule(Module userExecutorModule) {
|
||||
super(userExecutorModule);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used when you are creating multiple contexts in the same app.
|
||||
* @param memoizedCurrentRequestExecutorService
|
||||
* @see CurrentRequestExecutorServiceModule#memoizedCurrentRequestExecutorService
|
||||
*/
|
||||
public AsyncGoogleAppEngineConfigurationModule(Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
|
||||
super(memoizedCurrentRequestExecutorService);
|
||||
}
|
||||
|
||||
protected void bindHttpCommandExecutorService() {
|
||||
bind(HttpCommandExecutorService.class).to(AsyncGaeHttpCommandExecutorService.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -102,11 +102,4 @@ public class CurrentRequestExecutorServiceModule extends AbstractModule {
|
|||
protected ListeningExecutorService userExecutor() {
|
||||
return memoizedCurrentRequestExecutorService.get();
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS)
|
||||
protected ListeningExecutorService ioExecutor() {
|
||||
return memoizedCurrentRequestExecutorService.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ public class GoogleAppEngineConfigurationModule extends AbstractModule {
|
|||
private final Module userExecutorModule;
|
||||
|
||||
public GoogleAppEngineConfigurationModule() {
|
||||
this(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
|
||||
this(new ExecutorServiceModule(sameThreadExecutor()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jclouds.gae.config;
|
||||
|
||||
import org.jclouds.concurrent.config.ConfiguresExecutorService;
|
||||
import org.jclouds.gae.AsyncGaeHttpCommandExecutorService;
|
||||
import org.jclouds.http.HttpCommandExecutorService;
|
||||
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* Configures {@link AsyncGaeHttpCommandExecutorService}.
|
||||
*/
|
||||
@Beta
|
||||
@ConfiguresHttpCommandExecutorService
|
||||
@ConfiguresExecutorService
|
||||
public class MultithreadedAsyncGoogleAppEngineConfigurationModule extends GoogleAppEngineConfigurationModule {
|
||||
public MultithreadedAsyncGoogleAppEngineConfigurationModule() {
|
||||
super(new CurrentRequestExecutorServiceModule());
|
||||
}
|
||||
|
||||
/**
|
||||
* Used when you are creating multiple contexts in the same app.
|
||||
*
|
||||
* @param currentRequestThreadFactory
|
||||
* @see CurrentRequestExecutorServiceModule#currentRequestThreadFactory
|
||||
*/
|
||||
public MultithreadedAsyncGoogleAppEngineConfigurationModule(ListeningExecutorService currentRequestThreadFactory) {
|
||||
super(new CurrentRequestExecutorServiceModule(currentRequestThreadFactory));
|
||||
}
|
||||
|
||||
/**
|
||||
* Used when you are creating multiple contexts in the same app.
|
||||
*
|
||||
* @param memoizedCurrentRequestExecutorService
|
||||
* @see CurrentRequestExecutorServiceModule#memoizedCurrentRequestExecutorService
|
||||
*/
|
||||
public MultithreadedAsyncGoogleAppEngineConfigurationModule(
|
||||
Supplier<ListeningExecutorService> memoizedCurrentRequestExecutorService) {
|
||||
super(memoizedCurrentRequestExecutorService);
|
||||
}
|
||||
|
||||
protected void bindHttpCommandExecutorService() {
|
||||
bind(HttpCommandExecutorService.class).to(AsyncGaeHttpCommandExecutorService.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -39,7 +39,7 @@ import com.google.inject.Module;
|
|||
* Integration test for the URLFetchService
|
||||
*/
|
||||
@Test
|
||||
public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpCommandExecutorServiceIntegrationTest {
|
||||
public class GaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpCommandExecutorServiceIntegrationTest {
|
||||
|
||||
@BeforeMethod
|
||||
public void setupApiProxy() {
|
||||
|
@ -82,7 +82,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
|
|||
}
|
||||
|
||||
protected HttpCommandExecutorService providerHttpCommandExecutorService(Injector injector) {
|
||||
return injector.getInstance(AsyncGaeHttpCommandExecutorService.class);
|
||||
return injector.getInstance(GaeHttpCommandExecutorService.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -27,7 +27,6 @@ import javax.inject.Singleton;
|
|||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
import org.jclouds.http.HttpUtils;
|
||||
import org.jclouds.http.IOExceptionRetryHandler;
|
||||
|
@ -40,7 +39,6 @@ import org.jclouds.io.ContentMetadataCodec;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.net.HttpHeaders;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Inject;
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
|
||||
|
@ -53,12 +51,11 @@ public class OkHttpCommandExecutorService extends JavaUrlHttpCommandExecutorServ
|
|||
|
||||
@Inject
|
||||
public OkHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider, Function<URI, Proxy> proxyForURI)
|
||||
throws SecurityException, NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
untrustedSSLContextProvider, proxyForURI);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.jclouds.http.okhttp;
|
||||
|
||||
import static com.google.common.io.Closeables.close;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
|
||||
import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
@ -56,8 +55,6 @@ public class OkHttpCommandExecutorServiceTest extends BaseHttpCommandExecutorSer
|
|||
protected void addOverrideProperties(final Properties props) {
|
||||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 50 + "");
|
||||
props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
|
||||
// IO workers not used in this executor
|
||||
props.setProperty(PROPERTY_IO_WORKER_THREADS, 0 + "");
|
||||
props.setProperty(PROPERTY_USER_THREADS, 5 + "");
|
||||
}
|
||||
|
||||
|
|
|
@ -66,9 +66,9 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
static final int DEFAULT_MIN_RETRIES = 5;
|
||||
@VisibleForTesting
|
||||
static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
|
||||
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
|
||||
|
||||
private final ListeningExecutorService executor;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parallel.degree")
|
||||
@VisibleForTesting
|
||||
|
@ -96,10 +96,10 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
|
||||
@Inject
|
||||
public ParallelMultipartUploadStrategy(AWSS3BlobStore blobstore, PayloadSlicer slicer,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executor) {
|
||||
this.blobstore = checkNotNull(blobstore, "blobstore");
|
||||
this.slicer = checkNotNull(slicer, "slicer");
|
||||
this.ioExecutor = checkNotNull(ioExecutor, "ioExecutor");
|
||||
this.executor = checkNotNull(executor, "executor");
|
||||
}
|
||||
|
||||
protected void prepareUploadPart(final String container, final String key,
|
||||
|
@ -118,7 +118,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
final Payload chunkedPart = slicer.slice(payload, offset, size);
|
||||
logger.debug(String.format("async uploading part %s of %s to container %s with uploadId %s", part, key, container, uploadId));
|
||||
final long start = System.currentTimeMillis();
|
||||
final ListenableFuture<String> futureETag = ioExecutor.submit(new Callable<String>() {
|
||||
final ListenableFuture<String> futureETag = executor.submit(new Callable<String>() {
|
||||
@Override public String call() throws Exception {
|
||||
return client.uploadPart(container, key, part, uploadId, chunkedPart);
|
||||
}
|
||||
|
@ -148,13 +148,13 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}, ioExecutor);
|
||||
}, executor);
|
||||
futureParts.put(part, futureETag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> execute(final String container, final Blob blob, final PutOptions options) {
|
||||
return ioExecutor.submit(new Callable<String>() {
|
||||
return executor.submit(new Callable<String>() {
|
||||
@Override
|
||||
public String call() throws Exception {
|
||||
String key = blob.getMetadata().getName();
|
||||
|
@ -242,7 +242,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra
|
|||
// recursively call this execute method again; instead mark as not multipart
|
||||
// because it can all fit in one go.
|
||||
final PutOptions nonMultipartOptions = PutOptions.Builder.multipart(false);
|
||||
ListenableFuture<String> futureETag = ioExecutor.submit(new Callable<String>() {
|
||||
ListenableFuture<String> futureETag = executor.submit(new Callable<String>() {
|
||||
@Override public String call() throws Exception {
|
||||
return blobstore.putBlob(container, blob, nonMultipartOptions);
|
||||
}
|
||||
|
|
|
@ -127,8 +127,8 @@ public class SequentialMultipartUploadStrategyMockTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static final Set<Module> modules = ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor(),
|
||||
sameThreadExecutor()));
|
||||
private static final Set<Module> modules = ImmutableSet.<Module>of(
|
||||
new ExecutorServiceModule(sameThreadExecutor()));
|
||||
|
||||
static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) {
|
||||
Properties overrides = new Properties();
|
||||
|
|
|
@ -30,7 +30,6 @@ import javax.inject.Singleton;
|
|||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.concurrent.SingleThreaded;
|
||||
import org.jclouds.dynect.v3.DynECTApi;
|
||||
import org.jclouds.dynect.v3.features.SessionApi;
|
||||
|
@ -56,7 +55,6 @@ import org.jclouds.rest.config.HttpApiModule;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* Configures the DynECT connection.
|
||||
|
@ -101,12 +99,11 @@ public class DynECTHttpApiModule extends HttpApiModule<DynECTApi> {
|
|||
|
||||
@Inject
|
||||
private SillyRabbit200sAreForSuccess(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider, Function<URI, Proxy> proxyForURI)
|
||||
throws SecurityException, NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
super(utils, contentMetadataCodec, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
untrustedSSLContextProvider, proxyForURI);
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import com.squareup.okhttp.mockwebserver.MockWebServer;
|
|||
public class DynectApiMockTest {
|
||||
|
||||
private static final Set<Module> modules = ImmutableSet.<Module> of(
|
||||
new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()));
|
||||
new ExecutorServiceModule(sameThreadExecutor()));
|
||||
|
||||
static DynECTApi mockDynectApi(String uri) {
|
||||
Properties overrides = new Properties();
|
||||
|
|
|
@ -46,7 +46,7 @@ public class BaseHPCloudObjectStorageMockTest {
|
|||
.credentials("jclouds:joe", "letmein") //
|
||||
.endpoint(uri) //
|
||||
.overrides(overrides) //
|
||||
.modules(ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor()))) //
|
||||
.modules(ImmutableSet.<Module> of(new ExecutorServiceModule(sameThreadExecutor()))) //
|
||||
.buildApi(HPCloudObjectStorageApi.class);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue