Fixes #1656 - Improve configurability of ConnectionPools.

Introduced ConnectionPool.Factory and HttpClientTransport.connectionPoolFactory.
This allows applications to create a ConnectionPool given the HttpDestination.
This commit is contained in:
Simone Bordet 2017-07-04 11:23:17 +02:00 committed by Joakim Erdfelt
parent 2202d333fe
commit 191b6e044f
13 changed files with 313 additions and 280 deletions

View File

@ -0,0 +1,183 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
private final int selectors;
private SelectorManager selectorManager;
protected AbstractConnectorHttpClientTransport(int selectors)
{
this.selectors = selectors;
}
@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
return selectors;
}
@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
selectorManager = newSelectorManager(httpClient);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
if (client.isConnectBlocking())
{
channel.socket().connect(address, (int)client.getConnectTimeout());
channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
else
{
channel.configureBlocking(false);
if (channel.connect(address))
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}
protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, getSelectors());
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;
protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
}
}

View File

@ -18,24 +18,6 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -46,14 +28,8 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
{ {
protected static final Logger LOG = Log.getLogger(HttpClientTransport.class); protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
private final int selectors; private HttpClient client;
private volatile HttpClient client; private ConnectionPool.Factory factory;
private volatile SelectorManager selectorManager;
protected AbstractHttpClientTransport(int selectors)
{
this.selectors = selectors;
}
protected HttpClient getHttpClient() protected HttpClient getHttpClient()
{ {
@ -66,137 +42,15 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
this.client = client; this.client = client;
} }
@ManagedAttribute(value = "The number of selectors", readonly = true) @Override
public int getSelectors() public ConnectionPool.Factory getConnectionPoolFactory()
{ {
return selectors; return factory;
} }
@Override @Override
protected void doStart() throws Exception public void setConnectionPoolFactory(ConnectionPool.Factory factory)
{ {
selectorManager = newSelectorManager(client); this.factory = factory;
selectorManager.setConnectTimeout(client.getConnectTimeout());
addBean(selectorManager);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
if (client.isConnectBlocking())
{
channel.socket().connect(address, (int)client.getConnectTimeout());
channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
else
{
channel.configureBlocking(false);
if (channel.connect(address))
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}
protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, selectors);
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;
protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
} }
} }

View File

@ -22,20 +22,73 @@ import java.io.Closeable;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
/**
* <p>Client-side connection pool abstraction.</p>
*/
public interface ConnectionPool extends Closeable public interface ConnectionPool extends Closeable
{ {
/**
* @param connection the connection to test
* @return whether the given connection is currently in use
*/
boolean isActive(Connection connection); boolean isActive(Connection connection);
/**
* @return whether this ConnectionPool has no open connections
*/
boolean isEmpty(); boolean isEmpty();
/**
* @return whether this ConnectionPool has been closed
* @see #close()
*/
boolean isClosed(); boolean isClosed();
/**
* <p>Returns an idle connection, if available, or schedules the opening
* of a new connection and returns {@code null}.</p>
*
* @return an available connection, or null
*/
Connection acquire(); Connection acquire();
/**
* <p>Returns the given connection, previously obtained via {@link #acquire()},
* back to this ConnectionPool.</p>
*
* @param connection the connection to release
* @return true if the connection has been released, false if the connection
* was not obtained from the this ConnectionPool
*/
boolean release(Connection connection); boolean release(Connection connection);
/**
* <p>Removes the given connection from this ConnectionPool.</p>
*
* @param connection the connection to remove
* @return true if the connection was removed from this ConnectionPool
*/
boolean remove(Connection connection); boolean remove(Connection connection);
/**
* Closes this ConnectionPool.
*
* @see #isClosed()
*/
@Override @Override
void close(); void close();
/**
* Factory for ConnectionPool instances.
*/
interface Factory
{
/**
* Creates a new ConnectionPool for the given destination.
*
* @param destination the destination to create the ConnectionPool for
* @return the newly created ConnectionPool
*/
ConnectionPool newConnectionPool(HttpDestination destination);
}
} }

View File

