417356 - Add SOCKS support to jetty client.

Big refactoring to allow for additional proxy schemes that work at a
lower level than HTTP.

Introduced client-side ConnectionFactory, and binding that to a
HttpDestination, so that connections to that destination will use the
same ConnectionFactory.

The destination's ConnectionFactory is now initialized from the proxy
configuration and the transport, which is now itself a
ConnectionFactory.

The proxy configuration has also changed becoming polymorphic by
introducing a new ProxyConfiguration.Proxy abstract class,
which is implemented as HTTPProxy and can be implemented in future as
SOCKS4Proxy (and possibly others).
This commit is contained in:
Simone Bordet 2013-10-04 16:58:55 +02:00
parent 8eaedc4a5e
commit 25d9b8704f
37 changed files with 1803 additions and 672 deletions

View File

@ -19,24 +19,20 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import java.util.Map;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport
{
@ -72,12 +68,20 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
}
@Override
public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise)
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
@Override
public void connect(SocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
@ -86,8 +90,9 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
channel.configureBlocking(false);
channel.connect(address);
ConnectionCallback callback = new ConnectionCallback(destination, promise);
selectorManager.connect(channel, callback);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
@ -104,12 +109,14 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
}
finally
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
}
}
protected void configure(HttpClient client, SocketChannel channel) throws SocketException
protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
@ -119,40 +126,6 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
return new ClientSelectorManager(client, selectors);
}
protected SslConnection createSslConnection(EndPoint endPoint, HttpDestination destination)
{
HttpClient httpClient = destination.getHttpClient();
SslContextFactory sslContextFactory = httpClient.getSslContextFactory();
SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort());
engine.setUseClientMode(true);
SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine);
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
endPoint.setConnection(sslConnection);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = newConnection(appEndPoint, destination);
appEndPoint.setConnection(connection);
return sslConnection;
}
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
}
protected abstract Connection newConnection(EndPoint endPoint, HttpDestination destination);
protected org.eclipse.jetty.client.api.Connection tunnel(EndPoint endPoint, HttpDestination destination, org.eclipse.jetty.client.api.Connection connection)
{
SslConnection sslConnection = createSslConnection(endPoint, destination);
Connection result = sslConnection.getDecryptedEndPoint().getConnection();
selectorManager.connectionClosed((Connection)connection);
selectorManager.connectionOpened(sslConnection);
LOG.debug("Tunnelled {} over {}", connection, result);
return (org.eclipse.jetty.client.api.Connection)result;
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;
@ -170,63 +143,21 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
ConnectionCallback callback = (ConnectionCallback)attachment;
HttpDestination destination = callback.destination;
SslContextFactory sslContextFactory = client.getSslContextFactory();
if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme()))
{
if (sslContextFactory == null)
{
IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests");
callback.failed(failure);
throw failure;
}
else
{
SslConnection sslConnection = createSslConnection(endPoint, destination);
callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection());
return sslConnection;
}
}
else
{
Connection connection = AbstractHttpClientTransport.this.newConnection(endPoint, destination);
callback.succeeded((org.eclipse.jetty.client.api.Connection)connection);
return connection;
}
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
ConnectionCallback callback = (ConnectionCallback)attachment;
callback.failed(ex);
}
}
private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection>
{
private final HttpDestination destination;
private final Promise<org.eclipse.jetty.client.api.Connection> promise;
private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
this.destination = destination;
this.promise = promise;
}
@Override
public void succeeded(org.eclipse.jetty.client.api.Connection result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
}

View File

