Fixes #1656 - Improve configurability of ConnectionPools.
Introduced ConnectionPool.Factory and HttpClientTransport.connectionPoolFactory. This allows applications to create a ConnectionPool given the HttpDestination.
This commit is contained in:
parent
578df575e1
commit
9d0bcdbce4
|
@ -0,0 +1,183 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ManagedSelector;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.io.SocketChannelEndPoint;
|
||||
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
|
||||
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
|
||||
{
|
||||
private final int selectors;
|
||||
private SelectorManager selectorManager;
|
||||
|
||||
protected AbstractConnectorHttpClientTransport(int selectors)
|
||||
{
|
||||
this.selectors = selectors;
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of selectors", readonly = true)
|
||||
public int getSelectors()
|
||||
{
|
||||
return selectors;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
HttpClient httpClient = getHttpClient();
|
||||
selectorManager = newSelectorManager(httpClient);
|
||||
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
|
||||
addBean(selectorManager);
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
super.doStop();
|
||||
removeBean(selectorManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(InetSocketAddress address, Map<String, Object> context)
|
||||
{
|
||||
SocketChannel channel = null;
|
||||
try
|
||||
{
|
||||
channel = SocketChannel.open();
|
||||
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
HttpClient client = destination.getHttpClient();
|
||||
SocketAddress bindAddress = client.getBindAddress();
|
||||
if (bindAddress != null)
|
||||
channel.bind(bindAddress);
|
||||
configure(client, channel);
|
||||
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
|
||||
|
||||
if (client.isConnectBlocking())
|
||||
{
|
||||
channel.socket().connect(address, (int)client.getConnectTimeout());
|
||||
channel.configureBlocking(false);
|
||||
selectorManager.accept(channel, context);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.configureBlocking(false);
|
||||
if (channel.connect(address))
|
||||
selectorManager.accept(channel, context);
|
||||
else
|
||||
selectorManager.connect(channel, context);
|
||||
}
|
||||
}
|
||||
// Must catch all exceptions, since some like
|
||||
// UnresolvedAddressException are not IOExceptions.
|
||||
catch (Throwable x)
|
||||
{
|
||||
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
|
||||
// exception is being thrown, so we attempt to provide a better error message.
|
||||
if (x.getClass() == SocketException.class)
|
||||
x = new SocketException("Could not connect to " + address).initCause(x);
|
||||
|
||||
try
|
||||
{
|
||||
if (channel != null)
|
||||
channel.close();
|
||||
}
|
||||
catch (IOException xx)
|
||||
{
|
||||
LOG.ignore(xx);
|
||||
}
|
||||
finally
|
||||
{
|
||||
connectFailed(context, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void connectFailed(Map<String, Object> context, Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
||||
promise.failed(x);
|
||||
}
|
||||
|
||||
protected void configure(HttpClient client, SocketChannel channel) throws IOException
|
||||
{
|
||||
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
|
||||
}
|
||||
|
||||
protected SelectorManager newSelectorManager(HttpClient client)
|
||||
{
|
||||
return new ClientSelectorManager(client, getSelectors());
|
||||
}
|
||||
|
||||
protected class ClientSelectorManager extends SelectorManager
|
||||
{
|
||||
private final HttpClient client;
|
||||
|
||||
protected ClientSelectorManager(HttpClient client, int selectors)
|
||||
{
|
||||
super(client.getExecutor(), client.getScheduler(), selectors);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
|
||||
{
|
||||
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
|
||||
endp.setIdleTimeout(client.getIdleTimeout());
|
||||
return endp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> context = (Map<String, Object>)attachment;
|
||||
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
return destination.getClientConnectionFactory().newConnection(endPoint, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> context = (Map<String, Object>)attachment;
|
||||
connectFailed(context, x);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,24 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ManagedSelector;
|
||||
import org.eclipse.jetty.io.SelectChannelEndPoint;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.io.SocketChannelEndPoint;
|
||||
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -46,14 +28,8 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
|
|||
{
|
||||
protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
|
||||
|
||||
private final int selectors;
|
||||
private volatile HttpClient client;
|
||||
private volatile SelectorManager selectorManager;
|
||||
|
||||
protected AbstractHttpClientTransport(int selectors)
|
||||
{
|
||||
this.selectors = selectors;
|
||||
}
|
||||
private HttpClient client;
|
||||
private ConnectionPool.Factory factory;
|
||||
|
||||
protected HttpClient getHttpClient()
|
||||
{
|
||||
|
@ -66,137 +42,15 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
|
|||
this.client = client;
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of selectors", readonly = true)
|
||||
public int getSelectors()
|
||||
@Override
|
||||
public ConnectionPool.Factory getConnectionPoolFactory()
|
||||
{
|
||||
return selectors;
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
public void setConnectionPoolFactory(ConnectionPool.Factory factory)
|
||||
{
|
||||
selectorManager = newSelectorManager(client);
|
||||
selectorManager.setConnectTimeout(client.getConnectTimeout());
|
||||
addBean(selectorManager);
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
super.doStop();
|
||||
removeBean(selectorManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(InetSocketAddress address, Map<String, Object> context)
|
||||
{
|
||||
SocketChannel channel = null;
|
||||
try
|
||||
{
|
||||
channel = SocketChannel.open();
|
||||
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
HttpClient client = destination.getHttpClient();
|
||||
SocketAddress bindAddress = client.getBindAddress();
|
||||
if (bindAddress != null)
|
||||
channel.bind(bindAddress);
|
||||
configure(client, channel);
|
||||
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
|
||||
|
||||
if (client.isConnectBlocking())
|
||||
{
|
||||
channel.socket().connect(address, (int)client.getConnectTimeout());
|
||||
channel.configureBlocking(false);
|
||||
selectorManager.accept(channel, context);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.configureBlocking(false);
|
||||
if (channel.connect(address))
|
||||
selectorManager.accept(channel, context);
|
||||
else
|
||||
selectorManager.connect(channel, context);
|
||||
}
|
||||
}
|
||||
// Must catch all exceptions, since some like
|
||||
// UnresolvedAddressException are not IOExceptions.
|
||||
catch (Throwable x)
|
||||
{
|
||||
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
|
||||
// exception is being thrown, so we attempt to provide a better error message.
|
||||
if (x.getClass() == SocketException.class)
|
||||
x = new SocketException("Could not connect to " + address).initCause(x);
|
||||
|
||||
try
|
||||
{
|
||||
if (channel != null)
|
||||
channel.close();
|
||||
}
|
||||
catch (IOException xx)
|
||||
{
|
||||
LOG.ignore(xx);
|
||||
}
|
||||
finally
|
||||
{
|
||||
connectFailed(context, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void connectFailed(Map<String, Object> context, Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
||||
promise.failed(x);
|
||||
}
|
||||
|
||||
protected void configure(HttpClient client, SocketChannel channel) throws IOException
|
||||
{
|
||||
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
|
||||
}
|
||||
|
||||
protected SelectorManager newSelectorManager(HttpClient client)
|
||||
{
|
||||
return new ClientSelectorManager(client, selectors);
|
||||
}
|
||||
|
||||
protected class ClientSelectorManager extends SelectorManager
|
||||
{
|
||||
private final HttpClient client;
|
||||
|
||||
protected ClientSelectorManager(HttpClient client, int selectors)
|
||||
{
|
||||
super(client.getExecutor(), client.getScheduler(), selectors);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
|
||||
{
|
||||
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
|
||||
endp.setIdleTimeout(client.getIdleTimeout());
|
||||
return endp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> context = (Map<String, Object>)attachment;
|
||||
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
return destination.getClientConnectionFactory().newConnection(endPoint, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> context = (Map<String, Object>)attachment;
|
||||
connectFailed(context, x);
|
||||
}
|
||||
this.factory = factory;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,20 +22,73 @@ import java.io.Closeable;
|
|||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
||||
/**
|
||||
* <p>Client-side connection pool abstraction.</p>
|
||||
*/
|
||||
public interface ConnectionPool extends Closeable
|
||||
{
|
||||
/**
|
||||
* @param connection the connection to test
|
||||
* @return whether the given connection is currently in use
|
||||
*/
|
||||
boolean isActive(Connection connection);
|
||||
|
||||
/**
|
||||
* @return whether this ConnectionPool has no open connections
|
||||
*/
|
||||
boolean isEmpty();
|
||||
|
||||
/**
|
||||
* @return whether this ConnectionPool has been closed
|
||||
* @see #close()
|
||||
*/
|
||||
boolean isClosed();
|
||||
|
||||
/**
|
||||
* <p>Returns an idle connection, if available, or schedules the opening
|
||||
* of a new connection and returns {@code null}.</p>
|
||||
*
|
||||
* @return an available connection, or null
|
||||
*/
|
||||
Connection acquire();
|
||||
|
||||
/**
|
||||
* <p>Returns the given connection, previously obtained via {@link #acquire()},
|
||||
* back to this ConnectionPool.</p>
|
||||
*
|
||||
* @param connection the connection to release
|
||||
* @return true if the connection has been released, false if the connection
|
||||
* was not obtained from the this ConnectionPool
|
||||
*/
|
||||
boolean release(Connection connection);
|
||||
|
||||
/**
|
||||
* <p>Removes the given connection from this ConnectionPool.</p>
|
||||
*
|
||||
* @param connection the connection to remove
|
||||
* @return true if the connection was removed from this ConnectionPool
|
||||
*/
|
||||
boolean remove(Connection connection);
|
||||
|
||||
/**
|
||||
* Closes this ConnectionPool.
|
||||
*
|
||||
* @see #isClosed()
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Factory for ConnectionPool instances.
|
||||
*/
|
||||
interface Factory
|
||||
{
|
||||
/**
|
||||
* Creates a new ConnectionPool for the given destination.
|
||||
*
|
||||
* @param destination the destination to create the ConnectionPool for
|
||||
* @return the newly created ConnectionPool
|
||||
*/
|
||||
ConnectionPool newConnectionPool(HttpDestination destination);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,4 +68,14 @@ public interface HttpClientTransport extends ClientConnectionFactory
|
|||
* @param context the context information to establish the connection
|
||||
*/
|
||||
public void connect(InetSocketAddress address, Map<String, Object> context);
|
||||
|
||||
/**
|
||||
* @return the factory for ConnectionPool instances
|
||||
*/
|
||||
public ConnectionPool.Factory getConnectionPoolFactory();
|
||||
|
||||
/**
|
||||
* @param factory the factory for ConnectionPool instances
|
||||
*/
|
||||
public void setConnectionPoolFactory(ConnectionPool.Factory factory);
|
||||
}
|
||||
|
|
|
@ -114,7 +114,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
removeBean(connectionPool);
|
||||
}
|
||||
|
||||
protected abstract ConnectionPool newConnectionPool(HttpClient client);
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return client.getTransport().getConnectionPoolFactory().newConnectionPool(this);
|
||||
}
|
||||
|
||||
protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
|
||||
{
|
||||
|
|
|
@ -25,12 +25,6 @@ public abstract class MultiplexHttpDestination extends HttpDestination
|
|||
super(client, origin);
|
||||
}
|
||||
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new MultiplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this,
|
||||
client.getMaxRequestsQueuedPerDestination());
|
||||
}
|
||||
|
||||
public int getMaxRequestsPerConnection()
|
||||
{
|
||||
ConnectionPool connectionPool = getConnectionPool();
|
||||
|
|
|
@ -24,9 +24,4 @@ public abstract class PoolingHttpDestination extends HttpDestination
|
|||
{
|
||||
super(client, origin);
|
||||
}
|
||||
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,8 @@ package org.eclipse.jetty.client.http;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.client.AbstractHttpClientTransport;
|
||||
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
|
||||
import org.eclipse.jetty.client.DuplexConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
@ -30,7 +31,7 @@ import org.eclipse.jetty.util.Promise;
|
|||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
@ManagedObject("The HTTP/1.1 client transport")
|
||||
public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
|
||||
public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport
|
||||
{
|
||||
public HttpClientTransportOverHTTP()
|
||||
{
|
||||
|
@ -40,6 +41,7 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
|
|||
public HttpClientTransportOverHTTP(int selectors)
|
||||
{
|
||||
super(selectors);
|
||||
setConnectionPoolFactory(destination -> new DuplexConnectionPool(destination, getHttpClient().getMaxConnectionsPerDestination(), destination));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,7 +29,6 @@ import javax.servlet.http.HttpServletResponse;
|
|||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
|
||||
import org.eclipse.jetty.client.util.FutureResponseListener;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
|
@ -50,7 +49,11 @@ public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
|
|||
@Override
|
||||
protected void startClient() throws Exception
|
||||
{
|
||||
startClient(new ValidatingHttpClientTransportOverHTTP(1000));
|
||||
long timeout = 1000;
|
||||
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(1);
|
||||
transport.setConnectionPoolFactory(destination ->
|
||||
new ValidatingConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, destination.getHttpClient().getScheduler(), timeout));
|
||||
startClient(transport);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -177,28 +180,4 @@ public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
|
|||
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(200, response2.getStatus());
|
||||
}
|
||||
|
||||
private static class ValidatingHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP
|
||||
{
|
||||
private final long timeout;
|
||||
|
||||
public ValidatingHttpClientTransportOverHTTP(long timeout)
|
||||
{
|
||||
super(1);
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
{
|
||||
return new HttpDestinationOverHTTP(getHttpClient(), origin)
|
||||
{
|
||||
@Override
|
||||
protected DuplexConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new ValidatingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, client.getScheduler(), timeout);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,11 @@ package org.eclipse.jetty.fcgi.client.http;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.client.AbstractHttpClientTransport;
|
||||
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
|
||||
import org.eclipse.jetty.client.DuplexConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
|
@ -34,7 +37,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
|||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
@ManagedObject("The FastCGI/1.0 client transport")
|
||||
public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
|
||||
public class HttpClientTransportOverFCGI extends AbstractConnectorHttpClientTransport
|
||||
{
|
||||
private final boolean multiplexed;
|
||||
private final String scriptRoot;
|
||||
|
@ -49,6 +52,14 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
|
|||
super(selectors);
|
||||
this.multiplexed = multiplexed;
|
||||
this.scriptRoot = scriptRoot;
|
||||
setConnectionPoolFactory(destination ->
|
||||
{
|
||||
HttpClient httpClient = getHttpClient();
|
||||
int maxConnections = httpClient.getMaxConnectionsPerDestination();
|
||||
return isMultiplexed() ?
|
||||
new MultiplexConnectionPool(destination, maxConnections, destination, httpClient.getMaxRequestsQueuedPerDestination()) :
|
||||
new DuplexConnectionPool(destination, maxConnections, destination);
|
||||
});
|
||||
}
|
||||
|
||||
public boolean isMultiplexed()
|
||||
|
|
|
@ -20,13 +20,10 @@ package org.eclipse.jetty.fcgi.server;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.client.DuplexConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
|
@ -72,28 +69,16 @@ public abstract class AbstractHttpClientServerTest
|
|||
QueuedThreadPool executor = new QueuedThreadPool();
|
||||
executor.setName(executor.getName() + "-client");
|
||||
|
||||
client = new HttpClient(new HttpClientTransportOverFCGI(1, false, "")
|
||||
HttpClientTransport transport = new HttpClientTransportOverFCGI(1, false, "");
|
||||
transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
||||
{
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
return new HttpDestinationOverFCGI(client, origin)
|
||||
{
|
||||
@Override
|
||||
protected DuplexConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
}, null);
|
||||
});
|
||||
client = new HttpClient(transport, null);
|
||||
client.setExecutor(executor);
|
||||
clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
|
||||
client.setByteBufferPool(clientBufferPool);
|
||||
|
|
|
@ -24,9 +24,10 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
|
||||
import org.eclipse.jetty.client.AbstractHttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.ProxyConfiguration;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
@ -42,20 +43,23 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
||||
@ManagedObject("The HTTP/2 client transport")
|
||||
public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements HttpClientTransport
|
||||
public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
||||
{
|
||||
private final HTTP2Client client;
|
||||
private ClientConnectionFactory connectionFactory;
|
||||
private HttpClient httpClient;
|
||||
private boolean useALPN = true;
|
||||
|
||||
public HttpClientTransportOverHTTP2(HTTP2Client client)
|
||||
{
|
||||
this.client = client;
|
||||
setConnectionPoolFactory(destination ->
|
||||
{
|
||||
HttpClient httpClient = getHttpClient();
|
||||
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
|
||||
});
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of selectors", readonly = true)
|
||||
|
@ -79,6 +83,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
|
|||
{
|
||||
if (!client.isStarted())
|
||||
{
|
||||
HttpClient httpClient = getHttpClient();
|
||||
client.setExecutor(httpClient.getExecutor());
|
||||
client.setScheduler(httpClient.getScheduler());
|
||||
client.setByteBufferPool(httpClient.getByteBufferPool());
|
||||
|
@ -104,34 +109,23 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
|
|||
removeBean(client);
|
||||
}
|
||||
|
||||
protected HttpClient getHttpClient()
|
||||
{
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setHttpClient(HttpClient client)
|
||||
{
|
||||
httpClient = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
{
|
||||
return new HttpDestinationOverHTTP2(httpClient, origin);
|
||||
return new HttpDestinationOverHTTP2(getHttpClient(), origin);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(InetSocketAddress address, Map<String, Object> context)
|
||||
{
|
||||
client.setConnectTimeout(httpClient.getConnectTimeout());
|
||||
client.setConnectTimeout(getHttpClient().getConnectTimeout());
|
||||
|
||||
SessionListenerPromise listenerPromise = new SessionListenerPromise(context);
|
||||
|
||||
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
SslContextFactory sslContextFactory = null;
|
||||
if (HttpScheme.HTTPS.is(destination.getScheme()))
|
||||
sslContextFactory = httpClient.getSslContextFactory();
|
||||
sslContextFactory = getHttpClient().getSslContextFactory();
|
||||
|
||||
client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
|
||||
}
|
||||
|
@ -139,7 +133,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
|
|||
@Override
|
||||
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||
{
|
||||
endPoint.setIdleTimeout(httpClient.getIdleTimeout());
|
||||
endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
|
||||
|
||||
ClientConnectionFactory factory = connectionFactory;
|
||||
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
|
|
|
@ -34,20 +34,14 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
||||
|
@ -97,55 +91,31 @@ public class HttpClientLoadTest extends AbstractTest
|
|||
case HTTP:
|
||||
case HTTPS:
|
||||
{
|
||||
return new HttpClientTransportOverHTTP(1)
|
||||
HttpClientTransport clientTransport = new HttpClientTransportOverHTTP(1);
|
||||
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
||||
{
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
return new HttpDestinationOverHTTP(getHttpClient(), origin)
|
||||
{
|
||||
@Override
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
});
|
||||
return clientTransport;
|
||||
}
|
||||
case FCGI:
|
||||
{
|
||||
return new HttpClientTransportOverFCGI(1, false, "")
|
||||
HttpClientTransport clientTransport = new HttpClientTransportOverFCGI(1, false, "");
|
||||
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
|
||||
{
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
return new HttpDestinationOverFCGI(getHttpClient(), origin)
|
||||
{
|
||||
@Override
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
});
|
||||
return clientTransport;
|
||||
}
|
||||
default:
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue