issue 66: refactored http clients to not be bound to a single endpoint such that redirects can be assigned to another host

git-svn-id: http://jclouds.googlecode.com/svn/trunk@1458 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2009-06-24 18:27:39 +00:00
parent 00cc4c4c4e
commit 78d0b7afd3
10 changed files with 427 additions and 632 deletions

View File

@ -23,57 +23,131 @@
*/
package org.jclouds.http.httpnio.config;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jclouds.http.HttpConstants;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.entity.BufferingNHttpEntity;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpFutureCommandClient;
import org.jclouds.http.config.HttpFutureCommandClientModule;
import org.jclouds.http.httpnio.config.internal.NonSSLHttpNioConnectionPoolClientModule;
import org.jclouds.http.httpnio.config.internal.SSLHttpNioConnectionPoolClientModule;
import org.jclouds.http.httpnio.pool.HttpNioConnectionPoolClient;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryProvider;
import com.google.inject.name.Named;
/**
* Configures {@link HttpNioConnectionPoolClient}
*
* @author Adrian Cole
*/
@HttpFutureCommandClientModule
public class HttpNioConnectionPoolClientModule extends AbstractModule {
public class HttpNioConnectionPoolClientModule extends
FutureCommandConnectionPoolClientModule<NHttpConnection> {
@Named(HttpConstants.PROPERTY_HTTP_SECURE)
boolean isSecure;
@Provides
// @Singleton per uri...
public AsyncNHttpClientHandler provideAsyncNttpClientHandler(BasicHttpProcessor httpProcessor,
NHttpRequestExecutionHandler execHandler, ConnectionReuseStrategy connStrategy,
ByteBufferAllocator allocator, HttpParams params) {
return new AsyncNHttpClientHandler(httpProcessor, execHandler, connStrategy, allocator,
params);
}
@Provides
@Singleton
public BasicHttpProcessor provideClientProcessor() {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
return httpproc;
}
@Provides
@Singleton
public HttpParams provideHttpParams() {
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000).setIntParameter(
CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024).setBooleanParameter(
CoreConnectionPNames.STALE_CONNECTION_CHECK, false).setBooleanParameter(
CoreConnectionPNames.TCP_NODELAY, true).setParameter(
CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
return params;
}
protected void configure() {
super.configure();
bind(HttpFutureCommandClient.class).to(HttpNioConnectionPoolClient.class);
bind(new TypeLiteral<BlockingQueue<HttpFutureCommand<?>>>() {
}).to(new TypeLiteral<LinkedBlockingQueue<HttpFutureCommand<?>>>() {
}).in(Scopes.SINGLETON);
bind(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class).toProvider(
FactoryProvider.newFactory(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class,
InjectableBufferingNHttpEntity.class));// .in(Scopes.SINGLETON); but per URI
bind(NHttpRequestExecutionHandler.class).to(HttpNioFutureCommandExecutionHandler.class).in(
Scopes.SINGLETON);
bind(ConnectionReuseStrategy.class).to(DefaultConnectionReuseStrategy.class).in(
Scopes.SINGLETON);
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
bind(HttpNioFutureCommandConnectionPool.Factory.class).toProvider(
FactoryProvider.newFactory(
new TypeLiteral<HttpNioFutureCommandConnectionPool.Factory>() {
}, new TypeLiteral<HttpNioFutureCommandConnectionPool>() {
}));
}
static class InjectableBufferingNHttpEntity extends BufferingNHttpEntity {
@Inject
public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity,
ByteBufferAllocator allocator) {
super(httpEntity, allocator);
}
}
@Override
protected void configure() {
requestInjection(this);
if (isSecure)
install(new SSLHttpNioConnectionPoolClientModule());
else
install(new NonSSLHttpNioConnectionPoolClientModule());
bind(HttpFutureCommandClient.class).to(HttpNioConnectionPoolClient.class);
public BlockingQueue<NHttpConnection> provideAvailablePool(
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) throws Exception {
return new ArrayBlockingQueue<NHttpConnection>(max, true);
}
@Singleton
@Provides
protected InetSocketAddress provideAddress(URI endPoint) {
return new InetSocketAddress(endPoint.getHost(), endPoint.getPort());
// @Singleton per uri...
public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(
@Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads,
HttpParams params) throws IOReactorException {
return new DefaultConnectingIOReactor(ioWorkerThreads, params);
}
@Singleton
@Provides
protected URI provideAddress(@Named(HttpConstants.PROPERTY_HTTP_ADDRESS) String address,
@Named(HttpConstants.PROPERTY_HTTP_PORT) int port,
@Named(HttpConstants.PROPERTY_HTTP_SECURE) boolean isSecure)
throws MalformedURLException {
return URI.create(String.format("%1$s://%2$s:%3$s", isSecure ? "https" : "http", address,
port));
}
}

View File

@ -1,172 +0,0 @@
/**
*
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
package org.jclouds.http.httpnio.config.internal;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.entity.BufferingNHttpEntity;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionHandle;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryProvider;
import com.google.inject.name.Named;
/**
* // TODO: Adrian: Document this!
*
* @author Adrian Cole
*/
public abstract class BaseHttpNioConnectionPoolClientModule extends
FutureCommandConnectionPoolClientModule<NHttpConnection> {
@Provides
@Singleton
public AsyncNHttpClientHandler provideAsyncNttpClientHandler(
BasicHttpProcessor httpProcessor,
NHttpRequestExecutionHandler execHandler,
ConnectionReuseStrategy connStrategy,
ByteBufferAllocator allocator, HttpParams params) {
return new AsyncNHttpClientHandler(httpProcessor, execHandler,
connStrategy, allocator, params);
}
@Provides
@Singleton
public BasicHttpProcessor provideClientProcessor() {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
return httpproc;
}
@Provides
@Singleton
public HttpParams provideHttpParams() {
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE,
8 * 1024).setBooleanParameter(
CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
return params;
}
protected void configure() {
super.configure();
bind(new TypeLiteral<BlockingQueue<HttpFutureCommand<?>>>() {
}).to(new TypeLiteral<LinkedBlockingQueue<HttpFutureCommand<?>>>() {
}).in(Scopes.SINGLETON);
bind(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class)
.toProvider(
FactoryProvider
.newFactory(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class,
InjectableBufferingNHttpEntity.class))
.in(Scopes.SINGLETON);
bind(NHttpRequestExecutionHandler.class).to(
HttpNioFutureCommandExecutionHandler.class)
.in(Scopes.SINGLETON);
bind(ConnectionReuseStrategy.class).to(
DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON);
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
bind(
HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class)
.toProvider(
FactoryProvider
.newFactory(
new TypeLiteral<HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory>() {
},
new TypeLiteral<HttpNioFutureCommandConnectionHandle>() {
}));
}
static class InjectableBufferingNHttpEntity extends BufferingNHttpEntity {
@Inject
public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity,
ByteBufferAllocator allocator) {
super(httpEntity, allocator);
}
}
@Override
public BlockingQueue<NHttpConnection> provideAvailablePool(
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
throws Exception {
return new ArrayBlockingQueue<NHttpConnection>(max, true);
}
@Provides
@Singleton
public abstract IOEventDispatch provideClientEventDispatch(
AsyncNHttpClientHandler handler, HttpParams params)
throws Exception;
@Provides
@Singleton
public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(
@Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads,
HttpParams params) throws IOReactorException {
return new DefaultConnectingIOReactor(ioWorkerThreads, params);
}
}

View File

@ -1,43 +0,0 @@
/**
*
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
package org.jclouds.http.httpnio.config.internal;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.params.HttpParams;
import org.jclouds.http.httpnio.config.internal.BaseHttpNioConnectionPoolClientModule;
/**
* // TODO: Adrian: Document this!
*
* @author Adrian Cole
*/
public class NonSSLHttpNioConnectionPoolClientModule extends BaseHttpNioConnectionPoolClientModule {
public IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler, HttpParams params) throws Exception {
return new DefaultClientIOEventDispatch(
handler,
params);
}
}

View File

@ -1,56 +0,0 @@
/**
*
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
package org.jclouds.http.httpnio.config.internal;
import org.apache.http.impl.nio.SSLClientIOEventDispatch;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.params.HttpParams;
import org.jclouds.http.httpnio.config.internal.BaseHttpNioConnectionPoolClientModule;
import javax.net.ssl.SSLContext;
/**
* // TODO: Adrian: Document this!
*
* @author Adrian Cole
*/
public class SSLHttpNioConnectionPoolClientModule extends BaseHttpNioConnectionPoolClientModule {
protected void configure() {
super.configure();
}
// note until a bug is fixed, you cannot annotate overriding methods with google annotations
// http://code.google.com/p/google-guice/issues/detail?id=347
@Override
public IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler, HttpParams params) throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, null, null);
return new SSLClientIOEventDispatch(
handler,
context,
params);
}
}

