Issue #132 - ClientConnector abstraction.

Rewrote HttpClientTransportOverUnixSockets in light of ClientConnector.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-01-22 17:33:29 +01:00
parent 64a2bc346e
commit 51730a7ccf
7 changed files with 103 additions and 175 deletions

View File

@ -18,12 +18,14 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -75,4 +77,18 @@ public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpC
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<>(promise));
connector.connect(address, context);
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected abstract org.eclipse.jetty.io.Connection newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise);
}

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpDestination;
@ -58,18 +55,6 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.fcgi.client.http;
import java.util.Map;
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
@ -76,18 +74,6 @@ public class HttpClientTransportOverFCGI extends AbstractConnectorHttpClientTran
return new HttpDestinationOverFCGI(getHttpClient(), origin);
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
HttpConnectionOverFCGI connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverFCGI(endPoint, destination, promise);

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
@ -251,20 +252,8 @@ public class ClientConnector extends ContainerLifeCycle
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(x, context);
}
safeClose(channel);
connectFailed(x, context);
}
}
@ -273,7 +262,6 @@ public class ClientConnector extends ContainerLifeCycle
try
{
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
configure(channel);
@ -284,17 +272,31 @@ public class ClientConnector extends ContainerLifeCycle
{
if (LOG.isDebugEnabled())
LOG.debug("Could not accept {}", channel);
safeClose(channel);
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
protected void safeClose(Closeable closeable)
{
try
{
if (closeable != null)
closeable.close();
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
protected void configure(SocketChannel channel) throws IOException
{
channel.socket().setTcpNoDelay(true);
}
private void connectFailed(Throwable failure, Map<String, Object> context)
protected void connectFailed(Throwable failure, Map<String, Object> context)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(SOCKET_ADDRESS_CONTEXT_KEY));
@ -302,9 +304,9 @@ public class ClientConnector extends ContainerLifeCycle
promise.failed(failure);
}
private class ClientSelectorManager extends SelectorManager
protected class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
protected ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}

View File

@ -20,31 +20,25 @@ package org.eclipse.jetty.unixsocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import jnr.unixsocket.UnixSocketChannel;
public class UnixSocketEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private static final Logger CEPLOG = Log.getLogger(ChannelEndPoint.class);
private final UnixSocketChannel _channel;
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler);
_channel=channel;
super(channel, selector, key, scheduler);
_channel = channel;
}
@Override
@ -59,7 +53,6 @@ public class UnixSocketEndPoint extends ChannelEndPoint
return null;
}
@Override
protected void doShutdownOutput()
{

View File

@ -19,25 +19,28 @@
package org.eclipse.jetty.unixsocket.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.Executor;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
@ -45,22 +48,21 @@ import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
// TODO: this class needs a thorough review.
public class HttpClientTransportOverUnixSockets extends AbstractHttpClientTransport
public class HttpClientTransportOverUnixSockets extends AbstractConnectorHttpClientTransport
{
private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
private String _unixSocket;
private SelectorManager selectorManager;
private UnixSocketChannel channel;
public HttpClientTransportOverUnixSockets(String unixSocket)
{
if (unixSocket == null)
throw new IllegalArgumentException("Unix socket file cannot be null");
this._unixSocket = unixSocket;
this(new UnixSocketClientConnector(unixSocket));
}
private HttpClientTransportOverUnixSockets(ClientConnector connector)
{
super(connector);
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
@ -69,136 +71,80 @@ public class HttpClientTransportOverUnixSockets extends AbstractHttpClientTransp
});
}
@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
selectorManager = new UnixSocketSelectorManager(httpClient, 1);
selectorManager.setConnectTimeout(httpClient.getConnectTimeout());
addBean(selectorManager);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
}
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin);
}
@Override
public void connect(InetSocketAddress address, Map<String, Object> context)
{
try
{
InetAddress inet = address.getAddress();
if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
throw new IOException("UnixSocket cannot connect to " + address.getHostString());
// Open a unix socket
UnixSocketAddress unixAddress = new UnixSocketAddress(this._unixSocket);
channel = UnixSocketChannel.open(unixAddress);
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
configure(client, channel);
channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<org.eclipse.jetty.client.api.Connection> promise = (Promise<org.eclipse.jetty.client.api.Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
HttpConnectionOverHTTP connection = newHttpConnection(endPoint, destination, promise);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException
private static class UnixSocketClientConnector extends ClientConnector
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
private final String unixSocket;
public class UnixSocketSelectorManager extends SelectorManager
{
protected UnixSocketSelectorManager(HttpClient client, int selectors)
private UnixSocketClientConnector(String unixSocket)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.unixSocket = unixSocket;
}
@Override
protected Selector newSelector() throws IOException
protected SelectorManager newSelectorManager()
{
return NativeSelectorProvider.getInstance().openSelector();
return new UnixSocketSelectorManager(getExecutor(), getScheduler(), getSelectors());
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
public void connect(SocketAddress address, Map<String, Object> context)
{
UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
endPoint.setIdleTimeout(getHttpClient().getIdleTimeout());
return endPoint;
InetSocketAddress socketAddress = (InetSocketAddress)address;
InetAddress inetAddress = socketAddress.getAddress();
if (inetAddress.isLoopbackAddress() || inetAddress.isLinkLocalAddress() || inetAddress.isSiteLocalAddress())
{
SocketChannel channel = null;
try
{
UnixSocketAddress unixAddress = new UnixSocketAddress(unixSocket);
channel = UnixSocketChannel.open(unixAddress);
if (LOG.isDebugEnabled())
LOG.debug("Created {} for {}", channel, unixAddress);
accept(channel, context);
}
catch (Throwable x)
{
safeClose(channel);
connectFailed(x, context);
}
}
else
{
connectFailed(new ConnectException("UnixSocket cannot connect to " + socketAddress.getHostString()), context);
}
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
private class UnixSocketSelectorManager extends ClientSelectorManager
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
private UnixSocketSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(context, x);
@Override
protected Selector newSelector() throws IOException
{
return NativeSelectorProvider.getInstance().openSelector();
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
UnixSocketEndPoint endPoint = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
endPoint.setIdleTimeout(getIdleTimeout().toMillis());
return endPoint;
}
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.unixsocket;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -146,7 +147,6 @@ public class UnixSocketTest
httpClient.start();
ExecutionException e = assertThrows(ExecutionException.class, () -> httpClient.newRequest("http://google.com").send());
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(), containsString("UnixSocket cannot connect to google.com"));
assertThat(e.getCause(), instanceOf(ConnectException.class));
}
}