@ -68,4 +68,14 @@ public interface HttpClientTransport extends ClientConnectionFactory
* @param context the context information to establish the connection * @param context the context information to establish the connection
*/ */
public void connect(InetSocketAddress address, Map<String, Object> context); public void connect(InetSocketAddress address, Map<String, Object> context);
/**
* @return the factory for ConnectionPool instances
*/
public ConnectionPool.Factory getConnectionPoolFactory();
/**
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);
} }

View File

@ -114,7 +114,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
removeBean(connectionPool); removeBean(connectionPool);
} }
protected abstract ConnectionPool newConnectionPool(HttpClient client); protected ConnectionPool newConnectionPool(HttpClient client)
{
return client.getTransport().getConnectionPoolFactory().newConnectionPool(this);
}
protected Queue<HttpExchange> newExchangeQueue(HttpClient client) protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
{ {

View File

@ -25,12 +25,6 @@ public abstract class MultiplexHttpDestination extends HttpDestination
super(client, origin); super(client, origin);
} }
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new MultiplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this,
client.getMaxRequestsQueuedPerDestination());
}
public int getMaxRequestsPerConnection() public int getMaxRequestsPerConnection()
{ {
ConnectionPool connectionPool = getConnectionPool(); ConnectionPool connectionPool = getConnectionPool();

View File

@ -24,9 +24,4 @@ public abstract class PoolingHttpDestination extends HttpDestination
{ {
super(client, origin); super(client, origin);
} }
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
} }

View File

@ -21,7 +21,8 @@ package org.eclipse.jetty.client.http;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.client.AbstractHttpClientTransport; import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
@ -30,7 +31,7 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject("The HTTP/1.1 client transport") @ManagedObject("The HTTP/1.1 client transport")
public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport
{ {
public HttpClientTransportOverHTTP() public HttpClientTransportOverHTTP()
{ {
@ -40,6 +41,7 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
public HttpClientTransportOverHTTP(int selectors) public HttpClientTransportOverHTTP(int selectors)
{ {
super(selectors); super(selectors);
setConnectionPoolFactory(destination -> new DuplexConnectionPool(destination, getHttpClient().getMaxConnectionsPerDestination(), destination));
} }
@Override @Override

View File

@ -29,7 +29,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpHeaderValue;
@ -50,7 +49,11 @@ public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
@Override @Override
protected void startClient() throws Exception protected void startClient() throws Exception
{ {
startClient(new ValidatingHttpClientTransportOverHTTP(1000)); long timeout = 1000;
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(destination ->
new ValidatingConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, destination.getHttpClient().getScheduler(), timeout));
startClient(transport);
} }
@Test @Test
@ -177,28 +180,4 @@ public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS); ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response2.getStatus()); Assert.assertEquals(200, response2.getStatus());
} }
private static class ValidatingHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP
{
private final long timeout;
public ValidatingHttpClientTransportOverHTTP(long timeout)
{
super(1);
this.timeout = timeout;
}
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected DuplexConnectionPool newConnectionPool(HttpClient client)
{
return new ValidatingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, client.getScheduler(), timeout);
}
};
}
}
} }

View File

@ -21,8 +21,11 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.client.AbstractHttpClientTransport; import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
@ -34,7 +37,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject("The FastCGI/1.0 client transport") @ManagedObject("The FastCGI/1.0 client transport")
public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport public class HttpClientTransportOverFCGI extends AbstractConnectorHttpClientTransport
{ {
private final boolean multiplexed; private final boolean multiplexed;
private final String scriptRoot; private final String scriptRoot;
@ -49,6 +52,14 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
super(selectors); super(selectors);
this.multiplexed = multiplexed; this.multiplexed = multiplexed;
this.scriptRoot = scriptRoot; this.scriptRoot = scriptRoot;
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
int maxConnections = httpClient.getMaxConnectionsPerDestination();
return isMultiplexed() ?
new MultiplexConnectionPool(destination, maxConnections, destination, httpClient.getMaxRequestsQueuedPerDestination()) :
new DuplexConnectionPool(destination, maxConnections, destination);
});
} }
public boolean isMultiplexed() public boolean isMultiplexed()

View File

@ -20,13 +20,10 @@ package org.eclipse.jetty.fcgi.server;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.LeakTrackingConnectionPool; import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI; import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool; import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool;
@ -72,28 +69,16 @@ public abstract class AbstractHttpClientServerTest
QueuedThreadPool executor = new QueuedThreadPool(); QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client"); executor.setName(executor.getName() + "-client");
client = new HttpClient(new HttpClientTransportOverFCGI(1, false, "") HttpClientTransport transport = new HttpClientTransportOverFCGI(1, false, "");
transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{ {
@Override @Override
public HttpDestination newHttpDestination(Origin origin) protected void leaked(LeakDetector.LeakInfo leakInfo)
{ {
return new HttpDestinationOverFCGI(client, origin) connectionLeaks.incrementAndGet();
{
@Override
protected DuplexConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
connectionLeaks.incrementAndGet();
}
};
}
};
} }
}, null); });
client = new HttpClient(transport, null);
client.setExecutor(executor); client.setExecutor(executor);
clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
client.setByteBufferPool(clientBufferPool); client.setByteBufferPool(clientBufferPool);

View File

@ -24,9 +24,10 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory; import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.ProxyConfiguration; import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
@ -42,20 +43,23 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
@ManagedObject("The HTTP/2 client transport") @ManagedObject("The HTTP/2 client transport")
public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements HttpClientTransport public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
{ {
private final HTTP2Client client; private final HTTP2Client client;
private ClientConnectionFactory connectionFactory; private ClientConnectionFactory connectionFactory;
private HttpClient httpClient;
private boolean useALPN = true; private boolean useALPN = true;
public HttpClientTransportOverHTTP2(HTTP2Client client) public HttpClientTransportOverHTTP2(HTTP2Client client)
{ {
this.client = client; this.client = client;
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
});
} }
@ManagedAttribute(value = "The number of selectors", readonly = true) @ManagedAttribute(value = "The number of selectors", readonly = true)
@ -79,6 +83,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
{ {
if (!client.isStarted()) if (!client.isStarted())
{ {
HttpClient httpClient = getHttpClient();
client.setExecutor(httpClient.getExecutor()); client.setExecutor(httpClient.getExecutor());
client.setScheduler(httpClient.getScheduler()); client.setScheduler(httpClient.getScheduler());
client.setByteBufferPool(httpClient.getByteBufferPool()); client.setByteBufferPool(httpClient.getByteBufferPool());
@ -104,34 +109,23 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
removeBean(client); removeBean(client);
} }
protected HttpClient getHttpClient()
{
return httpClient;
}
@Override
public void setHttpClient(HttpClient client)
{
httpClient = client;
}
@Override @Override
public HttpDestination newHttpDestination(Origin origin) public HttpDestination newHttpDestination(Origin origin)
{ {
return new HttpDestinationOverHTTP2(httpClient, origin); return new HttpDestinationOverHTTP2(getHttpClient(), origin);
} }
@Override @Override
public void connect(InetSocketAddress address, Map<String, Object> context) public void connect(InetSocketAddress address, Map<String, Object> context)
{ {
client.setConnectTimeout(httpClient.getConnectTimeout()); client.setConnectTimeout(getHttpClient().getConnectTimeout());
SessionListenerPromise listenerPromise = new SessionListenerPromise(context); SessionListenerPromise listenerPromise = new SessionListenerPromise(context);
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY); HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);
SslContextFactory sslContextFactory = null; SslContextFactory sslContextFactory = null;
if (HttpScheme.HTTPS.is(destination.getScheme())) if (HttpScheme.HTTPS.is(destination.getScheme()))
sslContextFactory = httpClient.getSslContextFactory(); sslContextFactory = getHttpClient().getSslContextFactory();
client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context); client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
} }
@ -139,7 +133,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
@Override @Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{ {
endPoint.setIdleTimeout(httpClient.getIdleTimeout()); endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
ClientConnectionFactory factory = connectionFactory; ClientConnectionFactory factory = connectionFactory;
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY); HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);

View File

@ -34,20 +34,14 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.LeakTrackingConnectionPool; import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI; import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ArrayByteBufferPool;
@ -97,55 +91,31 @@ public class HttpClientLoadTest extends AbstractTest
case HTTP: case HTTP:
case HTTPS: case HTTPS:
{ {
return new HttpClientTransportOverHTTP(1) HttpClientTransport clientTransport = new HttpClientTransportOverHTTP(1);
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{ {
@Override @Override
public HttpDestination newHttpDestination(Origin origin) protected void leaked(LeakDetector.LeakInfo leakInfo)
{ {
return new HttpDestinationOverHTTP(getHttpClient(), origin) super.leaked(leakInfo);
{ connectionLeaks.incrementAndGet();
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
} }
}; });
return clientTransport;
} }
case FCGI: case FCGI:
{ {
return new HttpClientTransportOverFCGI(1, false, "") HttpClientTransport clientTransport = new HttpClientTransportOverFCGI(1, false, "");
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{ {
@Override @Override
public HttpDestination newHttpDestination(Origin origin) protected void leaked(LeakDetector.LeakInfo leakInfo)
{ {
return new HttpDestinationOverFCGI(getHttpClient(), origin) super.leaked(leakInfo);
{ connectionLeaks.incrementAndGet();
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
} }
}; });
return clientTransport;
} }
default: default:
{ {