Fixes #542 - Support Connection.Listener bean on clients.
Introduced ClientConnectionFactory.customize() to look for Connection.Listener beans. ClientConnectionFactory implementation calls customize() when they create a Connection instance, so the Connection.Listener beans are registered onto the Connection.
This commit is contained in:
parent
f6098497c0
commit
f3a805887e
|
@ -48,7 +48,8 @@ public class ALPNClientConnectionFactory extends NegotiatingClientConnectionFact
|
||||||
@Override
|
@Override
|
||||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||||
{
|
{
|
||||||
return new ALPNClientConnection(endPoint, executor, getClientConnectionFactory(),
|
ALPNClientConnection connection = new ALPNClientConnection(endPoint, executor, getClientConnectionFactory(),
|
||||||
(SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY), context, protocols);
|
(SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY), context, protocols);
|
||||||
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,8 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
|
||||||
{
|
{
|
||||||
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
|
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
|
||||||
Executor executor = destination.getHttpClient().getExecutor();
|
Executor executor = destination.getHttpClient().getExecutor();
|
||||||
return new Socks4ProxyConnection(endPoint, executor, connectionFactory, context);
|
Socks4ProxyConnection connection = new Socks4ProxyConnection(endPoint, executor, connectionFactory, context);
|
||||||
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
|
||||||
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
|
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Created {}", connection);
|
LOG.debug("Created {}", connection);
|
||||||
return connection;
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
|
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
|
||||||
|
|
|
@ -139,7 +139,8 @@ public class HttpClientCustomProxyTest
|
||||||
{
|
{
|
||||||
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
|
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
|
||||||
Executor executor = destination.getHttpClient().getExecutor();
|
Executor executor = destination.getHttpClient().getExecutor();
|
||||||
return new CAFEBABEConnection(endPoint, executor, connectionFactory, context);
|
CAFEBABEConnection connection = new CAFEBABEConnection(endPoint, executor, connectionFactory, context);
|
||||||
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
|
||||||
HttpConnectionOverFCGI connection = newHttpConnection(endPoint, destination, promise);
|
HttpConnectionOverFCGI connection = newHttpConnection(endPoint, destination, promise);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Created {}", connection);
|
LOG.debug("Created {}", connection);
|
||||||
return connection;
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
|
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
||||||
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
|
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
|
||||||
parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
|
parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
|
||||||
connection.addListener(connectionListener);
|
connection.addListener(connectionListener);
|
||||||
return connection;
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.io;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory for client-side {@link Connection} instances.
|
* Factory for client-side {@link Connection} instances.
|
||||||
*/
|
*/
|
||||||
|
@ -36,4 +38,11 @@ public interface ClientConnectionFactory
|
||||||
* @throws IOException if the connection cannot be created
|
* @throws IOException if the connection cannot be created
|
||||||
*/
|
*/
|
||||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException;
|
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException;
|
||||||
|
|
||||||
|
public default Connection customize(Connection connection, Map<String, Object> context)
|
||||||
|
{
|
||||||
|
ContainerLifeCycle connector = (ContainerLifeCycle)context.get(CONNECTOR_CONTEXT_KEY);
|
||||||
|
connector.getBeans(Connection.Listener.class).forEach(connection::addListener);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import javax.net.ssl.SSLEngine;
|
||||||
|
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||||
|
import org.eclipse.jetty.io.Connection;
|
||||||
import org.eclipse.jetty.io.EndPoint;
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
|
@ -60,11 +61,9 @@ public class SslClientConnectionFactory implements ClientConnectionFactory
|
||||||
context.put(SSL_ENGINE_CONTEXT_KEY, engine);
|
context.put(SSL_ENGINE_CONTEXT_KEY, engine);
|
||||||
|
|
||||||
SslConnection sslConnection = newSslConnection(byteBufferPool, executor, endPoint, engine);
|
SslConnection sslConnection = newSslConnection(byteBufferPool, executor, endPoint, engine);
|
||||||
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
|
|
||||||
endPoint.setConnection(sslConnection);
|
endPoint.setConnection(sslConnection);
|
||||||
|
|
||||||
ContainerLifeCycle connector = (ContainerLifeCycle)context.get(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY);
|
customize(sslConnection, context);
|
||||||
connector.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
|
|
||||||
|
|
||||||
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
|
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
|
||||||
appEndPoint.setConnection(connectionFactory.newConnection(appEndPoint, context));
|
appEndPoint.setConnection(connectionFactory.newConnection(appEndPoint, context));
|
||||||
|
@ -76,4 +75,17 @@ public class SslClientConnectionFactory implements ClientConnectionFactory
|
||||||
{
|
{
|
||||||
return new SslConnection(byteBufferPool, executor, endPoint, engine);
|
return new SslConnection(byteBufferPool, executor, endPoint, engine);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Connection customize(Connection connection, Map<String, Object> context)
|
||||||
|
{
|
||||||
|
if (connection instanceof SslConnection)
|
||||||
|
{
|
||||||
|
SslConnection sslConnection = (SslConnection)connection;
|
||||||
|
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
|
||||||
|
ContainerLifeCycle connector = (ContainerLifeCycle)context.get(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY);
|
||||||
|
connector.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
|
||||||
|
}
|
||||||
|
return ClientConnectionFactory.super.customize(connection, context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -407,6 +407,50 @@ public class HttpClientTest extends AbstractTest
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionListener() throws Exception
|
||||||
|
{
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch openLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
client.addBean(new org.eclipse.jetty.io.Connection.Listener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onOpened(org.eclipse.jetty.io.Connection connection)
|
||||||
|
{
|
||||||
|
openLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClosed(org.eclipse.jetty.io.Connection connection)
|
||||||
|
{
|
||||||
|
closeLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
long idleTimeout = 1000;
|
||||||
|
client.setIdleTimeout(idleTimeout);
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.scheme(getScheme())
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
|
||||||
|
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
Assert.assertTrue(openLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Thread.sleep(2 * idleTimeout);
|
||||||
|
Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
private void sleep(long time) throws IOException
|
private void sleep(long time) throws IOException
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
|
Loading…
Reference in New Issue