@ -109,16 +109,19 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
return;
}
URI uri = getAuthenticationURI(request);
Authentication authentication = null;
Authentication.HeaderInfo headerInfo = null;
for (Authentication.HeaderInfo element : headerInfos)
URI uri = getAuthenticationURI(request);
if (uri != null)
{
authentication = client.getAuthenticationStore().findAuthentication(element.getType(), uri, element.getRealm());
if (authentication != null)
for (Authentication.HeaderInfo element : headerInfos)
{
headerInfo = element;
break;
authentication = client.getAuthenticationStore().findAuthentication(element.getType(), uri, element.getRealm());
if (authentication != null)
{
headerInfo = element;
break;
}
}
}
if (authentication == null)

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.client.http;
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.BlockingDeque;
@ -33,9 +33,9 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnectionPool implements Dumpable
public class ConnectionPool implements Dumpable
{
private static final Logger LOG = Log.getLogger(HttpConnectionPool.class);
private static final Logger LOG = Log.getLogger(ConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final Destination destination;
@ -44,7 +44,7 @@ public class HttpConnectionPool implements Dumpable
private final BlockingDeque<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
public HttpConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
{
this.destination = destination;
this.maxConnections = maxConnections;

View File

@ -27,10 +27,12 @@ import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -44,7 +46,6 @@ import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
@ -57,7 +58,6 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -104,12 +104,13 @@ public class HttpClient extends ContainerLifeCycle
{
private static final Logger LOG = Log.getLogger(HttpClient.class);
private final ConcurrentMap<String, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Origin, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
private final List<ProtocolHandler> handlers = new ArrayList<>();
private final List<Request.Listener> requestListeners = new ArrayList<>();
private final AuthenticationStore authenticationStore = new HttpAuthenticationStore();
private final Set<ContentDecoder.Factory> decoderFactories = new ContentDecoderFactorySet();
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
private final HttpClientTransport transport;
private final SslContextFactory sslContextFactory;
private volatile CookieManager cookieManager;
@ -132,7 +133,6 @@ public class HttpClient extends ContainerLifeCycle
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
private volatile boolean strictEventOrdering = false;
private volatile ProxyConfiguration proxyConfig;
private volatile HttpField encodingField;
/**
@ -359,7 +359,7 @@ public class HttpClient extends ContainerLifeCycle
*/
public Request newRequest(String host, int port)
{
return newRequest(address("http", host, port));
return newRequest(new Origin("http", host, port).asString());
}
/**
@ -417,13 +417,6 @@ public class HttpClient extends ContainerLifeCycle
return newRequest;
}
public String address(String scheme, String host, int port)
{
StringBuilder result = new StringBuilder();
URIUtil.appendSchemeHostPort(result, scheme, host, port);
return result.toString();
}
/**
* Returns a {@link Destination} for the given scheme, host and port.
* Applications may use {@link Destination}s to create {@link Connection}s
@ -446,20 +439,20 @@ public class HttpClient extends ContainerLifeCycle
{
port = normalizePort(scheme, port);
String address = address(scheme, host, port);
HttpDestination destination = destinations.get(address);
Origin origin = new Origin(scheme, host, port);
HttpDestination destination = destinations.get(origin);
if (destination == null)
{
destination = transport.newHttpDestination(scheme, host, port);
destination = transport.newHttpDestination(origin);
if (isRunning())
{
HttpDestination existing = destinations.putIfAbsent(address, destination);
HttpDestination existing = destinations.putIfAbsent(origin, destination);
if (existing != null)
destination = existing;
else
LOG.debug("Created {}", destination);
if (!isRunning())
destinations.remove(address);
destinations.remove(origin);
}
}
@ -486,13 +479,16 @@ public class HttpClient extends ContainerLifeCycle
protected void newConnection(final HttpDestination destination, final Promise<Connection> promise)
{
Destination.Address address = destination.getConnectAddress();
Origin.Address address = destination.getConnectAddress();
resolver.resolve(address.getHost(), address.getPort(), new Promise<SocketAddress>()
{
@Override
public void succeeded(SocketAddress socketAddress)
{
transport.connect(destination, socketAddress, promise);
Map<String, Object> context = new HashMap<>();
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
transport.connect(socketAddress, context);
}
@Override
@ -881,14 +877,6 @@ public class HttpClient extends ContainerLifeCycle
return proxyConfig;
}
/**
* @param proxyConfig the forward proxy configuration
*/
public void setProxyConfiguration(ProxyConfiguration proxyConfig)
{
this.proxyConfig = proxyConfig;
}
protected HttpField getAcceptEncodingField()
{
return encodingField;

View File

@ -19,9 +19,9 @@
package org.eclipse.jetty.client;
import java.net.SocketAddress;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.io.ClientConnectionFactory;
/**
* {@link HttpClientTransport} represents what transport implementations should provide
@ -34,8 +34,11 @@ import org.eclipse.jetty.util.Promise;
* but the HTTP exchange may also be carried using the SPDY protocol or the FCGI protocol or, in future,
* other protocols.
*/
public interface HttpClientTransport
public interface HttpClientTransport extends ClientConnectionFactory
{
public static final String HTTP_DESTINATION_CONTEXT_KEY = "http.destination";
public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "http.connection.promise";
/**
* Sets the {@link HttpClient} instance on this transport.
* <p />
@ -53,27 +56,16 @@ public interface HttpClientTransport
* {@link HttpDestination} controls the destination-connection cardinality: protocols like
* HTTP have 1-N cardinality, while multiplexed protocols like SPDY have a 1-1 cardinality.
*
* @param scheme the destination scheme
* @param host the destination host
* @param port the destination port
* @param origin the destination origin
* @return a new, transport-specific, {@link HttpDestination} object
*/
public HttpDestination newHttpDestination(String scheme, String host, int port);
public HttpDestination newHttpDestination(Origin origin);
/**
* Establishes a physical connection to the given {@code address}.
*
* @param destination the destination
* @param address the address to connect to
* @param promise the promise to notify when the connection succeeds or fails
* @param context the context information to establish the connection
*/
public void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise);
/**
* Establishes an encrypted tunnel over the given {@code connection}
*
* @param connection the connection to tunnel
* @return the tunnelled connection
*/
public Connection tunnel(Connection connection);
public void connect(SocketAddress address, Map<String, Object> context);
}

View File

@ -83,6 +83,7 @@ public abstract class HttpConnection implements Connection
HttpVersion version = request.getVersion();
HttpFields headers = request.getHeaders();
ContentProvider content = request.getContent();
ProxyConfiguration.Proxy proxy = destination.getProxy();
// Make sure the path is there
String path = request.getPath();
@ -91,7 +92,7 @@ public abstract class HttpConnection implements Connection
path = "/";
request.path(path);
}
if (destination.isProxied() && !HttpMethod.CONNECT.is(method))
if (proxy != null && !HttpMethod.CONNECT.is(method))
{
path = request.getURI().toString();
request.path(path);
@ -136,9 +137,12 @@ public abstract class HttpConnection implements Connection
request.header(HttpHeader.COOKIE.asString(), cookieString.toString());
// Authorization
URI authenticationURI = destination.isProxied() ? destination.getProxyURI() : request.getURI();
Authentication.Result authnResult = getHttpClient().getAuthenticationStore().findAuthenticationResult(authenticationURI);
if (authnResult != null)
authnResult.apply(request);
URI authenticationURI = proxy != null ? proxy.getURI() : request.getURI();
if (authenticationURI != null)
{
Authentication.Result authnResult = getHttpClient().getAuthenticationStore().findAuthenticationResult(authenticationURI);
if (authnResult != null)
authnResult.apply(request);
}
}
}

View File

@ -20,51 +20,45 @@ package org.eclipse.jetty.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.AsynchronousCloseException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public abstract class HttpDestination implements Destination, Closeable, Dumpable
{
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
private final HttpClient client;
private final String scheme;
private final String host;
private final Address address;
private final Origin origin;
private final Queue<HttpExchange> exchanges;
private final RequestNotifier requestNotifier;
private final ResponseNotifier responseNotifier;
private final Address proxyAddress;
private final ProxyConfiguration.Proxy proxy;
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
public HttpDestination(HttpClient client, String scheme, String host, int port)
public HttpDestination(HttpClient client, Origin origin)
{
this.client = client;
this.scheme = scheme;
this.host = host;
this.address = new Address(host, port);
this.origin = origin;
this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
@ -72,19 +66,40 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
this.responseNotifier = new ResponseNotifier(client);
ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
proxy = proxyConfig.match(origin);
ClientConnectionFactory connectionFactory = client.getTransport();
if (proxy != null)
{
connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
}
else
{
if (HttpScheme.HTTPS.is(getScheme()))
connectionFactory = newSslClientConnectionFactory(connectionFactory);
}
this.connectionFactory = connectionFactory;
if (!client.isDefaultPort(scheme, port))
host += ":" + port;
String host = getHost();
if (!client.isDefaultPort(getScheme(), getPort()))
host += ":" + getPort();
hostField = new HttpField(HttpHeader.HOST, host);
}
protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
}
public HttpClient getHttpClient()
{
return client;
}
public Origin getOrigin()
{
return origin;
}
public Queue<HttpExchange> getHttpExchanges()
{
return exchanges;
@ -100,10 +115,20 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
return responseNotifier;
}
public ProxyConfiguration.Proxy getProxy()
{
return proxy;
}
public ClientConnectionFactory getClientConnectionFactory()
{
return connectionFactory;
}
@Override
public String getScheme()
{
return scheme;
return origin.getScheme();
}
@Override
@ -111,32 +136,18 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
{
// InetSocketAddress.getHostString() transforms the host string
// in case of IPv6 addresses, so we return the original host string
return host;
return origin.getAddress().getHost();
}
@Override
public int getPort()
{
return address.getPort();
return origin.getAddress().getPort();
}
public Address getConnectAddress()
public Origin.Address getConnectAddress()
{
return isProxied() ? proxyAddress : address;
}
public boolean isProxied()
{
return proxyAddress != null;
}
public URI getProxyURI()
{
ProxyConfiguration proxyConfiguration = client.getProxyConfiguration();
String uri = getScheme() + "://" + proxyConfiguration.getHost();
if (!client.isDefaultPort(getScheme(), proxyConfiguration.getPort()))
uri += ":" + proxyConfiguration.getPort();
return URI.create(uri);
return proxy == null ? origin.getAddress() : proxy.getAddress();
}
public HttpField getHostField()
@ -146,7 +157,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
protected void send(Request request, List<Response.ResponseListener> listeners)
{
if (!scheme.equals(request.getScheme()))
if (!getScheme().equals(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
if (!getHost().equals(request.getHost()))
throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
@ -188,7 +199,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
public void newConnection(Promise<Connection> promise)
{
createConnection(new ProxyPromise(promise));
createConnection(promise);
}
protected void createConnection(Promise<Connection> promise)
@ -236,18 +247,6 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
}
protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
{
// Wrap the connection with TLS
promise.succeeded(client.getTransport().tunnel(connection));
}
protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
{
promise.failed(failure);
connection.close();
}
@Override
public String dump()
{
@ -262,7 +261,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
public String asString()
{
return client.address(getScheme(), getHost(), getPort());
return origin.asString();
}
@Override
@ -271,84 +270,6 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
return String.format("%s(%s)%s",
HttpDestination.class.getSimpleName(),
asString(),
proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
}
/**
* Decides whether to establish a proxy tunnel using HTTP CONNECT.
* It is implemented as a promise because it needs to establish the tunnel
* when the TCP connection is succeeded, and needs to notify another
* promise when the tunnel is established (or failed).
*/
private class ProxyPromise implements Promise<Connection>
{
private final Promise<Connection> delegate;
private ProxyPromise(Promise<Connection> delegate)
{
this.delegate = delegate;
}
@Override
public void succeeded(Connection connection)
{
if (isProxied() && HttpScheme.HTTPS.is(getScheme()))
{
if (client.getSslContextFactory() != null)
{
tunnel(connection);
}
else
{
String message = String.format("Cannot perform requests over SSL, no %s in %s",
SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
delegate.failed(new IllegalStateException(message));
}
}
else
{
delegate.succeeded(connection);
}
}
@Override
public void failed(Throwable x)
{
delegate.failed(x);
}
private void tunnel(final Connection connection)
{
String target = address.getHost() + ":" + address.getPort();
Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
connection.send(connect, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
tunnelFailed(connection, delegate, result.getFailure());
}
else
{
Response response = result.getResponse();
if (response.getStatus() == 200)
{
tunnelSucceeded(connection, delegate);
}
else
{
tunnelFailed(connection, delegate, new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
}
}
}
});
}
proxy == null ? "" : " via " + proxy);
}
}

View File

@ -0,0 +1,200 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpProxy extends ProxyConfiguration.Proxy
{
public HttpProxy(String host, int port)
{
this(new Origin.Address(host, port), false);
}
public HttpProxy(Origin.Address address, boolean secure)
{
super(address, secure);
}
@Override
public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new HttpProxyClientConnectionFactory(connectionFactory);
}
@Override
public URI getURI()
{
String scheme = isSecure() ? HttpScheme.HTTPS.asString() : HttpScheme.HTTP.asString();
return URI.create(new Origin(scheme, getAddress()).asString());
}
public static class HttpProxyClientConnectionFactory implements ClientConnectionFactory
{
private static final Logger LOG = Log.getLogger(HttpProxyClientConnectionFactory.class);
private final ClientConnectionFactory connectionFactory;
public HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
final ProxyPromise proxyPromise = new ProxyPromise(endPoint, promise, context);
// Replace the promise with the proxy one
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, proxyPromise);
return connectionFactory.newConnection(endPoint, context);
}
/**
* Decides whether to establish a proxy tunnel using HTTP CONNECT.
* It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).
*/
private class ProxyPromise implements Promise<Connection>
{
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private ProxyPromise(EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
{
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
if (HttpScheme.HTTPS.is(destination.getScheme()))
{
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (sslContextFactory != null)
{
tunnel(destination, connection);
}
else
{
String message = String.format("Cannot perform requests over SSL, no %s in %s",
SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
promise.failed(new IllegalStateException(message));
}
}
else
{
promise.succeeded(connection);
}
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
private void tunnel(HttpDestination destination, final Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.timeout(httpClient.getConnectTimeout(), TimeUnit.MILLISECONDS);
connection.send(connect, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
tunnelFailed(result.getFailure());
}
else
{
Response response = result.getResponse();
if (response.getStatus() == 200)
{
tunnelSucceeded();
}
else
{
tunnelFailed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
}
}
}
});
}
private void tunnelSucceeded()
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
org.eclipse.jetty.io.Connection oldConnection = endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
Helper.replaceConnection(oldConnection, newConnection);
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(x);
}
}
private void tunnelFailed(Throwable failure)
{
endPoint.close();
failed(failure);
}
}
}
}

View File

@ -589,7 +589,7 @@ public class HttpRequest implements Request
path += "?" + query;
URI result = URI.create(path);
if (!result.isAbsolute() && !result.isOpaque())
result = URI.create(client.address(getScheme(), getHost(), getPort()) + path);
result = URI.create(new Origin(getScheme(), getHost(), getPort()).asString() + path);
return result;
}

View File

@ -29,9 +29,9 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
private C connection;
protected MultiplexHttpDestination(HttpClient client, String scheme, String host, int port)
protected MultiplexHttpDestination(HttpClient client, Origin origin)
{
super(client, scheme, host, port);
super(client, origin);
}
@Override

View File

@ -0,0 +1,122 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.util.URIUtil;
public class Origin
{
private final String scheme;
private final Address address;
public Origin(String scheme, String host, int port)
{
this(scheme, new Address(host, port));
}
public Origin(String scheme, Address address)
{
this.scheme = scheme;
this.address = address;
}
public String getScheme()
{
return scheme;
}
public Address getAddress()
{
return address;
}
public String asString()
{
StringBuilder result = new StringBuilder();
URIUtil.appendSchemeHostPort(result, scheme, address.host, address.port);
return result.toString();
}
@Override
public boolean equals(Object obj)
{
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Origin that = (Origin)obj;
return scheme.equals(that.scheme) && address.equals(that.address);
}
@Override
public int hashCode()
{
int result = scheme.hashCode();
result = 31 * result + address.hashCode();
return result;
}
public static class Address
{
private final String host;
private final int port;
public Address(String host, int port)
{
this.host = host;
this.port = port;
}
public String getHost()
{
return host;
}
public int getPort()
{
return port;
}
@Override
public boolean equals(Object obj)
{
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Address that = (Address)obj;
return host.equals(that.host) && port == that.port;
}
@Override
public int hashCode()
{
int result = host.hashCode();
result = 31 * result + port;
return result;
}
public String asString()
{
return String.format("%s:%d", host, port);
}
@Override
public String toString()
{
return asString();
}
}
}

View File

@ -59,6 +59,7 @@ public class ProxyAuthenticationProtocolHandler extends AuthenticationProtocolHa
protected URI getAuthenticationURI(Request request)
{
HttpDestination destination = getHttpClient().destinationFor(request.getScheme(), request.getHost(), request.getPort());
return destination.isProxied() ? destination.getProxyURI() : request.getURI();
ProxyConfiguration.Proxy proxy = destination.getProxy();
return proxy != null ? proxy.getURI() : request.getURI();
}
}

View File

@ -0,0 +1,137 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.io.ClientConnectionFactory;
/**
* The configuration of the forward proxy to use with {@link org.eclipse.jetty.client.HttpClient}.
* <p />
* Applications add subclasses of {@link Proxy} to this configuration via:
* <pre>
* ProxyConfiguration proxyConfig = httpClient.getProxyConfiguration();
* proxyConfig.getProxies().add(new HttpProxy(proxyHost, 8080));
* </pre>
*
* @see HttpClient#getProxyConfiguration()
*/
public class ProxyConfiguration
{
private final List<Proxy> proxies = new ArrayList<>();
public List<Proxy> getProxies()
{
return proxies;
}
public Proxy match(Origin origin)
{
for (Proxy proxy : getProxies())
{
if (proxy.matches(origin))
return proxy;
}
return null;
}
public static abstract class Proxy
{
private final Set<Origin> included = new HashSet<>();
private final Set<Origin> excluded = new HashSet<>();
private final Origin.Address address;
private final boolean secure;
protected Proxy(Origin.Address address, boolean secure)
{
this.address = address;
this.secure = secure;
}
/**
* @return the address of this proxy
*/
public Origin.Address getAddress()
{
return address;
}
/**
* @return whether the connection to the proxy must be secured via TLS
*/
public boolean isSecure()
{
return secure;
}
/**
* @return the list of origins that must be proxied
*/
public Set<Origin> getIncludedOrigins()
{
return included;
}
/**
* @return the list of origins that must not be proxied.
*/
public Set<Origin> getExcludedOrigins()
{
return excluded;
}
/**
* @return an URI representing this proxy, or null if no URI can represent this proxy
*/
public URI getURI()
{
return null;
}
/**
* Matches the given {@code origin} with the included and excluded origins,
* returning true if the given {@code origin} is to be proxied.
*
* @param origin the origin to test for proxying
* @return true if the origin must be proxied, false otherwise
*/
public boolean matches(Origin origin)
{
return included.contains(origin) || !excluded.contains(origin);
}
/**
* @param connectionFactory the nested {@link ClientConnectionFactory}
* @return a new {@link ClientConnectionFactory} for this {@link Proxy}
*/
public abstract ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactory connectionFactory);
@Override
public String toString()
{
return address.toString();
}
}
}

View File

@ -0,0 +1,190 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class Socks4Proxy extends ProxyConfiguration.Proxy
{
public Socks4Proxy(String host, int port)
{
this(new Origin.Address(host, port), false);
}
public Socks4Proxy(Origin.Address address, boolean secure)
{
super(address, secure);
}
@Override
public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new Socks4ProxyClientConnectionFactory(connectionFactory);
}
public static class Socks4ProxyClientConnectionFactory implements ClientConnectionFactory
{
private final ClientConnectionFactory connectionFactory;
public Socks4ProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
Executor executor = destination.getHttpClient().getExecutor();
return new Socks4ProxyConnection(endPoint, executor, connectionFactory, context);
}
}
private static class Socks4ProxyConnection extends AbstractConnection implements Callback
{
private static final Pattern IPv4_PATTERN = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
private static final Logger LOG = Log.getLogger(Socks4ProxyConnection.class);
private final ClientConnectionFactory connectionFactory;
private final Map<String, Object> context;
public Socks4ProxyConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
}
@Override
public void onOpen()
{
super.onOpen();
writeSocks4Connect();
}
/**
* Writes the SOCKS "connect" bytes, differentiating between SOCKS 4 and 4A;
* the former sends an IPv4 address, the latter the full domain name.
*/
private void writeSocks4Connect()
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
String host = destination.getHost();
short port = (short)destination.getPort();
Matcher matcher = IPv4_PATTERN.matcher(host);
if (matcher.matches())
{
// SOCKS 4
ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put((byte)4).put((byte)1).putShort(port);
for (int i = 1; i <= 4; ++i)
buffer.put((byte)Integer.parseInt(matcher.group(i)));
buffer.put((byte)0);
buffer.flip();
getEndPoint().write(this, buffer);
}
else
{
// SOCKS 4A
byte[] hostBytes = host.getBytes(Charset.forName("UTF-8"));
ByteBuffer buffer = ByteBuffer.allocate(9 + hostBytes.length + 1);
buffer.put((byte)4).put((byte)1).putShort(port);
buffer.put((byte)0).put((byte)0).put((byte)0).put((byte)1).put((byte)0);
buffer.put(hostBytes).put((byte)0);
buffer.flip();
getEndPoint().write(this, buffer);
}
}
@Override
public void succeeded()
{
LOG.debug("Written SOCKS4 connect request");
fillInterested();
}
@Override
public void failed(Throwable x)
{
close();
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer = BufferUtil.allocate(8);
int filled = getEndPoint().fill(buffer);
LOG.debug("Read SOCKS4 connect response, {} bytes", filled);
if (filled != 8)
throw new IOException("Invalid response from SOCKS4 proxy");
int result = buffer.get(1);
if (result == 0x5A)
tunnel();
else
throw new IOException("SOCKS4 tunnel failed with code " + result);
}
catch (Throwable x)
{
failed(x);
}
}
private void tunnel()
{
try
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory connectionFactory = this.connectionFactory;
if (HttpScheme.HTTPS.is(destination.getScheme()))
connectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
org.eclipse.jetty.io.Connection connection = connectionFactory.newConnection(getEndPoint(), context);
ClientConnectionFactory.Helper.replaceConnection(this, connection);
LOG.debug("SOCKS4 tunnel established: {} over {}", this, connection);
}
catch (Throwable x)
{
failed(x);
}
}
}
}

View File

@ -63,43 +63,4 @@ public interface Destination
* @param promise the promise of a new, unpooled, {@link Connection}
*/
void newConnection(Promise<Connection> promise);
public static class Address
{
private final String host;
private final int port;
public Address(String host, int port)
{
this.host = host;
this.port = port;
}
public String getHost()
{
return host;
}
public int getPort()
{
return port;
}
@Override
public boolean equals(Object obj)
{
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Address that = (Address)obj;
return host.equals(that.host) && port == that.port;
}
@Override
public int hashCode()
{
int result = host.hashCode();
result = 31 * result + port;
return result;
}
}
}

View File

@ -1,81 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.api;
import java.util.HashSet;
import java.util.Set;
/**
* The configuration of the forward proxy to use with {@link org.eclipse.jetty.client.HttpClient}.
* <p />
* Configuration parameters include the host and port of the forward proxy, and a list of
* {@link #getExcludedOrigins() origins} that are excluded from being proxied.
*
* @see org.eclipse.jetty.client.HttpClient#setProxyConfiguration(ProxyConfiguration)
*/
public class ProxyConfiguration
{
private final Set<String> excluded = new HashSet<>();
private final String host;
private final int port;
public ProxyConfiguration(String host, int port)
{
this.host = host;
this.port = port;
}
/**
* @return the host name of the forward proxy
*/
public String getHost()
{
return host;
}
/**
* @return the port of the forward proxy
*/
public int getPort()
{
return port;
}
/**
* Matches the given {@code host} and {@code port} with the list of excluded origins,
* returning true if the origin is to be proxied, false if it is excluded from proxying.
* @param host the host to match
* @param port the port to match
* @return true if the origin made of {@code host} and {@code port} is to be proxied,
* false if it is excluded from proxying.
*/
public boolean matches(String host, int port)
{
String hostPort = host + ":" + port;
return !getExcludedOrigins().contains(hostPort);
}
/**
* @return the list of origins to exclude from proxying, in the form "host:port".
*/
public Set<String> getExcludedOrigins()
{
return excluded;
}
}

View File

@ -18,16 +18,21 @@
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
{
public HttpClientTransportOverHTTP()
{
this(Runtime.getRuntime().availableProcessors());
this(Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
}
public HttpClientTransportOverHTTP(int selectors)
@ -36,21 +41,20 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
}
@Override
public HttpDestination newHttpDestination(String scheme, String host, int port)
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), scheme, host, port);
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
@Override
protected Connection newConnection(EndPoint endPoint, HttpDestination destination)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return new HttpConnectionOverHTTP(endPoint, destination);
}
@Override
public org.eclipse.jetty.client.api.Connection tunnel(org.eclipse.jetty.client.api.Connection connection)
{
HttpConnectionOverHTTP httpConnection = (HttpConnectionOverHTTP)connection;
return tunnel(httpConnection.getEndPoint(), httpConnection.getHttpDestination(), connection);
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.succeeded(connection);
return connection;
}
}

View File

@ -21,30 +21,32 @@ package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.util.Arrays;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
public class HttpDestinationOverHTTP extends HttpDestination implements Promise<Connection>
public class HttpDestinationOverHTTP extends HttpDestination implements Promise<Connection>
{
private final HttpConnectionPool connectionPool;
private final ConnectionPool connectionPool;
public HttpDestinationOverHTTP(HttpClient client, String scheme, String host, int port)
public HttpDestinationOverHTTP(HttpClient client, Origin origin)
{
super(client, scheme, host, port);
this.connectionPool = newHttpConnectionPool(client);
super(client, origin);
this.connectionPool = newConnectionPool(client);
}
protected HttpConnectionPool newHttpConnectionPool(HttpClient client)
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new HttpConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
return new ConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
public HttpConnectionPool getHttpConnectionPool()
public ConnectionPool getConnectionPool()
{
return connectionPool;
}

View File

@ -0,0 +1,245 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientCustomProxyTest
{
public static final byte[] CAFE_BABE = new byte[]{(byte)0xCA, (byte)0xFE, (byte)0xBA, (byte)0xBE};
private Server server;
private ServerConnector connector;
private HttpClient client;
public void prepare(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server, new CAFEBABEServerConnectionFactory(new HttpConnectionFactory()));
server.addConnector(connector);
server.setHandler(handler);
server.start();
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
client = new HttpClient();
client.setExecutor(executor);
client.start();
}
@After
public void dispose() throws Exception
{
if (client != null)
client.stop();
if (server != null)
server.stop();
}
@Test
public void testCustomProxy() throws Exception
{
final String serverHost = "server";
final int status = HttpStatus.NO_CONTENT_204;
prepare(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (!URI.create(baseRequest.getUri().toString()).isAbsolute())
response.setStatus(HttpServletResponse.SC_USE_PROXY);
else if (serverHost.equals(request.getServerName()))
response.setStatus(status);
else
response.setStatus(HttpServletResponse.SC_NOT_ACCEPTABLE);
}
});
// Setup the custom proxy
int proxyPort = connector.getLocalPort();
int serverPort = proxyPort + 1; // Any port will do for these tests - just not the same as the proxy
client.getProxyConfiguration().getProxies().add(new CAFEBABEProxy(new Origin.Address("localhost", proxyPort), false));
ContentResponse response = client.newRequest(serverHost, serverPort)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(status, response.getStatus());
}
private class CAFEBABEProxy extends ProxyConfiguration.Proxy
{
private CAFEBABEProxy(Origin.Address address, boolean secure)
{
super(address, secure);
}
@Override
public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new CAFEBABEClientConnectionFactory(connectionFactory);
}
}
private class CAFEBABEClientConnectionFactory implements ClientConnectionFactory
{
private final ClientConnectionFactory connectionFactory;
private CAFEBABEClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
Executor executor = destination.getHttpClient().getExecutor();
return new CAFEBABEConnection(endPoint, executor, connectionFactory, context);
}
}
private class CAFEBABEConnection extends AbstractConnection
{
private final ClientConnectionFactory connectionFactory;
private final Map<String, Object> context;
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
getEndPoint().write(new Callback.Adapter(), ByteBuffer.wrap(CAFE_BABE));
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer = BufferUtil.allocate(4);
int filled = getEndPoint().fill(buffer);
Assert.assertEquals(4, filled);
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(getEndPoint(), context));
}
catch (Throwable x)
{
close();
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
}
}
private class CAFEBABEServerConnectionFactory extends AbstractConnectionFactory
{
private final org.eclipse.jetty.server.ConnectionFactory connectionFactory;
private CAFEBABEServerConnectionFactory(org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super("cafebabe");
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(Connector connector, EndPoint endPoint)
{
return new CAFEBABEServerConnection(connector, endPoint, connectionFactory);
}
}
private class CAFEBABEServerConnection extends AbstractConnection
{
private final org.eclipse.jetty.server.ConnectionFactory connectionFactory;
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor());
this.connectionFactory = connectionFactory;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer = BufferUtil.allocate(4);
int filled = getEndPoint().fill(buffer);
Assert.assertEquals(4, filled);
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
getEndPoint().write(new Callback.Adapter(), buffer);
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(connector, getEndPoint()));
}
catch (Throwable x)
{
close();
}
}
}
}

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -60,7 +59,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertEquals(200, response.getStatus());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
HttpConnectionPool connectionPool = httpDestination.getHttpConnectionPool();
ConnectionPool connectionPool = httpDestination.getConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}
@ -92,7 +91,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertFalse(httpConnection.getEndPoint().isOpen());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
HttpConnectionPool connectionPool = httpDestination.getHttpConnectionPool();
ConnectionPool connectionPool = httpDestination.getConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}

View File

@ -37,7 +37,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
@ -85,7 +84,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
}
// Re-run after warmup
iterations = 50_000;
iterations = 5_000;
for (int i = 0; i < runs; ++i)
{
run(random, iterations);
@ -111,7 +110,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
for (String host : Arrays.asList("localhost", "127.0.0.1"))
{
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, connector.getLocalPort());
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
for (Connection connection : new ArrayList<>(connectionPool.getActiveConnections()))
{
HttpConnectionOverHTTP active = (HttpConnectionOverHTTP)connection;

View File

@ -27,7 +27,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BasicAuthentication;
import org.eclipse.jetty.http.HttpHeader;
@ -68,7 +67,7 @@ public class HttpClientProxyTest extends AbstractHttpClientServerTest
int proxyPort = connector.getLocalPort();
int serverPort = proxyPort + 1; // Any port will do for these tests - just not the same as the proxy
client.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort));
client.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort));
ContentResponse response = client.newRequest(serverHost, serverPort)
.scheme(scheme)
@ -115,7 +114,7 @@ public class HttpClientProxyTest extends AbstractHttpClientServerTest
String proxyHost = "localhost";
int proxyPort = connector.getLocalPort();
int serverPort = proxyPort + 1; // Any port will do for these tests - just not the same as the proxy
client.setProxyConfiguration(new ProxyConfiguration(proxyHost, proxyPort));
client.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
ContentResponse response1 = client.newRequest(serverHost, serverPort)
.scheme(scheme)

View File

@ -51,7 +51,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpField;
@ -85,7 +84,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
long start = System.nanoTime();
HttpConnectionOverHTTP connection = null;
@ -637,7 +636,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
public void onBegin(Request request)
{
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
destination.getHttpConnectionPool().getActiveConnections().peek().close();
destination.getConnectionPool().getActiveConnections().peek().close();
}
})
.send(new Response.Listener.Adapter()

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -35,9 +36,13 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -244,15 +249,30 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
public HttpDestination newHttpDestination(Origin origin)
{
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine)
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected boolean onReadTimeout()
protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
sslIdle.set(true);
return super.onReadTimeout();
HttpClient client = getHttpClient();
return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine)
{
@Override
protected boolean onReadTimeout()
{
sslIdle.set(true);
return super.onReadTimeout();
}
};
}
};
}
};
}

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.http.HttpHeader;
@ -68,7 +67,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -129,7 +128,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -180,7 +179,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -240,7 +239,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -313,7 +312,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -366,7 +365,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -416,7 +415,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
@ -466,7 +465,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
ConnectionPool connectionPool = destination.getConnectionPool();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.AbstractHttpClientServerTest;
import org.eclipse.jetty.client.EmptyServerHandler;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
@ -52,12 +53,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
@Test
public void test_FirstAcquire_WithEmptyQueue() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort());
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection = destination.acquire();
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = destination.getHttpConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
connection = destination.getConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
}
Assert.assertNotNull(connection);
}
@ -65,7 +66,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
@Test
public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort());
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection1 = destination.acquire();
if (connection1 == null)
{
@ -74,7 +75,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = destination.getHttpConnectionPool().getIdleConnections().peek();
connection1 = destination.getConnectionPool().getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
@ -87,7 +88,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort())
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected void process(HttpConnectionOverHTTP connection, boolean dispatch)
@ -115,23 +116,23 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
latch.countDown();
// There must be 2 idle connections
Connection connection = destination.getHttpConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
Connection connection = destination.getConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
connection = destination.getHttpConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
connection = destination.getConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
}
@Test
public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort());
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
HttpConnectionOverHTTP connection1 = destination.acquire();
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = (HttpConnectionOverHTTP)destination.getHttpConnectionPool().getIdleConnections().peek();
connection1 = (HttpConnectionOverHTTP)destination.getConnectionPool().getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
@ -152,7 +153,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort());
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection1 = destination.acquire();
if (connection1 == null)
{
@ -161,13 +162,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = destination.getHttpConnectionPool().getIdleConnections().peek();
connection1 = destination.getConnectionPool().getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = destination.getHttpConnectionPool().getIdleConnections().poll();
connection1 = destination.getConnectionPool().getIdleConnections().poll();
Assert.assertNull(connection1);
}
}

View File

@ -0,0 +1,89 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Factory for client-side {@link Connection} instances.
*/
public interface ClientConnectionFactory
{
/**
*
* @param endPoint the {@link org.eclipse.jetty.io.EndPoint} to link the newly created connection to
* @param context the context data to create the connection
* @return a new {@link Connection}
* @throws IOException if the connection cannot be created
*/
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException;
public static class Helper
{
private static Logger LOG = Log.getLogger(Helper.class);
private Helper()
{
}
/**
* Replaces the given {@code oldConnection} with the given {@code newConnection} on the
* {@link EndPoint} associated with {@code oldConnection}, performing connection lifecycle management.
* <p />
* The {@code oldConnection} will be closed by invoking {@link org.eclipse.jetty.io.Connection#onClose()}
* and the {@code newConnection} will be opened by invoking {@link org.eclipse.jetty.io.Connection#onOpen()}.
* @param oldConnection the old connection to replace
* @param newConnection the new connection replacement
*/
public static void replaceConnection(Connection oldConnection, Connection newConnection)
{
close(oldConnection);
oldConnection.getEndPoint().setConnection(newConnection);
open(newConnection);
}
private static void open(Connection connection)
{
try
{
connection.onOpen();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
private static void close(Connection connection)
{
try
{
connection.onClose();
}
catch (Throwable x)
{
LOG.debug(x);
}
}
}
}

View File

@ -0,0 +1,72 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io.ssl;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SslClientConnectionFactory implements ClientConnectionFactory
{
public static final String SSL_PEER_HOST_CONTEXT_KEY = "ssl.peer.host";
public static final String SSL_PEER_PORT_CONTEXT_KEY = "ssl.peer.port";
public static final String SSL_ENGINE_CONTEXT_KEY = "ssl.engine";
private final SslContextFactory sslContextFactory;
private final ByteBufferPool byteBufferPool;
private final Executor executor;
private final ClientConnectionFactory connectionFactory;
public SslClientConnectionFactory(SslContextFactory sslContextFactory, ByteBufferPool byteBufferPool, Executor executor, ClientConnectionFactory connectionFactory)
{
this.sslContextFactory = sslContextFactory;
this.byteBufferPool = byteBufferPool;
this.executor = executor;
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
String host = (String)context.get(SSL_PEER_HOST_CONTEXT_KEY);
int port = (Integer)context.get(SSL_PEER_PORT_CONTEXT_KEY);
SSLEngine engine = sslContextFactory.newSSLEngine(host, port);
engine.setUseClientMode(true);
context.put(SSL_ENGINE_CONTEXT_KEY, engine);
SslConnection sslConnection = newSslConnection(byteBufferPool, executor, endPoint, engine);
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
endPoint.setConnection(sslConnection);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
appEndPoint.setConnection(connectionFactory.newConnection(appEndPoint, context));
return sslConnection;
}
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine);
}
}

View File

@ -48,8 +48,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpContentResponse;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@ -110,7 +111,7 @@ public class ProxyServletTest
private HttpClient prepareClient() throws Exception
{
HttpClient result = new HttpClient();
result.setProxyConfiguration(new ProxyConfiguration("localhost", proxyConnector.getLocalPort()));
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
result.start();
return result;
}
@ -236,7 +237,7 @@ public class ProxyServletTest
prepareProxy(new ProxyServlet());
HttpClient result = new HttpClient();
result.setProxyConfiguration(new ProxyConfiguration("localhost", proxyConnector.getLocalPort()));
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName("foo");
threadPool.setMaxThreads(20);
@ -631,7 +632,7 @@ public class ProxyServletTest
}
});
int port = serverConnector.getLocalPort();
client.getProxyConfiguration().getExcludedOrigins().add("127.0.0.1:" + port);
client.getProxyConfiguration().getProxies().get(0).getExcludedOrigins().add(new Origin("http", "127.0.0.1", port));
// Try with a proxied host
ContentResponse response = client.newRequest("localhost", port)
@ -865,7 +866,7 @@ public class ProxyServletTest
}
});
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();

View File

@ -26,17 +26,16 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpHeader;
@ -143,7 +142,7 @@ public class ProxyTunnellingTest
startProxy();
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort()));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.start();
try
@ -172,7 +171,7 @@ public class ProxyTunnellingTest
startProxy();
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort()));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.start();
try
@ -215,7 +214,7 @@ public class ProxyTunnellingTest
startProxy();
final HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort()));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.start();
try
@ -285,7 +284,7 @@ public class ProxyTunnellingTest
stopProxy();
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort));
httpClient.start();
try
@ -317,7 +316,7 @@ public class ProxyTunnellingTest
startProxy();
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort()));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.start();
try
@ -354,7 +353,7 @@ public class ProxyTunnellingTest
});
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration("localhost", proxyPort()));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.start();
try
@ -394,7 +393,7 @@ public class ProxyTunnellingTest
sslContextFactory.start();
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.setProxyConfiguration(new ProxyConfiguration(proxyHost, proxyPort));
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
httpClient.start();
try

View File

@ -20,10 +20,11 @@ package org.eclipse.jetty.spdy.client;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.Map;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
@ -32,20 +33,22 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class NextProtoNegoClientConnection extends AbstractConnection implements NextProtoNego.ClientProvider
public class NPNClientConnection extends AbstractConnection implements NextProtoNego.ClientProvider
{
private final Logger LOG = Log.getLogger(getClass());
private final SPDYClient client;
private final Object attachment;
private final ClientConnectionFactory connectionFactory;
private final SSLEngine engine;
private final Map<String, Object> context;
private volatile boolean completed;
public NextProtoNegoClientConnection(EndPoint endPoint, Executor executor, SPDYClient client, Object attachment, SSLEngine engine)
public NPNClientConnection(EndPoint endPoint, SPDYClient client, ClientConnectionFactory connectionFactory, SSLEngine sslEngine, Map<String, Object> context)
{
super(endPoint, executor);
super(endPoint, client.getFactory().getExecutor());
this.client = client;
this.attachment = attachment;
this.engine = engine;
this.connectionFactory = connectionFactory;
this.engine = sslEngine;
this.context = context;
NextProtoNego.put(engine, this);
}
@ -92,7 +95,7 @@ public class NextProtoNegoClientConnection extends AbstractConnection implements
{
LOG.debug(x);
NextProtoNego.remove(engine);
getEndPoint().close();
close();
return -1;
}
}
@ -115,16 +118,22 @@ public class NextProtoNegoClientConnection extends AbstractConnection implements
{
NextProtoNego.remove(engine);
completed = true;
String protocol = client.selectProtocol(protocols);
return protocol == null ? null : protocol;
return client.selectProtocol(protocols);
}
private void replaceConnection()
{
EndPoint endPoint = getEndPoint();
Connection connection = client.getConnectionFactory().newConnection(endPoint, attachment);
endPoint.getConnection().onClose();
endPoint.setConnection(connection);
connection.onOpen();
try
{
Connection oldConnection = endPoint.getConnection();
Connection newConnection = connectionFactory.newConnection(endPoint, context);
ClientConnectionFactory.Helper.replaceConnection(oldConnection, newConnection);
}
catch (Throwable x)
{
LOG.debug(x);
close();
}
}
}

View File

@ -0,0 +1,47 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.client;
import java.io.IOException;
import java.util.Map;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
public class NPNClientConnectionFactory implements ClientConnectionFactory
{
private final SPDYClient client;
private final ClientConnectionFactory connectionFactory;
public NPNClientConnectionFactory(SPDYClient client, ClientConnectionFactory connectionFactory)
{
this.client = client;
this.connectionFactory = connectionFactory;
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return new NPNClientConnection(endPoint, client, connectionFactory,
(SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY), context);
}
}

View File