View File

@ -23,59 +23,61 @@
*/
package org.jclouds.http.httpnio.pool;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.pool.FutureCommandConnectionPoolClient;
import org.jclouds.http.*;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.pool.FutureCommandConnectionPoolClient;
import org.jclouds.http.HttpException;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpFutureCommandClient;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpRequestFilter;
import com.google.inject.Inject;
import com.google.inject.Singleton;
/**
* // TODO: Adrian: Document this!
*
* @author Adrian Cole
*/
@Singleton
public class HttpNioConnectionPoolClient
extends
FutureCommandConnectionPoolClient<NHttpConnection, HttpFutureCommand<?>>
implements HttpFutureCommandClient {
private List<HttpRequestFilter> requestFilters = Collections.emptyList();
public class HttpNioConnectionPoolClient extends
FutureCommandConnectionPoolClient<URI, NHttpConnection, HttpFutureCommand<?>> implements
HttpFutureCommandClient {
public List<HttpRequestFilter> getRequestFilters() {
return requestFilters;
}
private List<HttpRequestFilter> requestFilters = Collections.emptyList();
@Inject(optional = true)
public void setRequestFilters(List<HttpRequestFilter> requestFilters) {
this.requestFilters = requestFilters;
}
public List<HttpRequestFilter> getRequestFilters() {
return requestFilters;
}
@Override
protected void invoke(HttpFutureCommand<?> command) {
HttpRequest request = (HttpRequest) command.getRequest();
try {
for (HttpRequestFilter filter : getRequestFilters()) {
filter.filter(request);
}
super.invoke(command);
} catch (HttpException e) {
command.setException(e);
}
}
@Inject(optional = true)
public void setRequestFilters(List<HttpRequestFilter> requestFilters) {
this.requestFilters = requestFilters;
}
@Inject
public HttpNioConnectionPoolClient(
ExecutorService executor,
HttpNioFutureCommandConnectionPool httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool,
@Override
protected void invoke(HttpFutureCommand<?> command) {
HttpRequest request = (HttpRequest) command.getRequest();
try {
for (HttpRequestFilter filter : getRequestFilters()) {
filter.filter(request);
}
super.invoke(command);
} catch (HttpException e) {
command.setException(e);
}
}
@Inject
public HttpNioConnectionPoolClient(ExecutorService executor,
HttpNioFutureCommandConnectionPool.Factory poolFactory,
BlockingQueue<HttpFutureCommand<?>> commandQueue) {
super(
executor,
httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool,
commandQueue);
}
super(executor, poolFactory, commandQueue);
}
}

View File

@ -24,6 +24,7 @@
package org.jclouds.http.httpnio.pool;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
@ -31,7 +32,6 @@ import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.pool.FutureCommandConnectionHandle;
import org.jclouds.http.HttpFutureCommand;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
@ -40,25 +40,24 @@ import com.google.inject.assistedinject.Assisted;
* @author Adrian Cole
*/
public class HttpNioFutureCommandConnectionHandle extends
FutureCommandConnectionHandle<NHttpConnection, HttpFutureCommand<?>> {
FutureCommandConnectionHandle<URI, NHttpConnection, HttpFutureCommand<?>> {
@Inject
public HttpNioFutureCommandConnectionHandle(
BlockingQueue<NHttpConnection> available, Semaphore maxConnections,
@Assisted NHttpConnection conn,
@Assisted HttpFutureCommand<?> command) throws InterruptedException {
super(maxConnections, command, conn, available);
// currently not injected as we want to ensure we share the correct objects with the pool
public HttpNioFutureCommandConnectionHandle(Semaphore maxConnections,
BlockingQueue<NHttpConnection> available, @Assisted URI endPoint,
@Assisted HttpFutureCommand<?> command, @Assisted NHttpConnection conn)
throws InterruptedException {
super(maxConnections, available, endPoint, command, conn);
}
}
public void startConnection() {
conn.getContext().setAttribute("command", command);
logger.trace("invoking %1$s on connection %2$s", command, conn);
conn.requestOutput();
}
public void startConnection() {
conn.getContext().setAttribute("command", command);
logger.trace("invoking %1$s on connection %2$s", command, conn);
conn.requestOutput();
}
public void shutdownConnection() throws IOException {
conn.shutdown();
}
public void shutdownConnection() throws IOException {
conn.shutdown();
}
}

View File

@ -23,9 +23,20 @@
*/
package org.jclouds.http.httpnio.pool;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.impl.nio.SSLClientIOEventDispatch;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
@ -34,15 +45,16 @@ import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorStatus;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.HttpParams;
import org.jclouds.command.FutureCommand;
import org.jclouds.command.pool.FutureCommandConnectionHandle;
import org.jclouds.command.pool.FutureCommandConnectionPool;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.http.HttpFutureCommand;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.*;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.name.Named;
/**
* Connection Pool for HTTP requests that utilizes Apache HTTPNio
@ -50,238 +62,236 @@ import java.util.concurrent.*;
* @author Adrian Cole
*/
public class HttpNioFutureCommandConnectionPool extends
FutureCommandConnectionPool<NHttpConnection, HttpFutureCommand<?>>
implements EventListener {
FutureCommandConnectionPool<URI, NHttpConnection, HttpFutureCommand<?>> implements
EventListener {
private final NHttpClientConnectionPoolSessionRequestCallback sessionCallback;
private final DefaultConnectingIOReactor ioReactor;
private final IOEventDispatch dispatch;
private final InetSocketAddress target;
private final int maxSessionFailures;
private final NHttpClientConnectionPoolSessionRequestCallback sessionCallback;
private final DefaultConnectingIOReactor ioReactor;
private final IOEventDispatch dispatch;
private final InetSocketAddress target;
private final int maxSessionFailures;
@Inject
public HttpNioFutureCommandConnectionPool(
ExecutorService executor,
Semaphore allConnections,
public static interface Factory extends
FutureCommandConnectionPool.Factory<URI, NHttpConnection, HttpFutureCommand<?>> {
HttpNioFutureCommandConnectionPool create(URI endPoint);
}
@Inject
public HttpNioFutureCommandConnectionPool(ExecutorService executor, Semaphore allConnections,
BlockingQueue<HttpFutureCommand<?>> commandQueue,
BlockingQueue<NHttpConnection> available,
AsyncNHttpClientHandler clientHandler,
DefaultConnectingIOReactor ioReactor,
IOEventDispatch dispatch,
FutureCommandConnectionHandleFactory requestHandleFactory,
InetSocketAddress target,
BlockingQueue<NHttpConnection> available, AsyncNHttpClientHandler clientHandler,
DefaultConnectingIOReactor ioReactor, HttpParams params,
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTION_REUSE) int maxConnectionReuse,
@Named(PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES) int maxSessionFailures) {
super(executor, allConnections, commandQueue, requestHandleFactory,
maxConnectionReuse, available);
this.ioReactor = ioReactor;
this.dispatch = dispatch;
this.target = target;
this.maxSessionFailures = maxSessionFailures;
this.sessionCallback = new NHttpClientConnectionPoolSessionRequestCallback();
clientHandler.setEventListener(this);
}
@Named(PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES) int maxSessionFailures,
@Assisted URI endPoint) throws Exception {
super(executor, allConnections, commandQueue, maxConnectionReuse, available, endPoint);
this.ioReactor = ioReactor;
this.dispatch = endPoint.getScheme().equals("https") ? provideSSLClientEventDispatch(
clientHandler, params) : provideClientEventDispatch(clientHandler, params);
this.maxSessionFailures = maxSessionFailures;
this.sessionCallback = new NHttpClientConnectionPoolSessionRequestCallback();
this.target = new InetSocketAddress(getEndPoint().getHost(), getEndPoint().getPort());
clientHandler.setEventListener(this);
}
@Override
public void start() {
synchronized (this.statusLock) {
if (this.status.compareTo(Status.INACTIVE) == 0) {
executor.execute(new Runnable() {
public void run() {
try {
ioReactor.execute(dispatch);
} catch (IOException e) {
exception.set(e);
logger.error(e, "Error dispatching %1$s", dispatch);
status = Status.SHUTDOWN_REQUEST;
}
}
});
}
super.start();
}
}
public static IOEventDispatch provideSSLClientEventDispatch(AsyncNHttpClientHandler handler,
HttpParams params) throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, null, null);
return new SSLClientIOEventDispatch(handler, context, params);
}
public void shutdownReactor(long waitMs) {
try {
this.ioReactor.shutdown(waitMs);
} catch (IOException e) {
logger.error(e, "Error shutting down reactor");
}
}
public static IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler,
HttpParams params) throws Exception {
return new DefaultClientIOEventDispatch(handler, params);
}
@Override
public boolean connectionValid(NHttpConnection conn) {
return conn.isOpen() && !conn.isStale()
&& conn.getMetrics().getRequestCount() < maxConnectionReuse;
}
@Override
public void start() {
synchronized (this.statusLock) {
if (this.status.compareTo(Status.INACTIVE) == 0) {
executor.execute(new Runnable() {
public void run() {
try {
ioReactor.execute(dispatch);
} catch (IOException e) {
exception.set(e);
logger.error(e, "Error dispatching %1$s", dispatch);
status = Status.SHUTDOWN_REQUEST;
}
}
});
}
super.start();
}
}
@Override
public void shutdownConnection(NHttpConnection conn) {
if (conn.getMetrics().getRequestCount() >= maxConnectionReuse)
logger.debug(
"%1$s - %2$d - closing connection due to overuse %1$s/%2$s",
conn, conn.hashCode(), conn.getMetrics().getRequestCount(),
maxConnectionReuse);
if (conn.getStatus() == NHttpConnection.ACTIVE) {
try {
conn.shutdown();
} catch (IOException e) {
logger.error(e, "Error shutting down connection");
}
}
}
public void shutdownReactor(long waitMs) {
try {
this.ioReactor.shutdown(waitMs);
} catch (IOException e) {
logger.error(e, "Error shutting down reactor");
}
}
@Override
protected void doWork() throws Exception {
createNewConnection();
}
@Override
public boolean connectionValid(NHttpConnection conn) {
return conn.isOpen() && !conn.isStale()
&& conn.getMetrics().getRequestCount() < maxConnectionReuse;
}
@Override
protected void doShutdown() {
// Give the I/O reactor 1 sec to shut down
shutdownReactor(1000);
assert this.ioReactor.getStatus().equals(IOReactorStatus.SHUT_DOWN) : "incorrect status after io reactor shutdown :"
+ this.ioReactor.getStatus();
}
@Override
public void shutdownConnection(NHttpConnection conn) {
if (conn.getMetrics().getRequestCount() >= maxConnectionReuse)
logger.debug("%1$s - %2$d - closing connection due to overuse %1$s/%2$s", conn, conn
.hashCode(), conn.getMetrics().getRequestCount(), maxConnectionReuse);
if (conn.getStatus() == NHttpConnection.ACTIVE) {
try {
conn.shutdown();
} catch (IOException e) {
logger.error(e, "Error shutting down connection");
}
}
}
@Override
protected void createNewConnection() throws InterruptedException {
boolean acquired = allConnections.tryAcquire(1, TimeUnit.SECONDS);
if (acquired) {
if (shouldDoWork()) {
logger.debug("%1$s - opening new connection", target);
ioReactor.connect(target, null, null, sessionCallback);
} else {
allConnections.release();
}
}
}
@Override
protected void doWork() throws Exception {
createNewConnection();
}
@Override
protected void associateHandleWithConnection(
FutureCommandConnectionHandle<NHttpConnection, HttpFutureCommand<?>> handle,
NHttpConnection connection) {
connection.getContext().setAttribute("command-handle", handle);
}
@Override
protected void doShutdown() {
// Give the I/O reactor 1 sec to shut down
shutdownReactor(1000);
assert this.ioReactor.getStatus().equals(IOReactorStatus.SHUT_DOWN) : "incorrect status after io reactor shutdown :"
+ this.ioReactor.getStatus();
}
@Override
protected HttpNioFutureCommandConnectionHandle getHandleFromConnection(
NHttpConnection connection) {
return (HttpNioFutureCommandConnectionHandle) connection.getContext()
.getAttribute("command-handle");
}
class NHttpClientConnectionPoolSessionRequestCallback implements
SessionRequestCallback {
public void completed(SessionRequest request) {
logger.trace("%1$s->%2$s[%3$s] - SessionRequest complete", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
}
public void cancelled(SessionRequest request) {
logger.trace("%1$s->%2$s[%3$s] - SessionRequest cancelled", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndCancelResponse(request);
}
private void releaseConnectionAndCancelResponse(SessionRequest request) {
@Override
protected void createNewConnection() throws InterruptedException {
boolean acquired = allConnections.tryAcquire(1, TimeUnit.SECONDS);
if (acquired) {
if (shouldDoWork()) {
logger.debug("%1$s - opening new connection", target);
ioReactor.connect(target, null, null, sessionCallback);
} else {
allConnections.release();
FutureCommand<?, ?, ?> frequest = (FutureCommand<?, ?, ?>) request
.getAttachment();
if (frequest != null) {
logger.error("%1$s->%2$s[%3$s] - Cancelling FutureCommand",
request.getLocalAddress(), request.getRemoteAddress(),
frequest);
frequest.cancel(true);
}
}
}
}
}
private void releaseConnectionAndSetResponseException(
SessionRequest request, Exception e) {
allConnections.release();
HttpFutureCommand<?> frequest = (HttpFutureCommand<?>) request
.getAttachment();
if (frequest != null) {
logger.error(e,
"%1$s->%2$s[%3$s] - Setting Exception on FutureCommand",
request.getLocalAddress(), request.getRemoteAddress(),
frequest);
frequest.setException(e);
}
}
@Override
protected void associateHandleWithConnection(
FutureCommandConnectionHandle<URI, NHttpConnection, HttpFutureCommand<?>> handle,
NHttpConnection connection) {
connection.getContext().setAttribute("command-handle", handle);
}
public void failed(SessionRequest request) {
int count = currentSessionFailures.getAndIncrement();
logger.warn("%1$s->%2$s[%3$s] - SessionRequest failed", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndSetResponseException(request, request
.getException());
if (count >= maxSessionFailures) {
logger
.error(
request.getException(),
"%1$s->%2$s[%3$s] - SessionRequest failures: %4$s, Disabling pool for %5$s",
request.getLocalAddress(), request
.getRemoteAddress(),
maxSessionFailures, target);
exception.set(request.getException());
}
@Override
protected HttpNioFutureCommandConnectionHandle getHandleFromConnection(NHttpConnection connection) {
return (HttpNioFutureCommandConnectionHandle) connection.getContext().getAttribute(
"command-handle");
}
}
class NHttpClientConnectionPoolSessionRequestCallback implements SessionRequestCallback {
public void timeout(SessionRequest request) {
logger.warn("%1$s->%2$s[%3$s] - SessionRequest timeout", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndCancelResponse(request);
}
public void completed(SessionRequest request) {
logger.trace("%1$s->%2$s[%3$s] - SessionRequest complete", request.getLocalAddress(),
request.getRemoteAddress(), request.getAttachment());
}
}
public void cancelled(SessionRequest request) {
logger.trace("%1$s->%2$s[%3$s] - SessionRequest cancelled", request.getLocalAddress(),
request.getRemoteAddress(), request.getAttachment());
releaseConnectionAndCancelResponse(request);
}
public void connectionOpen(NHttpConnection conn) {
conn.setSocketTimeout(0);
available.offer(conn);
logger.trace("%1$s - %2$d - open", conn, conn.hashCode());
}
@SuppressWarnings("unchecked")
private void releaseConnectionAndCancelResponse(SessionRequest request) {
allConnections.release();
FutureCommand<URI, ?, ?, ?> frequest = (FutureCommand<URI, ?, ?, ?>) request
.getAttachment();
if (frequest != null) {
logger.error("%1$s->%2$s[%3$s] - Cancelling FutureCommand", request.getLocalAddress(),
request.getRemoteAddress(), frequest);
frequest.cancel(true);
}
}
public void connectionTimeout(NHttpConnection conn) {
String message = String.format("%1$s - %2$d - timeout %2$d", conn, conn
.hashCode(), conn.getSocketTimeout());
logger.warn(message);
resubmitIfRequestIsReplayable(conn, new TimeoutException(message));
}
private void releaseConnectionAndSetResponseException(SessionRequest request, Exception e) {
allConnections.release();
HttpFutureCommand<?> frequest = (HttpFutureCommand<?>) request.getAttachment();
if (frequest != null) {
logger.error(e, "%1$s->%2$s[%3$s] - Setting Exception on FutureCommand", request
.getLocalAddress(), request.getRemoteAddress(), frequest);
frequest.setException(e);
}
}
public void connectionClosed(NHttpConnection conn) {
logger.trace("%1$s - %2$d - closed", conn, conn.hashCode());
}
public void failed(SessionRequest request) {
int count = currentSessionFailures.getAndIncrement();
logger.warn("%1$s->%2$s[%3$s] - SessionRequest failed", request.getLocalAddress(), request
.getRemoteAddress(), request.getAttachment());
releaseConnectionAndSetResponseException(request, request.getException());
if (count >= maxSessionFailures) {
logger.error(request.getException(),
"%1$s->%2$s[%3$s] - SessionRequest failures: %4$s, Disabling pool for %5$s",
request.getLocalAddress(), request.getRemoteAddress(), maxSessionFailures,
target);
exception.set(request.getException());
}
public void fatalIOException(IOException ex, NHttpConnection conn) {
logger.error(ex, "%3$s-%1$s{%2$d} - io error", conn, conn.hashCode(),
target);
resubmitIfRequestIsReplayable(conn, ex);
}
}
public void fatalProtocolException(HttpException ex, NHttpConnection conn) {
logger.error(ex, "%3$s-%1$s{%2$d} - http error", conn, conn.hashCode(),
target);
setExceptionOnCommand(conn, ex);
}
public void timeout(SessionRequest request) {
logger.warn("%1$s->%2$s[%3$s] - SessionRequest timeout", request.getLocalAddress(),
request.getRemoteAddress(), request.getAttachment());
releaseConnectionAndCancelResponse(request);
}
public static interface FutureCommandConnectionHandleFactory
extends
FutureCommandConnectionPool.FutureCommandConnectionHandleFactory<NHttpConnection, HttpFutureCommand<?>> {
HttpNioFutureCommandConnectionHandle create(
HttpFutureCommand<?> command, NHttpConnection conn);
}
}
@Override
protected boolean isReplayable(HttpFutureCommand<?> command) {
return command.getRequest().isReplayable();
}
public void connectionOpen(NHttpConnection conn) {
conn.setSocketTimeout(0);
available.offer(conn);
logger.trace("%1$s - %2$d - open", conn, conn.hashCode());
}
public void connectionTimeout(NHttpConnection conn) {
String message = String.format("%1$s - %2$d - timeout %2$d", conn, conn.hashCode(), conn
.getSocketTimeout());
logger.warn(message);
resubmitIfRequestIsReplayable(conn, new TimeoutException(message));
}
public void connectionClosed(NHttpConnection conn) {
logger.trace("%1$s - %2$d - closed", conn, conn.hashCode());
}
public void fatalIOException(IOException ex, NHttpConnection conn) {
logger.error(ex, "%3$s-%1$s{%2$d} - io error", conn, conn.hashCode(), target);
resubmitIfRequestIsReplayable(conn, ex);
}
public void fatalProtocolException(HttpException ex, NHttpConnection conn) {
logger.error(ex, "%3$s-%1$s{%2$d} - http error", conn, conn.hashCode(), target);
setExceptionOnCommand(conn, ex);
}
@Override
protected boolean isReplayable(HttpFutureCommand<?> command) {
return command.getRequest().isReplayable();
}
@Override
protected FutureCommandConnectionHandle<URI, NHttpConnection, HttpFutureCommand<?>> createHandle(
HttpFutureCommand<?> command, NHttpConnection conn) {
try {
return new HttpNioFutureCommandConnectionHandle(allConnections, available, endPoint,
command, conn);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted creating a handle to " + conn, e);
}
}
}

View File

@ -39,8 +39,8 @@ import org.jclouds.http.HttpErrorHandler;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpRetryHandler;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.jclouds.http.handlers.CloseContentAndSetExceptionHandler;
import org.jclouds.http.handlers.DelegatingErrorHandler;
import org.jclouds.http.handlers.DelegatingRetryHandler;
import org.jclouds.http.httpnio.util.HttpNioUtils;
import org.jclouds.logging.Logger;
@ -56,24 +56,28 @@ public class HttpNioFutureCommandExecutionHandler implements NHttpRequestExecuti
@Resource
protected Logger logger = Logger.NULL;
private final ConsumingNHttpEntityFactory entityFactory;
private final BlockingQueue<HttpFutureCommand<?>> commandQueue;
/**
* inputOnly: nothing is taken from this queue.
*/
private final BlockingQueue<HttpFutureCommand<?>> resubmitQueue;
@Inject(optional = true)
private HttpErrorHandler serverErrorHandler = new CloseContentAndSetExceptionHandler();
private HttpRetryHandler retryHandler = new DelegatingRetryHandler();
@Inject(optional = true)
protected HttpRetryHandler httpRetryHandler = new BackoffLimitedRetryHandler(5);
public interface ConsumingNHttpEntityFactory {
public ConsumingNHttpEntity create(HttpEntity httpEntity);
}
private HttpErrorHandler errorHandler = new DelegatingErrorHandler();
@Inject
public HttpNioFutureCommandExecutionHandler(ConsumingNHttpEntityFactory entityFactory,
ExecutorService executor, BlockingQueue<HttpFutureCommand<?>> commandQueue) {
ExecutorService executor, BlockingQueue<HttpFutureCommand<?>> resubmitQueue) {
this.executor = executor;
this.entityFactory = entityFactory;
this.commandQueue = commandQueue;
this.resubmitQueue = resubmitQueue;
}
public interface ConsumingNHttpEntityFactory {
public ConsumingNHttpEntity create(HttpEntity httpEntity);
}
public void initalizeContext(HttpContext context, Object attachment) {
@ -82,8 +86,8 @@ public class HttpNioFutureCommandExecutionHandler implements NHttpRequestExecuti
public HttpEntityEnclosingRequest submitRequest(HttpContext context) {
HttpFutureCommand<?> command = (HttpFutureCommand<?>) context.removeAttribute("command");
if (command != null) {
HttpRequest object = command.getRequest();
return HttpNioUtils.convertToApacheRequest(object);
HttpRequest request = command.getRequest();
return HttpNioUtils.convertToApacheRequest(request);
}
return null;
@ -102,24 +106,13 @@ public class HttpNioFutureCommandExecutionHandler implements NHttpRequestExecuti
HttpFutureCommand<?> command = handle.getCommand();
org.jclouds.http.HttpResponse response = HttpNioUtils
.convertToJavaCloudsResponse(apacheResponse);
int code = response.getStatusCode();
if (code >= 500) {
boolean retryRequest = false;
try {
retryRequest = httpRetryHandler.shouldRetryRequest(command, response);
} catch (InterruptedException ie) {
// TODO: Add interrupt exception to command and abort?
}
if (retryRequest) {
commandQueue.add(command);
int statusCode = response.getStatusCode();
if (statusCode >= 300) {
if (retryHandler.shouldRetryRequest(command, response)) {
resubmitQueue.add(command);
} else {
serverErrorHandler.handle(command, response);
errorHandler.handleError(command, response);
}
} else if (code >= 400 && code < 500) {
serverErrorHandler.handle(command, response);
} else if (code >= 300 && code < 400) {
serverErrorHandler.handle(command, response);
} else {
processResponse(response, command);
}

View File

@ -41,26 +41,29 @@ import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse;
public class HttpNioUtils {
public static HttpEntityEnclosingRequest convertToApacheRequest(HttpRequest object) {
BasicHttpEntityEnclosingRequest apacheRequest = new BasicHttpEntityEnclosingRequest(object
.getMethod().toString(), object.getUri(), HttpVersion.HTTP_1_1);
public static HttpEntityEnclosingRequest convertToApacheRequest(HttpRequest request) {
BasicHttpEntityEnclosingRequest apacheRequest = new BasicHttpEntityEnclosingRequest(request
.getMethod().toString(), request.getUri(), HttpVersion.HTTP_1_1);
Object content = object.getPayload();
Object content = request.getPayload();
// Since we may remove headers, ensure they are added to the apache
// request after this block
if (content != null) {
long contentLength = Long.parseLong(object
.getFirstHeaderOrNull(HttpHeaders.CONTENT_LENGTH));
object.getHeaders().removeAll(HttpHeaders.CONTENT_LENGTH);
String contentType = object.getFirstHeaderOrNull(HttpHeaders.CONTENT_TYPE);
object.getHeaders().removeAll(HttpHeaders.CONTENT_TYPE);
String lengthString = request.getFirstHeaderOrNull(HttpHeaders.CONTENT_LENGTH);
if (lengthString == null) {
throw new IllegalStateException("no Content-Length header on request: " + apacheRequest);
}
long contentLength = Long.parseLong(lengthString);
String contentType = request.getFirstHeaderOrNull(HttpHeaders.CONTENT_TYPE);
addEntityForContent(apacheRequest, content, contentType, contentLength);
}
for (String header : object.getHeaders().keySet()) {
for (String value : object.getHeaders().get(header))
apacheRequest.addHeader(header, value);
for (String header : request.getHeaders().keySet()) {
for (String value : request.getHeaders().get(header))
// apache automatically tries to add content length header
if (!header.equals(HttpHeaders.CONTENT_LENGTH))
apacheRequest.addHeader(header, value);
}
return apacheRequest;
}

View File

@ -23,10 +23,7 @@
*/
package org.jclouds.http.httpnio.pool;
import java.net.MalformedURLException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.http.BaseHttpFutureCommandClientTest;
@ -55,16 +52,4 @@ public class HttpNioConnectionPoolFutureCommandClientTest extends BaseHttpFuture
return new HttpNioConnectionPoolClientModule();
}
@Override
@Test(enabled = false)
public void testGetStringRedirect() throws MalformedURLException, ExecutionException,
InterruptedException, TimeoutException {
}
@Override
@Test(enabled = false)
public void testPutRedirect() throws MalformedURLException, ExecutionException,
InterruptedException, TimeoutException {
}
}