Issue #132 - ClientConnector abstraction.

Introduced ClientConnector and refactored HttpClient transports,
removing duplicated code that was connect() to a remote host.

Refactored also HTTP2Client to reference ClientConnector.

Refactored tests accordingly to the changes introduced in the
implementations.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-01-17 10:43:03 +01:00
parent 2f6d0b3fb7
commit 6c4ee083be
36 changed files with 795 additions and 770 deletions

View File

@ -27,13 +27,9 @@ import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.NegotiatingClientConnection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class ALPNClientConnection extends NegotiatingClientConnection
{
private static final Logger LOG = Log.getLogger(ALPNClientConnection.class);
private final List<String> protocols;
public ALPNClientConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, SSLEngine sslEngine, Map<String, Object> context, List<String> protocols)
@ -49,9 +45,9 @@ public class ALPNClientConnection extends NegotiatingClientConnection
public void selected(String protocol)
{
if (protocol==null || !protocols.contains(protocol))
if (protocol == null || !protocols.contains(protocol))
close();
else
super.completed();
completed();
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.alpn.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -96,7 +95,7 @@ public class ALPNClientConnectionFactory extends NegotiatingClientConnectionFact
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
SSLEngine engine = (SSLEngine)context.get(SslClientConnectionFactory.SSL_ENGINE_CONTEXT_KEY);
for (Client processor : processors)

View File

@ -42,6 +42,7 @@
<artifactId>jetty-alpn-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>

View File

@ -35,22 +35,19 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class JDK9HTTP2Client
{
public static void main(String[] args) throws Exception
{
HTTP2Client client = new HTTP2Client();
SslContextFactory sslContextFactory = new SslContextFactory();
client.addBean(sslContextFactory);
client.start();
String host = "webtide.com";
int port = 443;
FuturePromise<Session> sessionPromise = new FuturePromise<>();
client.connect(sslContextFactory, new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
client.connect(client.getClientConnector().getSslContextFactory(), new InetSocketAddress(host, port), new Session.Listener.Adapter(), sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);
HttpFields requestFields = new HttpFields();

View File

@ -18,11 +18,7 @@
package org.eclipse.jetty.alpn.java.server;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
@ -31,7 +27,6 @@ import java.nio.charset.StandardCharsets;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -45,8 +40,12 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
public class JDK9ALPNTest
{
private Server server;
@ -66,6 +65,13 @@ public class JDK9ALPNTest
server.start();
}
@AfterEach
public void stopServer() throws Exception
{
if (server != null)
server.stop();
}
private SslContextFactory newSslContextFactory()
{
SslContextFactory sslContextFactory = new SslContextFactory();
@ -84,7 +90,7 @@ public class JDK9ALPNTest
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
@ -126,7 +132,7 @@ public class JDK9ALPNTest
startServer(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
}
@ -163,6 +169,5 @@ public class JDK9ALPNTest
break;
}
}
}
}

View File

@ -18,21 +18,12 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -40,147 +31,48 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
private final int selectors;
private SelectorManager selectorManager;
private final ClientConnector connector;
protected AbstractConnectorHttpClientTransport(int selectors)
protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{
this.selectors = selectors;
this.connector = connector;
addBean(connector);
}
public ClientConnector getClientConnector()
{
return connector;
}
@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
return selectors;
return connector.getSelectors();
}
@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
selectorManager = newSelectorManager(httpClient);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
connector.setBindAddress(httpClient.getBindAddress());
connector.setByteBufferPool(httpClient.getByteBufferPool());
connector.setConnectBlocking(httpClient.isConnectBlocking());
connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
connector.setExecutor(httpClient.getExecutor());
connector.setIdleTimeout(Duration.ofMillis(httpClient.getIdleTimeout()));
connector.setScheduler(httpClient.getScheduler());
connector.setSslContextFactory(httpClient.getSslContextFactory());
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
boolean connected = true;
if (client.isConnectBlocking())
{
channel.socket().connect(address, (int)client.getConnectTimeout());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}
protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, getSelectors());
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;
protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
}

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.client;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -53,4 +57,13 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
{
this.factory = factory;
}
protected void connectFailed(Map<String, Object> context, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}

View File