@ -19,26 +19,28 @@
package org.eclipse.jetty.spdy.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.spdy.FlowControlStrategy;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
@ -55,7 +57,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
/**
* A {@link SPDYClient} allows applications to connect to one or more SPDY servers,
* obtaining {@link Session} objects that can be used to send/receive SPDY frames.
* <p />
* <p/>
* {@link SPDYClient} instances are created through a {@link Factory}:
* <pre>
* SPDYClient.Factory factory = new SPDYClient.Factory();
@ -76,6 +78,7 @@ public class SPDYClient
private volatile long idleTimeout = -1;
private volatile int initialWindowSize;
private volatile boolean dispatchIO;
private volatile ClientConnectionFactory connectionFactory;
protected SPDYClient(short version, Factory factory)
{
@ -83,6 +86,10 @@ public class SPDYClient
this.factory = factory;
setInitialWindowSize(65536);
setDispatchIO(true);
ClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
if (factory.sslContextFactory != null)
connectionFactory = new SslClientConnectionFactory(factory.getSslContextFactory(), factory.getByteBufferPool(), factory.getExecutor(), new NPNClientConnectionFactory(this, connectionFactory));
setClientConnectionFactory(connectionFactory);
}
public short getVersion()
@ -95,6 +102,85 @@ public class SPDYClient
return factory;
}
/**
* Equivalent to:
* <pre>
* Future&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
* connect(address, listener, promise);
* </pre>
*
* @param address the address to connect to
* @param listener the session listener that will be notified of session events
* @return a {@link Session} when connected
*/
public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
{
FuturePromise<Session> promise = new FuturePromise<>();
connect(address, listener, promise);
return promise.get();
}
/**
* Equivalent to:
* <pre>
* connect(address, listener, promise, null);
* </pre>
*
* @param address the address to connect to
* @param listener the session listener that will be notified of session events
* @param promise the promise notified of connection success/failure
*/
public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
{
connect(address, listener, promise, new HashMap<String, Object>());
}
/**
* Connects to the given {@code address}, binding the given {@code listener} to session events,
* and notified the given {@code promise} of the connect result.
* <p/>
* If the connect operation is successful, the {@code promise} will be invoked with the {@link Session}
* object that applications can use to perform SPDY requests.
*
* @param address the address to connect to
* @param listener the session listener that will be notified of session events
* @param promise the promise notified of connection success/failure
* @param context a context object passed to the {@link #getClientConnectionFactory() ConnectionFactory}
* for the creation of the connection
*/
public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
{
if (!factory.isStarted())
throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
try
{
SocketChannel channel = SocketChannel.open();
if (bindAddress != null)
channel.bind(bindAddress);
configure(channel);
channel.configureBlocking(false);
channel.connect(address);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
factory.selector.connect(channel, context);
}
catch (IOException x)
{
promise.failed(x);
}
}
protected void configure(SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(true);
}
/**
* @return the address to bind the socket channel to
* @see #setBindAddress(SocketAddress)
@ -113,58 +199,6 @@ public class SPDYClient
this.bindAddress = bindAddress;
}
/**
* Equivalent to:
* <pre>
* Future&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
* connect(address, listener, promise);
* </pre>
*
* @param address the address to connect to
* @param listener the session listener that will be notified of session events
* @return a {@link Session} when connected
*/
public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
{
FuturePromise<Session> promise = new FuturePromise<>();
connect(address, listener, promise);
return promise.get();
}
/**
* Connects to the given {@code address}, binding the given {@code listener} to session events,
* and notified the given {@code promise} of the connect result.
* <p />
* If the connect operation is successful, the {@code promise} will be invoked with the {@link Session}
* object that applications can use to perform SPDY requests.
*
* @param address the address to connect to
* @param listener the session listener that will be notified of session events
* @param promise the promise notified of connection success/failure
*/
public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
{
if (!factory.isStarted())
throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
try
{
SocketChannel channel = SocketChannel.open();
if (bindAddress != null)
channel.bind(bindAddress);
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
channel.connect(address);
SessionContext context = new SessionContext(this, listener, promise);
factory.selector.connect(channel, context);
}
catch (IOException x)
{
promise.failed(x);
}
}
public long getIdleTimeout()
{
return idleTimeout;
@ -195,6 +229,16 @@ public class SPDYClient
this.dispatchIO = dispatchIO;
}
public ClientConnectionFactory getClientConnectionFactory()
{
return connectionFactory;
}
public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
protected String selectProtocol(List<String> serverProtocols)
{
String protocol = "spdy/" + version;
@ -206,20 +250,6 @@ public class SPDYClient
return null;
}
public SPDYClientConnectionFactory getConnectionFactory()
{
return factory.connectionFactory;
}
protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
{
String peerHost = channel.socket().getInetAddress().getHostName();
int peerPort = channel.socket().getPort();
SSLEngine engine = sslContextFactory.newSSLEngine(peerHost, peerPort);
engine.setUseClientMode(true);
return engine;
}
protected FlowControlStrategy newFlowControlStrategy()
{
return FlowControlStrategyFactory.newFlowControlStrategy(version);
@ -227,7 +257,6 @@ public class SPDYClient
public static class Factory extends ContainerLifeCycle
{
private final SPDYClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Scheduler scheduler;
@ -301,6 +330,11 @@ public class SPDYClient
return executor;
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
}
public long getConnectTimeout()
{
return connectTimeout;
@ -358,80 +392,33 @@ public class SPDYClient
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
SessionContext context = (SessionContext)key.attachment();
long clientIdleTimeout = context.client.getIdleTimeout();
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)key.attachment();
SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
long clientIdleTimeout = client.getIdleTimeout();
if (clientIdleTimeout < 0)
clientIdleTimeout = idleTimeout;
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
}
@Override
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment)
public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
SessionContext context = (SessionContext)attachment;
final SPDYClient client = context.client;
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
try
{
if (sslContextFactory != null)
{
final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(bufferPool, getExecutor(), endPoint, engine);
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
DecryptedEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(sslEndPoint, getExecutor(), client, attachment, engine);
sslEndPoint.setConnection(connection);
return sslConnection;
}
else
{
return connectionFactory.newConnection(endPoint, attachment);
}
SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
return client.getClientConnectionFactory().newConnection(endPoint, context);
}
catch (RuntimeException x)
catch (Throwable x)
{
context.failed(x);
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
promise.failed(x);
throw x;
}
}
}
}
protected static class SessionContext implements Promise<Session>
{
private final Promise<Session> promise;
private final SPDYClient client;
private final SessionFrameListener listener;
private SessionContext(SPDYClient client, SessionFrameListener listener, Promise<Session> promise)
{
this.client = client;
this.listener = listener;
this.promise = promise;
}
public SPDYClient getSPDYClient()
{
return client;
}
public SessionFrameListener getSessionFrameListener()
{
return listener;
}
@Override
public void succeeded(Session result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable cause)
{
promise.failed(cause);
}
}
}

View File

@ -18,44 +18,54 @@
package org.eclipse.jetty.spdy.client;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.FlowControlStrategy;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.client.SPDYClient.Factory;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Promise;
public class SPDYClientConnectionFactory
public class SPDYClientConnectionFactory implements ClientConnectionFactory
{
public Connection newConnection(EndPoint endPoint, Object attachment)
{
SPDYClient.SessionContext context = (SPDYClient.SessionContext)attachment;
SPDYClient client = context.getSPDYClient();
Factory factory = client.getFactory();
ByteBufferPool bufferPool = factory.getByteBufferPool();
public static final String SPDY_CLIENT_CONTEXT_KEY = "spdy.client";
public static final String SPDY_SESSION_LISTENER_CONTEXT_KEY = "spdy.session.listener";
public static final String SPDY_SESSION_PROMISE_CONTEXT_KEY = "spdy.session.promise";
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
SPDYClient client = (SPDYClient)context.get(SPDY_CLIENT_CONTEXT_KEY);
SPDYClient.Factory factory = client.getFactory();
ByteBufferPool byteBufferPool = factory.getByteBufferPool();
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(bufferPool, compressionFactory.newCompressor());
Generator generator = new Generator(byteBufferPool, compressionFactory.newCompressor());
SPDYConnection connection = new ClientSPDYConnection(endPoint, bufferPool, parser, factory, client.isDispatchIO());
SPDYConnection connection = new ClientSPDYConnection(endPoint, byteBufferPool, parser, factory, client.isDispatchIO());
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
StandardSession session = new StandardSession(client.getVersion(), bufferPool, factory.getExecutor(),
factory.getScheduler(), connection, endPoint, connection, 1, context.getSessionFrameListener(),
generator, flowControlStrategy);
SessionFrameListener listener = (SessionFrameListener)context.get(SPDY_SESSION_LISTENER_CONTEXT_KEY);
StandardSession session = new StandardSession(client.getVersion(), byteBufferPool, factory.getExecutor(),
factory.getScheduler(), connection, endPoint, connection, 1, listener, generator, flowControlStrategy);
session.setWindowSize(client.getInitialWindowSize());
parser.addListener(session);
connection.setSession(session);
/*connection = context.getConnectionProvider().decorateConnection(endPoint, connection);*/
context.succeeded(session);
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(SPDY_SESSION_PROMISE_CONTEXT_KEY);
promise.succeeded(session);
return connection;
}

View File

@ -18,12 +18,17 @@
package org.eclipse.jetty.spdy.client.http;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Map;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.client.SPDYClient;
@ -32,11 +37,22 @@ import org.eclipse.jetty.util.Promise;
public class HttpClientTransportOverSPDY implements HttpClientTransport
{
private final SPDYClient client;
private volatile HttpClient httpClient;
private final ClientConnectionFactory connectionFactory;
private HttpClient httpClient;
public HttpClientTransportOverSPDY(SPDYClient client)
{
this.client = client;
this.connectionFactory = client.getClientConnectionFactory();
client.setClientConnectionFactory(new ClientConnectionFactory()
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
});
}
@Override
@ -46,51 +62,46 @@ public class HttpClientTransportOverSPDY implements HttpClientTransport
}
@Override
public HttpDestination newHttpDestination(String scheme, String host, int port)
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverSPDY(httpClient, scheme, host, port);
return new HttpDestinationOverSPDY(httpClient, origin);
}
@Override
public void connect(final HttpDestination destination, SocketAddress address, final Promise<Connection> promise)
public void connect(SocketAddress address, Map<String, Object> context)
{
final HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
final Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
SessionFrameListener.Adapter listener = new SessionFrameListener.Adapter()
{
@Override
public void onFailure(Session session, Throwable x)
{
// TODO: is this correct ?
// TODO: if I get a stream error (e.g. invalid response headers)
// TODO: I must abort the *current* exchange, while below I will abort
// TODO: the queued exchanges only.
// TODO: The problem is that a single destination/connection multiplexes
// TODO: several exchanges, so I would need to cancel them all,
// TODO: or only the one that failed ?
destination.abort(x);
}
};
client.connect(address, listener, new Promise<Session>()
{
@Override
public void succeeded(Session session)
{
Connection result = new HttpConnectionOverSPDY(destination, session);
promise.succeeded(result);
}
{
@Override
public void succeeded(Session session)
{
promise.succeeded(new HttpConnectionOverSPDY(destination, session));
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
}
);
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
}, context);
}
@Override
public Connection tunnel(Connection connection)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
throw new UnsupportedOperationException();
return connectionFactory.newConnection(endPoint, context);
}
}

View File

@ -21,12 +21,13 @@ package org.eclipse.jetty.spdy.client.http;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnectionOverSPDY>
{
public HttpDestinationOverSPDY(HttpClient client, String scheme, String host, int port)
public HttpDestinationOverSPDY(HttpClient client, Origin origin)
{
super(client, scheme, host, port);
super(client, origin);
}
@Override
@ -34,4 +35,12 @@ public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnec
{
connection.send(exchange);
}
@Override
public void abort(Throwable cause)
{
// TODO: in case of connection failure, we need to abort also
// TODO: all pending exchanges, so we need to track them.
super.abort(cause);
}
}

View File

@ -0,0 +1,261 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.client.http;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.client.SPDYClient;
import org.eclipse.jetty.spdy.server.http.HTTPSPDYServerConnectionFactory;
import org.eclipse.jetty.spdy.server.http.PushStrategy;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientCustomProxyTest
{
public static final byte[] CAFE_BABE = new byte[]{(byte)0xCA, (byte)0xFE, (byte)0xBA, (byte)0xBE};
private Server server;
private ServerConnector connector;
private SPDYClient.Factory factory;
private HttpClient httpClient;
public void prepare(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server, new CAFEBABEServerConnectionFactory(new HTTPSPDYServerConnectionFactory(SPDY.V3, new HttpConfiguration(), new PushStrategy.None())));
server.addConnector(connector);
server.setHandler(handler);
server.start();
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
factory = new SPDYClient.Factory(executor);
factory.start();
httpClient = new HttpClient(new HttpClientTransportOverSPDY(factory.newSPDYClient(SPDY.V3)), null);
httpClient.setExecutor(executor);
httpClient.start();
}
@After
public void dispose() throws Exception
{
if (httpClient != null)
httpClient.stop();
if (factory != null)
factory.stop();
if (server != null)
server.stop();
}
@Test
public void testCustomProxy() throws Exception
{
final String serverHost = "server";
final int status = HttpStatus.NO_CONTENT_204;
prepare(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (!URI.create(baseRequest.getUri().toString()).isAbsolute())
response.setStatus(HttpServletResponse.SC_USE_PROXY);
else if (serverHost.equals(request.getServerName()))
response.setStatus(status);
else
response.setStatus(HttpServletResponse.SC_NOT_ACCEPTABLE);
}
});
// Setup the custom proxy
int proxyPort = connector.getLocalPort();
int serverPort = proxyPort + 1; // Any port will do for these tests - just not the same as the proxy
httpClient.getProxyConfiguration().getProxies().add(new CAFEBABEProxy(new Origin.Address("localhost", proxyPort), false));
ContentResponse response = httpClient.newRequest(serverHost, serverPort)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(status, response.getStatus());
}
private class CAFEBABEProxy extends ProxyConfiguration.Proxy
{
private CAFEBABEProxy(Origin.Address address, boolean secure)
{
super(address, secure);
}
@Override
public ClientConnectionFactory newClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
return new CAFEBABEClientConnectionFactory(connectionFactory);
}
}
private static class CAFEBABEClientConnectionFactory implements ClientConnectionFactory
{
private final ClientConnectionFactory connectionFactory;
private CAFEBABEClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
Executor executor = destination.getHttpClient().getExecutor();
return new CAFEBABEConnection(endPoint, executor, connectionFactory, context);
}
}
private static class CAFEBABEConnection extends AbstractConnection
{
private final ClientConnectionFactory connectionFactory;
private final Map<String, Object> context;
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
getEndPoint().write(new Callback.Adapter(), ByteBuffer.wrap(CAFE_BABE));
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer = BufferUtil.allocate(4);
int filled = getEndPoint().fill(buffer);
Assert.assertEquals(4, filled);
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(getEndPoint(), context));
}
catch (Throwable x)
{
close();
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
}
}
private class CAFEBABEServerConnectionFactory extends AbstractConnectionFactory
{
private final org.eclipse.jetty.server.ConnectionFactory connectionFactory;
private CAFEBABEServerConnectionFactory(org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super("cafebabe");
this.connectionFactory = connectionFactory;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(Connector connector, EndPoint endPoint)
{
return new CAFEBABEServerConnection(connector, endPoint, connectionFactory);
}
}
private class CAFEBABEServerConnection extends AbstractConnection
{
private final org.eclipse.jetty.server.ConnectionFactory connectionFactory;
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor());
this.connectionFactory = connectionFactory;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer = BufferUtil.allocate(4);
int filled = getEndPoint().fill(buffer);
Assert.assertEquals(4, filled);
Assert.assertArrayEquals(CAFE_BABE, buffer.array());
getEndPoint().write(new Callback.Adapter(), buffer);
// We are good, upgrade the connection
ClientConnectionFactory.Helper.replaceConnection(this, connectionFactory.newConnection(connector, getEndPoint()));
}
catch (Throwable x)
{
close();
}
}
}
}