@ -81,16 +81,16 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
* <p>{@link HttpClient} provides an efficient, asynchronous, non-blocking implementation
* <p>HttpClient provides an efficient, asynchronous, non-blocking implementation
* to perform HTTP requests to a server through a simple API that offers also blocking semantic.</p>
* <p>{@link HttpClient} provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
* <p>HttpClient provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
* requests in a one-liner, but also gives the ability to fine tune the configuration of requests via
* {@link HttpClient#newRequest(URI)}.</p>
* <p>{@link HttpClient} acts as a central configuration point for network parameters (such as idle timeouts)
* <p>HttpClient acts as a central configuration point for network parameters (such as idle timeouts)
* and HTTP parameters (such as whether to follow redirects).</p>
* <p>{@link HttpClient} transparently pools connections to servers, but allows direct control of connections
* <p>HttpClient transparently pools connections to servers, but allows direct control of connections
* for cases where this is needed.</p>
* <p>{@link HttpClient} also acts as a central configuration point for cookies, via {@link #getCookieStore()}.</p>
* <p>HttpClient also acts as a central configuration point for cookies, via {@link #getCookieStore()}.</p>
* <p>Typical usage:</p>
* <pre>
* HttpClient httpClient = new HttpClient();
@ -157,7 +157,7 @@ public class HttpClient extends ContainerLifeCycle
private String defaultRequestContentType = "application/octet-stream";
/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
* Creates a HttpClient instance that can perform requests to non-TLS destinations only
* (that is, requests with the "http" scheme only, and not "https").
*
* @see #HttpClient(SslContextFactory) to perform requests to TLS destinations.
@ -168,7 +168,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS and TLS destinations
* Creates a HttpClient instance that can perform requests to non-TLS and TLS destinations
* (that is, both requests with the "http" scheme and with the "https" scheme).
*
* @param sslContextFactory the {@link SslContextFactory} that manages TLS encryption
@ -517,7 +517,7 @@ public class HttpClient extends ContainerLifeCycle
/**
* Returns a {@link Destination} for the given scheme, host and port.
* Applications may use {@link Destination}s to create {@link Connection}s
* that will be outside {@link HttpClient}'s pooling mechanism, to explicitly
* that will be outside HttpClient's pooling mechanism, to explicitly
* control the connection lifecycle (in particular their termination with
* {@link Connection#close()}).
*
@ -570,7 +570,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the list of destinations known to this {@link HttpClient}.
* @return the list of destinations known to this HttpClient.
*/
public List<Destination> getDestinations()
{
@ -586,13 +586,13 @@ public class HttpClient extends ContainerLifeCycle
protected void newConnection(final HttpDestination destination, final Promise<Connection> promise)
{
Origin.Address address = destination.getConnectAddress();
resolver.resolve(address.getHost(), address.getPort(), new Promise<List<InetSocketAddress>>()
resolver.resolve(address.getHost(), address.getPort(), new Promise<>()
{
@Override
public void succeeded(List<InetSocketAddress> socketAddresses)
{
Map<String, Object> context = new HashMap<>();
context.put(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, HttpClient.this);
context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
connect(socketAddresses, 0, context);
}
@ -605,7 +605,7 @@ public class HttpClient extends ContainerLifeCycle
private void connect(List<InetSocketAddress> socketAddresses, int index, Map<String, Object> context)
{
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<Connection>(promise)
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise)
{
@Override
public void failed(Throwable x)
@ -638,7 +638,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the {@link ByteBufferPool} of this {@link HttpClient}
* @return the {@link ByteBufferPool} of this HttpClient
*/
public ByteBufferPool getByteBufferPool()
{
@ -646,7 +646,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param byteBufferPool the {@link ByteBufferPool} of this {@link HttpClient}
* @param byteBufferPool the {@link ByteBufferPool} of this HttpClient
*/
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
@ -706,7 +706,7 @@ public class HttpClient extends ContainerLifeCycle
/**
* <p>Sets the socket address resolution timeout used by the default {@link SocketAddressResolver}
* created by this {@link HttpClient} at startup.</p>
* created by this HttpClient at startup.</p>
* <p>For more fine tuned configuration of socket address resolution, see
* {@link #setSocketAddressResolver(SocketAddressResolver)}.</p>
*
@ -755,7 +755,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the "User-Agent" HTTP field of this {@link HttpClient}
* @return the "User-Agent" HTTP field of this HttpClient
*/
public HttpField getUserAgentField()
{
@ -763,7 +763,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param agent the "User-Agent" HTTP header string of this {@link HttpClient}
* @param agent the "User-Agent" HTTP header string of this HttpClient
*/
public void setUserAgentField(HttpField agent)
{
@ -773,7 +773,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return whether this {@link HttpClient} follows HTTP redirects
* @return whether this HttpClient follows HTTP redirects
* @see Request#isFollowRedirects()
*/
@ManagedAttribute("Whether HTTP redirects are followed")
@ -783,7 +783,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param follow whether this {@link HttpClient} follows HTTP redirects
* @param follow whether this HttpClient follows HTTP redirects
* @see #setMaxRedirects(int)
*/
public void setFollowRedirects(boolean follow)
@ -792,7 +792,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the {@link Executor} of this {@link HttpClient}
* @return the {@link Executor} of this HttpClient
*/
public Executor getExecutor()
{
@ -800,7 +800,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param executor the {@link Executor} of this {@link HttpClient}
* @param executor the {@link Executor} of this HttpClient
*/
public void setExecutor(Executor executor)
{
@ -811,7 +811,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the {@link Scheduler} of this {@link HttpClient}
* @return the {@link Scheduler} of this HttpClient
*/
public Scheduler getScheduler()
{
@ -819,7 +819,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param scheduler the {@link Scheduler} of this {@link HttpClient}
* @param scheduler the {@link Scheduler} of this HttpClient
*/
public void setScheduler(Scheduler scheduler)
{
@ -830,7 +830,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the {@link SocketAddressResolver} of this {@link HttpClient}
* @return the {@link SocketAddressResolver} of this HttpClient
*/
public SocketAddressResolver getSocketAddressResolver()
{
@ -838,7 +838,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @param resolver the {@link SocketAddressResolver} of this {@link HttpClient}
* @param resolver the {@link SocketAddressResolver} of this HttpClient
*/
public void setSocketAddressResolver(SocketAddressResolver resolver)
{
@ -849,7 +849,7 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the max number of connections that this {@link HttpClient} opens to {@link Destination}s
* @return the max number of connections that this HttpClient opens to {@link Destination}s
*/
@ManagedAttribute("The max number of connections per each destination")
public int getMaxConnectionsPerDestination()
@ -862,11 +862,11 @@ public class HttpClient extends ContainerLifeCycle
* <p>
* RFC 2616 suggests that 2 connections should be opened per each destination,
* but browsers commonly open 6.
* If this {@link HttpClient} is used for load testing, it is common to have only one destination
* If this HttpClient is used for load testing, it is common to have only one destination
* (the server to load test), and it is recommended to set this value to a high value (at least as
* much as the threads present in the {@link #getExecutor() executor}).
*
* @param maxConnectionsPerDestination the max number of connections that this {@link HttpClient} opens to {@link Destination}s
* @param maxConnectionsPerDestination the max number of connections that this HttpClient opens to {@link Destination}s
*/
public void setMaxConnectionsPerDestination(int maxConnectionsPerDestination)
{
@ -885,11 +885,11 @@ public class HttpClient extends ContainerLifeCycle
/**
* Sets the max number of requests that may be queued to a destination.
* <p>
* If this {@link HttpClient} performs a high rate of requests to a destination,
* If this HttpClient performs a high rate of requests to a destination,
* and all the connections managed by that destination are busy with other requests,
* then new requests will be queued up in the destination.
* This parameter controls how many requests can be queued before starting to reject them.
* If this {@link HttpClient} is used for load testing, it is common to have this parameter
* If this HttpClient is used for load testing, it is common to have this parameter
* set to a high value, although this may impact latency (requests sit in the queue for a long
* time before being sent).
*
@ -970,35 +970,6 @@ public class HttpClient extends ContainerLifeCycle
this.tcpNoDelay = tcpNoDelay;
}
/**
* @return true to dispatch I/O operations in a different thread, false to execute them in the selector thread
* @see #setDispatchIO(boolean)
*/
@Deprecated
public boolean isDispatchIO()
{
// TODO this did default to true, so usage needs to be evaluated.
return false;
}
/**
* Whether to dispatch I/O operations from the selector thread to a different thread.
* <p>
* This implementation never blocks on I/O operation, but invokes application callbacks that may
* take time to execute or block on other I/O.
* If application callbacks are known to take time or block on I/O, then parameter {@code dispatchIO}
* should be set to true.
* If application callbacks are known to be quick and never block on I/O, then parameter {@code dispatchIO}
* may be set to false.
*
* @param dispatchIO true to dispatch I/O operations in a different thread,
* false to execute them in the selector thread
*/
@Deprecated
public void setDispatchIO(boolean dispatchIO)
{
}
/**
* Gets the http compliance mode for parsing http responses.
* The default http compliance level is {@link HttpCompliance#RFC7230} which is the latest HTTP/1.1 specification
@ -1256,7 +1227,7 @@ public class HttpClient extends ContainerLifeCycle
public Iterator<ContentDecoder.Factory> iterator()
{
final Iterator<ContentDecoder.Factory> iterator = set.iterator();
return new Iterator<ContentDecoder.Factory>()
return new Iterator<>()
{
@Override
public boolean hasNext()

View File

@ -36,8 +36,8 @@ import org.eclipse.jetty.io.ClientConnectionFactory;
*/
public interface HttpClientTransport extends ClientConnectionFactory
{
public static final String HTTP_DESTINATION_CONTEXT_KEY = "http.destination";
public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "http.connection.promise";
public static final String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination";
public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise";
/**
* Sets the {@link HttpClient} instance on this transport.

View File

@ -48,6 +48,23 @@ public class Origin
return address;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (obj == null || getClass() != obj.getClass())
return false;
Origin that = (Origin)obj;
return scheme.equals(that.scheme) && address.equals(that.address);
}
@Override
public int hashCode()
{
return Objects.hash(scheme, address);
}
public String asString()
{
StringBuilder result = new StringBuilder();
@ -56,20 +73,9 @@ public class Origin
}
@Override
public boolean equals(Object obj)
public String toString()
{
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Origin that = (Origin)obj;
return scheme.equals(that.scheme) && address.equals(that.address);
}
@Override
public int hashCode()
{
int result = scheme.hashCode();
result = 31 * result + address.hashCode();
return result;
return asString();
}
public static class Address
@ -96,8 +102,10 @@ public class Origin
@Override
public boolean equals(Object obj)
{
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (this == obj)
return true;
if (obj == null || getClass() != obj.getClass())
return false;
Address that = (Address)obj;
return host.equals(that.host) && port == that.port;
}
@ -105,9 +113,7 @@ public class Origin
@Override
public int hashCode()
{
int result = host.hashCode();
result = 31 * result + port;
return result;
return Objects.hash(host, port);
}
public String asString()

View File

@ -203,16 +203,7 @@ public class ResponseNotifier
public void forwardSuccess(List<Response.ResponseListener> listeners, Response response)
{
notifyBegin(listeners, response);
for (Iterator<HttpField> iterator = response.getHeaders().iterator(); iterator.hasNext();)
{
HttpField field = iterator.next();
if (!notifyHeader(listeners, response, field))
iterator.remove();
}
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), Callback.NOOP);
forwardEvents(listeners, response);
notifySuccess(listeners, response);
}
@ -223,9 +214,16 @@ public class ResponseNotifier
}
public void forwardFailure(List<Response.ResponseListener> listeners, Response response, Throwable failure)
{
forwardEvents(listeners, response);
notifyFailure(listeners, response, failure);
}
private void forwardEvents(List<Response.ResponseListener> listeners, Response response)
{
notifyBegin(listeners, response);
for (Iterator<HttpField> iterator = response.getHeaders().iterator(); iterator.hasNext();)
Iterator<HttpField> iterator = response.getHeaders().iterator();
while (iterator.hasNext())
{
HttpField field = iterator.next();
if (!notifyHeader(listeners, response, field))
@ -233,8 +231,11 @@ public class ResponseNotifier
}
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), Callback.NOOP);
notifyFailure(listeners, response, failure);
{
byte[] content = ((ContentResponse)response).getContent();
if (content != null && content.length > 0)
notifyContent(listeners, response, ByteBuffer.wrap(content), Callback.NOOP);
}
}
public void forwardFailureComplete(List<Response.ResponseListener> listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure)

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
@ -36,12 +37,18 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
{
public HttpClientTransportOverHTTP()
{
this(Math.max( 1, ProcessorUtils.availableProcessors() / 2));
this(Math.max(1, ProcessorUtils.availableProcessors() / 2));
}
public HttpClientTransportOverHTTP(int selectors)
{
super(selectors);
this(new ClientConnector());
getClientConnector().setSelectors(selectors);
}
public HttpClientTransportOverHTTP(ClientConnector connector)
{
super(connector);
setConnectionPoolFactory(destination -> new DuplexConnectionPool(destination, getHttpClient().getMaxConnectionsPerDestination(), destination));
}

View File

@ -93,7 +93,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return bytesOut.longValue();
}
protected void addBytesOut(long bytesOut)
{
this.bytesOut.add(bytesOut);

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
@ -49,9 +46,11 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class HttpClientCustomProxyTest
{
public static final byte[] CAFE_BABE = new byte[]{(byte)0xCA, (byte)0xFE, (byte)0xBA, (byte)0xBE};

View File

@ -18,15 +18,6 @@
package org.eclipse.jetty.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
@ -53,17 +44,22 @@ import org.eclipse.jetty.io.ssl.SslHandshakeListener;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientTLSTest
{
private Server server;

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -106,6 +96,16 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(WorkDirExtension.class)
public class HttpClientTest extends AbstractHttpClientServerTest
{
@ -1716,10 +1716,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
try (ServerSocket server = new ServerSocket(0))
{
startClient(scenario);
client.setMaxConnectionsPerDestination(1);
int idleTimeout = 2000;
client.setIdleTimeout(idleTimeout);
startClient(scenario, null, httpClient ->
{
httpClient.setMaxConnectionsPerDestination(1);
httpClient.setIdleTimeout(idleTimeout);
});
Request request = client.newRequest("localhost", server.getLocalPort())
.scheme(scenario.getScheme())

View File

@ -18,13 +18,12 @@
package org.eclipse.jetty.client;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class InsufficientThreadsDetectionTest
{
@Test
@ -33,9 +32,7 @@ public class InsufficientThreadsDetectionTest
QueuedThreadPool clientThreads = new QueuedThreadPool(1);
HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP(1), null);
httpClient.setExecutor(clientThreads);
assertThrows(IllegalStateException.class, ()->{
httpClient.start();
});
assertThrows(IllegalStateException.class, httpClient::start);
}
@Test
@ -46,7 +43,8 @@ public class InsufficientThreadsDetectionTest
httpClient1.setExecutor(clientThreads);
httpClient1.start();
assertThrows(IllegalStateException.class, ()->{
assertThrows(IllegalStateException.class, () ->
{
// Share the same thread pool with another instance.
HttpClient httpClient2 = new HttpClient(new HttpClientTransportOverHTTP(1), null);
httpClient2.setExecutor(clientThreads);

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client.http;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -39,10 +36,18 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ -178,10 +183,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class)
public void test_IdleConnection_IdleTimeout(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
startServer(scenario, new EmptyServerHandler());
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
startClient(scenario, null, httpClient -> httpClient.setIdleTimeout(idleTimeout));
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
{

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
@ -47,7 +48,13 @@ public class HttpClientTransportOverFCGI extends AbstractConnectorHttpClientTran
public HttpClientTransportOverFCGI(int selectors, String scriptRoot)
{
super(selectors);
this(new ClientConnector(), scriptRoot);
getClientConnector().setSelectors(selectors);
}
public HttpClientTransportOverFCGI(ClientConnector connector, String scriptRoot)
{
super(connector);
this.scriptRoot = scriptRoot;
setConnectionPoolFactory(destination ->
{

View File

@ -18,13 +18,10 @@
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -38,32 +35,24 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation
* <p>HTTP2Client provides an asynchronous, non-blocking implementation
* to send HTTP/2 frames to a server.</p>
* <p>Typical usage:</p>
* <pre>
* // Create and start HTTP2Client.
* HTTP2Client client = new HTTP2Client();
* SslContextFactory sslContextFactory = new SslContextFactory();
* client.addBean(sslContextFactory);
* client.start();
* SslContextFactory sslContextFactory = client.getClientConnector().getSslContextFactory();
*
* // Connect to host.
* String host = "webtide.com";
@ -108,7 +97,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* // Use the Stream object to send request content, if any, using a DATA frame.
* ByteBuffer content = ...;
* DataFrame requestContent = new DataFrame(stream.getId(), content, true);
* stream.data(requestContent, Callback.Adapter.INSTANCE);
* stream.data(requestContent, Callback.NOOP);
*
* // When done, stop the client.
* client.stop();
@ -117,18 +106,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
@ManagedObject
public class HTTP2Client extends ContainerLifeCycle
{
private Executor executor;
private Scheduler scheduler;
private ByteBufferPool bufferPool;
private ClientConnectionFactory connectionFactory;
private SelectorManager selector;
private int selectors = 1;
private long idleTimeout = 30000;
private long connectTimeout = 10000;
private boolean connectBlocking;
private SocketAddress bindAddress;
private final ClientConnector connector;
private int inputBufferSize = 8192;
private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
private List<String> protocols = List.of("h2");
private int initialSessionRecvWindow = 16 * 1024 * 1024;
private int initialStreamRecvWindow = 8 * 1024 * 1024;
private int maxFrameLength = Frame.DEFAULT_MAX_LENGTH;
@ -136,96 +116,50 @@ public class HTTP2Client extends ContainerLifeCycle
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
@Override
protected void doStart() throws Exception
public HTTP2Client()
{
if (executor == null)
setExecutor(new QueuedThreadPool());
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler());
if (bufferPool == null)
setByteBufferPool(new MappedByteBufferPool());
if (connectionFactory == null)
{
HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
setClientConnectionFactory((endPoint, context) ->
{
ClientConnectionFactory factory = h2;
SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
if (sslContextFactory != null)
{
ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
factory = newSslClientConnectionFactory(sslContextFactory, alpn);
}
return factory.newConnection(endPoint, context);
});
}
if (selector == null)
{
selector = newSelectorManager();
addBean(selector);
}
selector.setConnectTimeout(getConnectTimeout());
super.doStart();
this(new ClientConnector());
}
protected SelectorManager newSelectorManager()
public HTTP2Client(ClientConnector connector)
{
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
this.connector = connector;
addBean(connector);
}
protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory sslContextFactory, ClientConnectionFactory connectionFactory)
public ClientConnector getClientConnector()
{
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory);
return connector;
}
public Executor getExecutor()
{
return executor;
return connector.getExecutor();
}
public void setExecutor(Executor executor)
{
this.updateBean(this.executor, executor);
this.executor = executor;
connector.setExecutor(executor);
}
public Scheduler getScheduler()
{
return scheduler;
return connector.getScheduler();
}
public void setScheduler(Scheduler scheduler)
{
this.updateBean(this.scheduler, scheduler);
this.scheduler = scheduler;
connector.setScheduler(scheduler);
}
public ByteBufferPool getByteBufferPool()
{
return bufferPool;
return connector.getByteBufferPool();
}
public void setByteBufferPool(ByteBufferPool bufferPool)
{
this.updateBean(this.bufferPool, bufferPool);
this.bufferPool = bufferPool;
}
public ClientConnectionFactory getClientConnectionFactory()
{
return connectionFactory;
}
public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.updateBean(this.connectionFactory, connectionFactory);
this.connectionFactory = connectionFactory;
connector.setByteBufferPool(bufferPool);
}
public FlowControlStrategy.Factory getFlowControlStrategyFactory()
@ -241,58 +175,55 @@ public class HTTP2Client extends ContainerLifeCycle
@ManagedAttribute("The number of selectors")
public int getSelectors()
{
return selectors;
return connector.getSelectors();
}
public void setSelectors(int selectors)
{
this.selectors = selectors;
connector.setSelectors(selectors);
}
@ManagedAttribute("The idle timeout in milliseconds")
public long getIdleTimeout()
{
return idleTimeout;
return connector.getIdleTimeout().toMillis();
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
connector.setIdleTimeout(Duration.ofMillis(idleTimeout));
}
@ManagedAttribute("The connect timeout in milliseconds")
public long getConnectTimeout()
{
return connectTimeout;
return connector.getConnectTimeout().toMillis();
}
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
SelectorManager selector = this.selector;
if (selector != null)
selector.setConnectTimeout(connectTimeout);
connector.setConnectTimeout(Duration.ofMillis(connectTimeout));
}
@ManagedAttribute("Whether the connect() operation is blocking")
public boolean isConnectBlocking()
{
return connectBlocking;
return connector.isConnectBlocking();
}
public void setConnectBlocking(boolean connectBlocking)
{
this.connectBlocking = connectBlocking;
connector.setConnectBlocking(connectBlocking);
}
public SocketAddress getBindAddress()
{
return bindAddress;
return connector.getBindAddress();
}
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
connector.setBindAddress(bindAddress);
}
@ManagedAttribute("The size of the buffer used to read from the network")
@ -374,6 +305,7 @@ public class HTTP2Client extends ContainerLifeCycle
public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{
// Prior-knowledge clear-text HTTP/2 (h2c).
connect(null, address, listener, promise);
}
@ -384,112 +316,49 @@ public class HTTP2Client extends ContainerLifeCycle
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
try
{
SocketChannel channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(channel);
boolean connected = true;
if (isConnectBlocking())
{
channel.socket().connect(address, (int)getConnectTimeout());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
context = contextFrom(sslContextFactory, address, listener, promise, context);
if (connected)
selector.accept(channel, context);
else
selector.connect(channel, context);
}
catch (Throwable x)
{
promise.failed(x);
}
ClientConnectionFactory factory = newClientConnectionFactory(sslContextFactory);
connect(address, factory, listener, promise, context);
}
public void connect(SocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
context = contextFrom(factory, listener, promise, context);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
{
try
{
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
channel.configureBlocking(false);
Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
selector.accept(channel, context);
}
catch (Throwable x)
{
promise.failed(x);
}
ClientConnectionFactory factory = newClientConnectionFactory(sslContextFactory);
accept(channel, factory, listener, promise);
}
private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
public void accept(SocketChannel channel, ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise)
{
Map<String, Object> context = contextFrom(factory, listener, promise, null);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.accept(channel, context);
}
private Map<String, Object> contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
if (context == null)
context = new HashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
if (sslContextFactory != null)
context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
context.putIfAbsent(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, this);
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, factory);
return context;
}
protected void configure(SocketChannel channel) throws IOException
private ClientConnectionFactory newClientConnectionFactory(SslContextFactory sslContextFactory)
{
channel.socket().setTcpNoDelay(true);
}
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
ClientConnectionFactory factory = new HTTP2ClientConnectionFactory();
if (sslContextFactory != null)
{
super(executor, scheduler, selectors);
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
endp.setIdleTimeout(getIdleTimeout());
return endp;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
return getClientConnectionFactory().newConnection(endpoint, context);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
if (LOG.isDebugEnabled())
{
Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
LOG.debug("Could not connect to {}:{}", host, port);
}
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), factory, getProtocols());
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
}
return factory;
}
}

View File

@ -42,12 +42,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
{
public static final String CLIENT_CONTEXT_KEY = "http2.client";
public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
public static final String CLIENT_CONTEXT_KEY = "org.eclipse.jetty.client.http2";
public static final String SESSION_LISTENER_CONTEXT_KEY = "org.eclipse.jetty.client.http2.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.http2.sessionPromise";
private final Connection.Listener connectionListener = new ConnectionListener();
@ -55,9 +52,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
ByteBufferPool byteBufferPool = client.getByteBufferPool();
Executor executor = client.getExecutor();
Scheduler scheduler = client.getScheduler();
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);

View File

@ -1,94 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class Client
{
public static void main(String[] args) throws Exception
{
HTTP2Client client = new HTTP2Client();
SslContextFactory sslContextFactory = new SslContextFactory();
client.addBean(sslContextFactory);
client.start();
String host = "webtide.com";
int port = 443;
FuturePromise<Session> sessionPromise = new FuturePromise<>();
client.connect(sslContextFactory, new InetSocketAddress(host, port), new ServerSessionListener.Adapter(), sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);
HttpFields requestFields = new HttpFields();
requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
MetaData.Request metaData = new MetaData.Request("GET", new HttpURI("https://" + host + ":" + port + "/"), HttpVersion.HTTP_2, requestFields);
HeadersFrame headersFrame = new HeadersFrame(metaData, null, true);
final Phaser phaser = new Phaser(2);
session.newStream(headersFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
System.err.println(frame);
if (frame.isEndStream())
phaser.arrive();
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
System.err.println(frame);
callback.succeeded();
if (frame.isEndStream())
phaser.arrive();
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
System.err.println(frame);
phaser.register();
return this;
}
});
phaser.awaitAdvanceInterruptibly(phaser.arrive(), 5, TimeUnit.SECONDS);
client.stop();
}
}

View File

@ -45,7 +45,6 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ManagedObject("The HTTP/2 client transport")
public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
@ -101,13 +100,7 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
}
addBean(client);
super.doStart();
this.connectionFactory = new HTTP2ClientConnectionFactory();
client.setClientConnectionFactory((endPoint, context) ->
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
});
connectionFactory = new HTTP2ClientConnectionFactory();
}
@Override
@ -134,16 +127,12 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
SessionListenerPromise listenerPromise = new SessionListenerPromise(context);
HttpDestinationOverHTTP2 destination = (HttpDestinationOverHTTP2)context.get(HTTP_DESTINATION_CONTEXT_KEY);
SslContextFactory sslContextFactory = null;
if (HttpScheme.HTTPS.is(destination.getScheme()))
sslContextFactory = httpClient.getSslContextFactory();
connect(sslContextFactory, address, listenerPromise, listenerPromise, context);
connect(address, destination.getClientConnectionFactory(), listenerPromise, listenerPromise, context);
}
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
protected void connect(InetSocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
getHTTP2Client().connect(sslContextFactory, address, listener, promise, context);
getHTTP2Client().connect(address, factory, listener, promise, context);
}
@Override

View File

@ -51,11 +51,11 @@ import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;
@ -171,9 +171,9 @@ public class MaxConcurrentStreamsTest extends AbstractTest
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client())
{
@Override
protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
protected void connect(InetSocketAddress address, ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
super.connect(sslContextFactory, address, new Wrapper(listener)
super.connect(address, factory, new Wrapper(listener)
{
@Override
public void onSettings(Session session, SettingsFrame frame)

View File

@ -28,10 +28,9 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
*/
public interface ClientConnectionFactory
{
public static final String CONNECTOR_CONTEXT_KEY = "client.connector";
public static final String CLIENT_CONTEXT_KEY = "org.eclipse.jetty.client";
/**
*
* @param endPoint the {@link org.eclipse.jetty.io.EndPoint} to link the newly created connection to
* @param context the context data to create the connection
* @return a new {@link Connection}
@ -41,8 +40,9 @@ public interface ClientConnectionFactory
public default Connection customize(Connection connection, Map<String, Object> context)
{
ContainerLifeCycle connector = (ContainerLifeCycle)context.get(CONNECTOR_CONTEXT_KEY);
connector.getBeans(Connection.Listener.class).forEach(connection::addListener);
ContainerLifeCycle client = (ContainerLifeCycle)context.get(CLIENT_CONTEXT_KEY);
if (client != null)
client.getBeans(Connection.Listener.class).forEach(connection::addListener);
return connection;
}
}

View File

@ -0,0 +1,333 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
public class ClientConnector extends ContainerLifeCycle
{
public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
public static final String SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".socketAddress";
public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory";
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
private static final Logger LOG = Log.getLogger(ClientConnector.class);
private Executor executor;
private Scheduler scheduler;
private ByteBufferPool byteBufferPool;
private SslContextFactory sslContextFactory;
private SelectorManager selectorManager;
private int selectors = 1;
private boolean connectBlocking;
private Duration connectTimeout = Duration.ofSeconds(5);
private Duration idleTimeout = Duration.ofSeconds(30);
private SocketAddress bindAddress;
public Executor getExecutor()
{
return executor;
}
public void setExecutor(Executor executor)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.executor, executor);
this.executor = executor;
}
public Scheduler getScheduler()
{
return scheduler;
}
public void setScheduler(Scheduler scheduler)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.scheduler, scheduler);
this.scheduler = scheduler;
}
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.byteBufferPool, byteBufferPool);
this.byteBufferPool = byteBufferPool;
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
}
public void setSslContextFactory(SslContextFactory sslContextFactory)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.sslContextFactory, sslContextFactory);
this.sslContextFactory = sslContextFactory;
}
public int getSelectors()
{
return selectors;
}
public void setSelectors(int selectors)
{
if (isStarted())
throw new IllegalStateException();
this.selectors = selectors;
}
public boolean isConnectBlocking()
{
return connectBlocking;
}
public void setConnectBlocking(boolean connectBlocking)
{
this.connectBlocking = connectBlocking;
}
public Duration getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(Duration connectTimeout)
{
this.connectTimeout = connectTimeout;
if (selectorManager != null)
selectorManager.setConnectTimeout(connectTimeout.toMillis());
}
public Duration getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(Duration idleTimeout)
{
this.idleTimeout = idleTimeout;
}
public SocketAddress getBindAddress()
{
return bindAddress;
}
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
@Override
protected void doStart() throws Exception
{
if (executor == null)
setExecutor(new QueuedThreadPool());
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler());
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool());
if (sslContextFactory == null)
setSslContextFactory(newSslContextFactory());
selectorManager = newSelectorManager();
selectorManager.setConnectTimeout(getConnectTimeout().toMillis());
addBean(selectorManager);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
protected SslContextFactory newSslContextFactory()
{
SslContextFactory sslContextFactory = new SslContextFactory(false);
sslContextFactory.setEndpointIdentificationAlgorithm("HTTPS");
return sslContextFactory;
}
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
}
public void connect(SocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
if (context == null)
context = new HashMap<>();
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
context.putIfAbsent(SOCKET_ADDRESS_CONTEXT_KEY, address);
channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Binding to {} to connect to {}", bindAddress, address);
channel.bind(bindAddress);
}
configure(channel);
boolean connected = true;
boolean blocking = isConnectBlocking();
if (LOG.isDebugEnabled())
LOG.debug("Connecting {} to {}", blocking ? "blocking" : "non-blocking", address);
if (blocking)
{
channel.socket().connect(address, (int)getConnectTimeout().toMillis());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(x, context);
}
}
}
public void accept(SocketChannel channel, Map<String, Object> context)
{
try
{
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
configure(channel);
channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not accept {}", channel);
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
protected void configure(SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(true);
}
private void connectFailed(Throwable failure, Map<String, Object> context)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(SOCKET_ADDRESS_CONTEXT_KEY));
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
SocketChannelEndPoint endPoint = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
endPoint.setIdleTimeout(getIdleTimeout().toMillis());
return endPoint;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
ClientConnectionFactory factory = (ClientConnectionFactory)context.get(CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return factory.newConnection(endPoint, context);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(failure, context);
}
}
}

View File

@ -37,9 +37,9 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
private final Map<String, Object> context;
private volatile boolean completed;
protected NegotiatingClientConnection(EndPoint endp, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map<String, Object> context)
protected NegotiatingClientConnection(EndPoint endPoint, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endp, executor);
super(endPoint, executor);
this.engine = sslEngine;
this.connectionFactory = connectionFactory;
this.context = context;
@ -67,7 +67,7 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
else
fillInterested();
}
catch (IOException x)
catch (Throwable x)
{
close();
throw new RuntimeIOException(x);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.io.ssl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@ -27,6 +28,7 @@ import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -34,10 +36,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SslClientConnectionFactory implements ClientConnectionFactory
{
public static final String SSL_CONTEXT_FACTORY_CONTEXT_KEY = "ssl.context.factory";
public static final String SSL_PEER_HOST_CONTEXT_KEY = "ssl.peer.host";
public static final String SSL_PEER_PORT_CONTEXT_KEY = "ssl.peer.port";
public static final String SSL_ENGINE_CONTEXT_KEY = "ssl.engine";
public static final String SSL_ENGINE_CONTEXT_KEY = "org.eclipse.jetty.client.ssl.engine";
private final SslContextFactory sslContextFactory;
private final ByteBufferPool byteBufferPool;
@ -88,9 +87,8 @@ public class SslClientConnectionFactory implements ClientConnectionFactory
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
String host = (String)context.get(SSL_PEER_HOST_CONTEXT_KEY);
int port = (Integer)context.get(SSL_PEER_PORT_CONTEXT_KEY);
SSLEngine engine = sslContextFactory.newSSLEngine(host, port);
InetSocketAddress address = (InetSocketAddress)context.get(ClientConnector.SOCKET_ADDRESS_CONTEXT_KEY);
SSLEngine engine = sslContextFactory.newSSLEngine(address);
engine.setUseClientMode(true);
context.put(SSL_ENGINE_CONTEXT_KEY, engine);
@ -119,8 +117,9 @@ public class SslClientConnectionFactory implements ClientConnectionFactory
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
sslConnection.setRenegotiationLimit(sslContextFactory.getRenegotiationLimit());
sslConnection.setAllowMissingCloseMessage(isAllowMissingCloseMessage());
ContainerLifeCycle connector = (ContainerLifeCycle)context.get(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY);
connector.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
ContainerLifeCycle client = (ContainerLifeCycle)context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY);
if (client != null)
client.getBeans(SslHandshakeListener.class).forEach(sslConnection::addHandshakeListener);
}
return ClientConnectionFactory.super.customize(connection, context);
}

View File

@ -41,6 +41,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
@ -171,16 +172,23 @@ public class ProxyServletTest
private void startClient() throws Exception
{
client = prepareClient();
startClient(null);
}
private HttpClient prepareClient() throws Exception
private void startClient(Consumer<HttpClient> consumer) throws Exception
{
client = prepareClient(consumer);
}
private HttpClient prepareClient(Consumer<HttpClient> consumer) throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
if (consumer != null)
consumer.accept(result);
result.start();
return result;
}
@ -987,7 +995,7 @@ public class ProxyServletTest
assertEquals(name, cookies.get(0).getName());
assertEquals(value1, cookies.get(0).getValue());
HttpClient client2 = prepareClient();
HttpClient client2 = prepareClient(null);
try
{
String value2 = "2";
@ -1373,10 +1381,8 @@ public class ProxyServletTest
}
});
startProxy(proxyServletClass);
startClient();
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
byte[] content = new byte[1024];
new Random().nextBytes(content);

View File

@ -21,67 +21,97 @@ package org.eclipse.jetty.unixsocket.client;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.channels.SocketChannel;
import java.util.Map;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpClientTransportOverUnixSockets
extends HttpClientTransportOverHTTP
// TODO: this class needs a thorough review.
public class HttpClientTransportOverUnixSockets extends AbstractHttpClientTransport
{
private static final Logger LOG = Log.getLogger( HttpClientTransportOverUnixSockets.class );
private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
private String _unixSocket;
private SelectorManager selectorManager;
private UnixSocketChannel channel;
public HttpClientTransportOverUnixSockets( String unixSocket )
public HttpClientTransportOverUnixSockets(String unixSocket)
{
if ( unixSocket == null )
{
throw new IllegalArgumentException( "Unix socket file cannot be null" );
}
if (unixSocket == null)
throw new IllegalArgumentException("Unix socket file cannot be null");
this._unixSocket = unixSocket;
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
int maxConnections = httpClient.getMaxConnectionsPerDestination();
return new DuplexConnectionPool(destination, maxConnections, destination);
});
}
@Override
protected SelectorManager newSelectorManager(HttpClient client)
protected void doStart() throws Exception
{
return selectorManager = new UnixSocketSelectorManager(client,getSelectors());
HttpClient httpClient = getHttpClient();
selectorManager = new UnixSocketSelectorManager(httpClient, 1);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
super.doStart();
}
@Override
public void connect( InetSocketAddress address, Map<String, Object> context )
protected void doStop() throws Exception
{
super.doStop();
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
}
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
try
{
InetAddress inet = address.getAddress();
if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
throw new IOException("UnixSocket cannot connect to "+address.getHostString());
throw new IOException("UnixSocket cannot connect to " + address.getHostString());
// Open a unix socket
UnixSocketAddress unixAddress = new UnixSocketAddress( this._unixSocket );
channel = UnixSocketChannel.open( unixAddress );
UnixSocketAddress unixAddress = new UnixSocketAddress(this._unixSocket);
channel = UnixSocketChannel.open(unixAddress);
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
@ -95,11 +125,6 @@ public class HttpClientTransportOverUnixSockets
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
@ -116,11 +141,33 @@ public class HttpClientTransportOverUnixSockets
}
}
public class UnixSocketSelectorManager extends ClientSelectorManager
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<org.eclipse.jetty.client.api.Connection> promise = (Promise<org.eclipse.jetty.client.api.Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
public class UnixSocketSelectorManager extends SelectorManager
{
protected UnixSocketSelectorManager(HttpClient client, int selectors)
{
super(client,selectors);
super(client.getExecutor(), client.getScheduler(), selectors);
}
@Override
@ -132,25 +179,26 @@ public class HttpClientTransportOverUnixSockets
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
UnixSocketEndPoint endp = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
endp.setIdleTimeout(getHttpClient().getIdleTimeout());
return endp;
UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
return endPoint;
}
}
@Override
protected void doStop()
throws Exception
{
super.doStop();
try
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
if (channel != null)
channel.close();
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
catch (IOException xx)
@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
LOG.ignore(xx);
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
}
}
}

View File

@ -18,14 +18,6 @@
package org.eclipse.jetty.unixsocket;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.LINUX;
import static org.junit.jupiter.api.condition.OS.MAC;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@ -34,7 +26,6 @@ import java.nio.file.Paths;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -54,6 +45,14 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.LINUX;
import static org.junit.jupiter.api.condition.OS.MAC;
@EnabledOnOs({LINUX, MAC})
public class UnixSocketTest
{
@ -68,94 +67,86 @@ public class UnixSocketTest
{
server = null;
httpClient = null;
String unixSocketTmp = System.getProperty( "unix.socket.tmp" );
if(StringUtil.isNotBlank( unixSocketTmp ) )
{
sockFile = Files.createTempFile( Paths.get(unixSocketTmp), "unix", ".sock" );
} else {
sockFile = Files.createTempFile("unix", ".sock" );
}
assertTrue(Files.deleteIfExists(sockFile),"temp sock file cannot be deleted");
String unixSocketTmp = System.getProperty("unix.socket.tmp");
if (StringUtil.isNotBlank(unixSocketTmp))
sockFile = Files.createTempFile(Paths.get(unixSocketTmp), "unix", ".sock");
else
sockFile = Files.createTempFile("unix", ".sock");
assertTrue(Files.deleteIfExists(sockFile), "temp sock file cannot be deleted");
}
@AfterEach
public void after() throws Exception
{
if (httpClient!=null)
if (httpClient != null)
httpClient.stop();
if (server!=null)
if (server != null)
server.stop();
// Force delete, this will fail if UnixSocket was not closed properly in the implementation
FS.delete( sockFile);
FS.delete(sockFile);
}
@Test
public void testUnixSocket() throws Exception
{
server = new Server();
HttpConnectionFactory http = new HttpConnectionFactory();
UnixSocketConnector connector = new UnixSocketConnector(server, http);
connector.setUnixSocket(sockFile.toString());
server.addConnector(connector);
UnixSocketConnector connector = new UnixSocketConnector( server, http );
connector.setUnixSocket( sockFile.toString() );
server.addConnector( connector );
server.setHandler( new AbstractHandler.ErrorDispatchHandler()
server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle( String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response )
throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
int l = 0;
if ( request.getContentLength() != 0 )
if (request.getContentLength() != 0)
{
InputStream in = request.getInputStream();
byte[] buffer = new byte[4096];
int r = 0;
while ( r >= 0 )
while (r >= 0)
{
l += r;
r = in.read( buffer );
r = in.read(buffer);
}
}
log.info( "UnixSocketTest: request received" );
baseRequest.setHandled( true );
response.setStatus( 200 );
response.getWriter().write( "Hello World " + new Date() + "\r\n" );
log.info("UnixSocketTest: request received");
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().write("Hello World " + new Date() + "\r\n");
response.getWriter().write(
"remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n" );
"remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n");
response.getWriter().write(
"local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n" );
response.getWriter().write( "read =" + l + "\r\n" );
"local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n");
response.getWriter().write("read =" + l + "\r\n");
}
} );
});
server.start();
httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
ContentResponse contentResponse = httpClient
.newRequest( "http://localhost" )
.newRequest("http://localhost")
.send();
log.debug( "response from server: {}", contentResponse.getContentAsString() );
log.debug("response from server: {}", contentResponse.getContentAsString());
assertThat(contentResponse.getContentAsString(), containsString( "Hello World" ));
assertThat(contentResponse.getContentAsString(), containsString("Hello World"));
}
@Test
public void testNotLocal() throws Exception
{
httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
ExecutionException e = assertThrows(ExecutionException.class, ()->{
httpClient.newRequest( "http://google.com" ).send();
});
ExecutionException e = assertThrows(ExecutionException.class, () -> httpClient.newRequest("http://google.com").send());
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(),containsString("UnixSocket cannot connect to google.com"));
assertThat(e.getCause().getMessage(), containsString("UnixSocket cannot connect to google.com"));
}
}

View File

@ -18,10 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -44,6 +40,11 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
public class ConnectionStatisticsTest extends AbstractTest<TransportScenario>
{
@Override
@ -93,19 +94,19 @@ public class ConnectionStatisticsTest extends AbstractTest<TransportScenario>
scenario.client.addBean(closer);
clientStats.start();
scenario.client.setIdleTimeout(1000);
long idleTimeout = 1000;
scenario.client.setIdleTimeout(idleTimeout);
byte[] content = new byte[3072];
long contentLength = content.length;
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.CONNECTION,"close")
.header(HttpHeader.CONNECTION, "close")
.content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS)
.send();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
closed.await();
assertTrue(closed.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertThat(serverStats.getConnectionsMax(), Matchers.greaterThan(0L));
assertThat(serverStats.getReceivedBytes(), Matchers.greaterThan(contentLength));

View File

@ -327,7 +327,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{
init(transport);
final long idleTimeout = 1000;
scenario.start(new AbstractHandler()
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
@ -343,8 +343,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
}
});
scenario.client.setIdleTimeout(2 * idleTimeout);
scenario.startClient(httpClient -> httpClient.setIdleTimeout(2 * idleTimeout));
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
@ -375,7 +374,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{
init(transport);
final long idleTimeout = 1000;
scenario.start(new AbstractHandler()
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
@ -393,8 +392,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
}
});
scenario.client.setIdleTimeout(idleTimeout);
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -37,6 +34,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
{
private long idleTimeout = 1000;
@ -52,7 +52,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
public void testClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
@ -65,9 +65,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
}
}
});
scenario.client.stop();
scenario.client.setIdleTimeout(idleTimeout);
scenario.client.start();
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
@ -126,10 +124,8 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
public void testIdleClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client.setIdleTimeout(idleTimeout);
scenario.client.start();
scenario.startServer(new EmptyServerHandler());
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
// Make a first request to open a connection.
ContentResponse response = scenario.client.newRequest(scenario.newURI()).send();

View File

@ -18,15 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@ -63,6 +54,15 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class HttpClientTest extends AbstractTest<TransportScenario>
{
@Override
@ -453,7 +453,9 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
public void testConnectionListener(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.startServer(new EmptyServerHandler());
long idleTimeout = 1000;
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
CountDownLatch openLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
@ -472,9 +474,6 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
});
long idleTimeout = 1000;
scenario.client.setIdleTimeout(idleTimeout);
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS)

View File

@ -2,6 +2,6 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.io.LEVEL=DEBUG