Merge pull request #3474 from eclipse/jetty-9.4.x-3180-review_unixsocket

Issue #3180 - Review client support for Unix sockets.
This commit is contained in:
Simone Bordet 2019-04-06 11:55:29 +02:00 committed by GitHub
commit aa7e9ee060
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 381 additions and 487 deletions

View File

@ -148,6 +148,11 @@ public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpC
return new ClientSelectorManager(client, getSelectors());
}
protected SelectorManager getSelectorManager()
{
return selectorManager;
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;

View File

@ -1,5 +1,6 @@
<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration">
<Call name="addCustomizer">
<Arg>

View File

@ -1,4 +1,4 @@
<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">

View File

@ -1,9 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<!-- ============================================================= -->
<!-- Configure a HTTP2 on the ssl connector. -->
<!-- ============================================================= -->
<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Call name="addConnectionFactory">
<Arg>

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketConnector" class="org.eclipse.jetty.server.ServerConnector">
<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Call name="addFirstConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>

View File

@ -1,5 +1,6 @@
<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration">
<Call name="addCustomizer">
<Arg>

View File

@ -1,4 +1,4 @@
<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
@ -11,11 +11,7 @@
<New id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Arg name="server"><Ref refid="Server" /></Arg>
<Arg name="selectors" type="int"><Property name="jetty.unixsocket.selectors" default="-1"/></Arg>
<Arg name="factories">
<Array type="org.eclipse.jetty.server.ConnectionFactory">
</Array>
</Arg>
<Set name="unixSocket"><Property name="jetty.unixsocket" default="/tmp/jetty.sock" /></Set>
<Set name="unixSocket"><Property name="jetty.unixsocket.path" default="/tmp/jetty.sock" /></Set>
<Set name="idleTimeout"><Property name="jetty.unixsocket.idleTimeout" default="30000"/></Set>
<Set name="acceptQueueSize"><Property name="jetty.unixsocket.acceptQueueSize" default="0"/></Set>
</New>

View File

@ -46,13 +46,13 @@ http://www.apache.org/licenses/LICENSE-2.0.html
[ini-template]
### Unix SocketHTTP Connector Configuration
## Connector host/address to bind to
# jetty.unixsocket=/tmp/jetty.sock
## Unix socket path to bind to
# jetty.unixsocket.path=/tmp/jetty.sock
## Connector idle timeout in milliseconds
# jetty.unixsocket.idleTimeout=30000
## Number of selectors (-1 picks default 1)
## Number of selectors (-1 picks default)
# jetty.unixsocket.selectors=-1
## ServerSocketChannel backlog (0 picks platform default)

View File

@ -28,8 +28,11 @@ import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixServerSocketChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -49,164 +52,129 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixServerSocketChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
/**
*
* <p>A server-side connector for UNIX sockets.</p>
*/
@ManagedObject("Connector using UNIX Socket")
public class UnixSocketConnector extends AbstractConnector
{
// See SockAddrUnix.ADDR_LENGTH.
public static final int MAX_UNIX_SOCKET_PATH_LENGTH = 107;
private static final Logger LOG = Log.getLogger(UnixSocketConnector.class);
private final SelectorManager _manager;
private String _unixSocket = "/tmp/jetty.sock";
private volatile UnixServerSocketChannel _acceptChannel;
private volatile int _acceptQueueSize = 0;
private volatile boolean _reuseAddress = true;
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
/**
* <p>Constructs a UnixSocketConnector with the default configuration.</p>
*
* @param server the {@link Server} this connector will accept connections for.
*/
public UnixSocketConnector( @Name("server") Server server)
public UnixSocketConnector(@Name("server") Server server)
{
this(server,null,null,null,-1,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors)
{
this(server,null,null,null,selectors,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors,
@Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,selectors,factories);
this(server, -1);
}
/* ------------------------------------------------------------ */
/** Generic Server Connection with default configuration.
* <p>Construct a Server Connector with the passed Connection factories.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
/**
* <p>Constructs a UnixSocketConnector with the given number of selectors</p>
*
* @param server the {@link Server} this connector will accept connections for.
* @param selectors the number of selectors, or &lt;=0 for a default value.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("factories") ConnectionFactory... factories)
public UnixSocketConnector(@Name("server") Server server, @Name("selectors") int selectors)
{
this(server,null,null,null,-1,factories);
this(server, selectors, new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of HTTP Connection Factory.
/**
* <p>Constructs a UnixSocketConnector with the given ConnectionFactories.</p>
*
* @param server the {@link Server} this connector will accept connections for.
* @param factories zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("sslContextFactory") SslContextFactory sslContextFactory)
public UnixSocketConnector(@Name("server") Server server, @Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,-1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
this(server, -1, factories);
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of HTTP Connection Factory.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
/**
* <p>Constructs a UnixSocketConnector with the given selectors and ConnectionFactories.</p>
*
* @param server the {@link Server} this connector will accept connections for.
* @param selectors the number of selectors, or &lt;=0 for a default value.
* @param factories zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors,
@Name("sslContextFactory") SslContextFactory sslContextFactory)
public UnixSocketConnector(@Name("server") Server server, @Name("selectors") int selectors, @Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,selectors,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
this(server, null, null, null, selectors, factories);
}
/* ------------------------------------------------------------ */
/** Generic SSL Server Connection.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of ConnectionFactories, with the first factory being the default protocol for the SslConnectionFactory.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
/**
* <p>Constructs a UnixSocketConnector with the given SslContextFactory.</p>
*
* @param server the {@link Server} this connector will accept connections for.
* @param sslContextFactory when non null a {@link SslConnectionFactory} prepended to the other ConnectionFactories
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("sslContextFactory") SslContextFactory sslContextFactory,
@Name("factories") ConnectionFactory... factories)
public UnixSocketConnector(@Name("server") Server server, @Name("sslContextFactory") SslContextFactory sslContextFactory)
{
this(server, -1, sslContextFactory);
}
/**
* <p>Constructs a UnixSocketConnector with the given selectors and SslContextFactory.</p>.
*
* @param server the {@link Server} this connector will accept connections for.
* @param sslContextFactory when non null a {@link SslConnectionFactory} prepended to the other ConnectionFactories
* @param selectors the number of selectors, or &lt;=0 for a default value.
*/
public UnixSocketConnector(@Name("server") Server server, @Name("selectors") int selectors, @Name("sslContextFactory") SslContextFactory sslContextFactory)
{
this(server, null, null, null, selectors, AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
}
/**
* <p>Constructs a UnixSocketConnector with the given SslContextFactory and ConnectionFactories.</p>.
*
* @param server the {@link Server} this connector will accept connections for.
* @param sslContextFactory when non null a {@link SslConnectionFactory} prepended to the other ConnectionFactories
* @param factories zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(@Name("server") Server server, @Name("sslContextFactory") SslContextFactory sslContextFactory, @Name("factories") ConnectionFactory... factories)
{
this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory, factories));
}
/** Generic Server Connection.
* @param server
* The server this connector will be accept connection for.
* @param executor
* An executor used to run tasks for handling requests, acceptors and selectors.
* If null then use the servers executor
* @param scheduler
* A scheduler used to schedule timeouts. If null then use the servers scheduler
* @param bufferPool
* A ByteBuffer pool used to allocate buffers. If null then create a private pool with default configuration.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value(1). Selectors notice and schedule established connection that can make IO progress.
* @param factories
* Zero or more {@link ConnectionFactory} instances used to create and configure connections.
/**
* <p>Constructs a UnixSocketConnector with the given parameters.</p>.
*
* @param server the {@link Server} this connector will accept connections for.
* @param executor the executor that runs tasks for handling requests, acceptors and selectors.
* @param scheduler the scheduler used to schedule timed tasks.
* @param bufferPool the ByteBufferPool used to allocate buffers.
* @param selectors the number of selectors, or &lt;=0 for a default value.
* @param factories zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("executor") Executor executor,
@Name("scheduler") Scheduler scheduler,
@Name("bufferPool") ByteBufferPool bufferPool,
@Name("selectors") int selectors,
@Name("factories") ConnectionFactory... factories)
public UnixSocketConnector(@Name("server") Server server, @Name("executor") Executor executor, @Name("scheduler") Scheduler scheduler, @Name("bufferPool") ByteBufferPool bufferPool, @Name("selectors") int selectors, @Name("factories") ConnectionFactory... factories)
{
super(server,executor,scheduler,bufferPool,0,factories);
_manager = newSelectorManager(getExecutor(), getScheduler(),
selectors>0?selectors:1);
super(server, executor, scheduler, bufferPool, 0, factories);
_manager = newSelectorManager(getExecutor(), getScheduler(), selectors > 0 ? selectors : 1);
addBean(_manager, true);
setAcceptorPriorityDelta(-2);
}
@ManagedAttribute
@ManagedAttribute("The UNIX socket file name")
public String getUnixSocket()
{
return _unixSocket;
}
public void setUnixSocket(String filename)
{
_unixSocket=filename;
if (filename.length() > MAX_UNIX_SOCKET_PATH_LENGTH)
throw new IllegalArgumentException("Unix socket path too long");
_unixSocket = filename;
}
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
@ -219,11 +187,10 @@ public class UnixSocketConnector extends AbstractConnector
{
open();
super.doStart();
if (getAcceptors()==0)
if (getAcceptors() == 0)
_manager.acceptor(_acceptChannel);
}
@Override
protected void doStop() throws Exception
{
@ -234,10 +201,9 @@ public class UnixSocketConnector extends AbstractConnector
public boolean isOpen()
{
UnixServerSocketChannel channel = _acceptChannel;
return channel!=null && channel.isOpen();
return channel != null && channel.isOpen();
}
public void open() throws IOException
{
if (_acceptChannel == null)
@ -246,8 +212,7 @@ public class UnixSocketConnector extends AbstractConnector
file.deleteOnExit();
SocketAddress bindAddress = new UnixSocketAddress(file);
UnixServerSocketChannel serverChannel = UnixServerSocketChannel.open();
serverChannel.configureBlocking(getAcceptors()>0);
serverChannel.configureBlocking(getAcceptors() > 0);
try
{
serverChannel.socket().bind(bindAddress, getAcceptQueueSize());
@ -259,18 +224,11 @@ public class UnixSocketConnector extends AbstractConnector
}
addBean(serverChannel);
if (LOG.isDebugEnabled())
LOG.debug("opened {}",serverChannel);
LOG.debug("opened {}", serverChannel);
_acceptChannel = serverChannel;
}
}
@Override
public Future<Void> shutdown()
{
// shutdown all the connections
return super.shutdown();
}
public void close()
{
UnixServerSocketChannel serverChannel = _acceptChannel;
@ -297,7 +255,7 @@ public class UnixSocketConnector extends AbstractConnector
{
Files.deleteIfExists(Paths.get(_unixSocket));
}
catch ( IOException e )
catch (IOException e)
{
LOG.warn(e);
}
@ -311,16 +269,16 @@ public class UnixSocketConnector extends AbstractConnector
UnixServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
LOG.debug("accept {}",serverChannel);
LOG.debug("accept {}", serverChannel);
UnixSocketChannel channel = serverChannel.accept();
LOG.debug("accepted {}",channel);
LOG.debug("accepted {}", channel);
accepted(channel);
}
}
protected void accepted(UnixSocketChannel channel) throws IOException
{
channel.configureBlocking(false);
channel.configureBlocking(false);
_manager.accept(channel);
}
@ -335,12 +293,11 @@ public class UnixSocketConnector extends AbstractConnector
return _acceptChannel;
}
protected UnixSocketEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
protected UnixSocketEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
return new UnixSocketEndPoint((UnixSocketChannel)channel,selector,key,getScheduler());
return new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
}
/**
* @return the accept queue size
*/
@ -362,6 +319,7 @@ public class UnixSocketConnector extends AbstractConnector
* @return whether the server socket reuses addresses
* @see ServerSocket#getReuseAddress()
*/
@ManagedAttribute("Whether the server socket reuses addresses")
public boolean getReuseAddress()
{
return _reuseAddress;
@ -376,15 +334,12 @@ public class UnixSocketConnector extends AbstractConnector
_reuseAddress = reuseAddress;
}
@Override
public String toString()
{
return String.format("%s{%s}",
super.toString(),
_unixSocket);
return String.format("%s{%s}", super.toString(), _unixSocket);
}
protected class UnixSocketConnectorManager extends SelectorManager
{
public UnixSocketConnectorManager(Executor executor, Scheduler scheduler, int selectors)
@ -403,17 +358,17 @@ public class UnixSocketConnector extends AbstractConnector
{
return NativeSelectorProvider.getInstance().openSelector();
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
UnixSocketEndPoint endp = UnixSocketConnector.this.newEndPoint(channel, selector, selectionKey);
endp.setIdleTimeout(getIdleTimeout());
return endp;
UnixSocketEndPoint endPoint = UnixSocketConnector.this.newEndPoint(channel, selector, selectionKey);
endPoint.setIdleTimeout(getIdleTimeout());
return endPoint;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
return getDefaultConnectionFactory().newConnection(UnixSocketConnector.this, endpoint);
}
@ -448,10 +403,10 @@ public class UnixSocketConnector extends AbstractConnector
protected SelectableChannel doAccept(SelectableChannel server) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("doAccept async {}",server);
LOG.debug("doAccept async {}", server);
UnixSocketChannel channel = ((UnixServerSocketChannel)server).accept();
if (LOG.isDebugEnabled())
LOG.debug("accepted async {}",channel);
LOG.debug("accepted async {}", channel);
return channel;
}
}

View File

@ -20,31 +20,25 @@ package org.eclipse.jetty.unixsocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import jnr.unixsocket.UnixSocketChannel;
public class UnixSocketEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private static final Logger CEPLOG = Log.getLogger(ChannelEndPoint.class);
private final UnixSocketChannel _channel;
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler);
_channel=channel;
super(channel, selector, key, scheduler);
_channel = channel;
}
@Override
@ -59,7 +53,6 @@ public class UnixSocketEndPoint extends ChannelEndPoint
return null;
}
@Override
protected void doShutdownOutput()
{

View File

@ -19,77 +19,67 @@
package org.eclipse.jetty.unixsocket.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
public class HttpClientTransportOverUnixSockets
extends HttpClientTransportOverHTTP
public class HttpClientTransportOverUnixSockets extends HttpClientTransportOverHTTP
{
private static final Logger LOG = Log.getLogger( HttpClientTransportOverUnixSockets.class );
private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
private String _unixSocket;
private SelectorManager selectorManager;
private UnixSocketChannel channel;
public HttpClientTransportOverUnixSockets( String unixSocket )
public HttpClientTransportOverUnixSockets(String unixSocket)
{
if ( unixSocket == null )
{
throw new IllegalArgumentException( "Unix socket file cannot be null" );
}
if (unixSocket == null)
throw new IllegalArgumentException("Unix socket file cannot be null");
this._unixSocket = unixSocket;
}
@Override
protected SelectorManager newSelectorManager(HttpClient client)
{
return selectorManager = new UnixSocketSelectorManager(client,getSelectors());
return new UnixSocketSelectorManager(client, getSelectors());
}
@Override
public void connect( InetSocketAddress address, Map<String, Object> context )
public void connect(InetSocketAddress address, Map<String, Object> context)
{
UnixSocketChannel channel = null;
try
{
InetAddress inet = address.getAddress();
if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
throw new IOException("UnixSocket cannot connect to "+address.getHostString());
// Open a unix socket
UnixSocketAddress unixAddress = new UnixSocketAddress( this._unixSocket );
channel = UnixSocketChannel.open( unixAddress );
throw new ConnectException("UnixSocket cannot connect to " + address.getHostString());
UnixSocketAddress unixAddress = new UnixSocketAddress(_unixSocket);
channel = UnixSocketChannel.open(unixAddress);
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
configure(client, channel);
channel.configureBlocking(false);
selectorManager.accept(channel, context);
getSelectorManager().accept(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
@ -120,7 +110,7 @@ public class HttpClientTransportOverUnixSockets
{
protected UnixSocketSelectorManager(HttpClient client, int selectors)
{
super(client,selectors);
super(client, selectors);
}
@Override
@ -137,20 +127,4 @@ public class HttpClientTransportOverUnixSockets
return endp;
}
}
@Override
protected void doStop()
throws Exception
{
super.doStop();
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
}
}

View File

@ -36,130 +36,119 @@ public class JnrTest
java.io.File path = new java.io.File("/tmp/fubar.sock");
path.deleteOnExit();
UnixSocketAddress address = new UnixSocketAddress(path);
UnixServerSocketChannel serverChannel = UnixServerSocketChannel.open();
Selector serverSelector = NativeSelectorProvider.getInstance().openSelector();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(address);
serverChannel.register(serverSelector, SelectionKey.OP_ACCEPT, "SERVER");
System.err.printf("serverChannel=%s,%n",serverChannel);
UnixSocketChannel client = UnixSocketChannel.open( address );
System.err.printf("serverChannel=%s,%n", serverChannel);
UnixSocketChannel client = UnixSocketChannel.open(address);
Selector clientSelector = NativeSelectorProvider.getInstance().openSelector();
client.configureBlocking(false);
SelectionKey clientKey = client.register(clientSelector,0,"client");
System.err.printf("client=%s connected=%b pending=%b%n",client,client.isConnected(),client.isConnectionPending());
SelectionKey clientKey = client.register(clientSelector, 0, "client");
System.err.printf("client=%s connected=%b pending=%b%n", client, client.isConnected(), client.isConnectionPending());
int selected = serverSelector.select();
System.err.printf("serverSelected=%d %s%n",selected,serverSelector.selectedKeys());
System.err.printf("serverSelected=%d %s%n", selected, serverSelector.selectedKeys());
SelectionKey key = serverSelector.selectedKeys().iterator().next();
serverSelector.selectedKeys().clear();
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b%n",key,key.attachment(),key.isConnectable(),key.isAcceptable(),key.isReadable(),key.isWritable());
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b%n", key, key.attachment(), key.isConnectable(), key.isAcceptable(), key.isReadable(), key.isWritable());
UnixSocketChannel server = serverChannel.accept();
server.configureBlocking(false);
SelectionKey serverKey = server.register(serverSelector, SelectionKey.OP_READ, "server");
System.err.printf("server=%s connected=%b pending=%b%n",server,server.isConnected(),server.isConnectionPending());
System.err.printf("server=%s key=%s connected=%b pending=%b%n", server, serverKey, server.isConnected(), server.isConnectionPending());
selected = serverSelector.selectNow();
System.err.printf("serverSelected=%d %s%n",selected,serverSelector.selectedKeys());
System.err.printf("serverSelected=%d %s%n", selected, serverSelector.selectedKeys());
ByteBuffer buffer = ByteBuffer.allocate(32768);
buffer.clear();
int read = server.read(buffer);
buffer.flip();
System.err.printf("server read=%d%n",read);
System.err.printf("server read=%d%n", read);
selected = clientSelector.selectNow();
System.err.printf("clientSelected=%d %s%n",selected,clientSelector.selectedKeys());
System.err.printf("clientSelected=%d %s%n", selected, clientSelector.selectedKeys());
int wrote = client.write(ByteBuffer.wrap("Hello".getBytes(StandardCharsets.ISO_8859_1)));
System.err.printf("client wrote=%d%n",wrote);
System.err.printf("client wrote=%d%n", wrote);
selected = serverSelector.selectNow();
System.err.printf("serverSelected=%d %s%n",selected,serverSelector.selectedKeys());
System.err.printf("serverSelected=%d %s%n", selected, serverSelector.selectedKeys());
key = serverSelector.selectedKeys().iterator().next();
serverSelector.selectedKeys().clear();
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n",key,key.attachment(),key.isConnectable(),key.isAcceptable(),key.isReadable(),key.isWritable(),key.channel());
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n", key, key.attachment(), key.isConnectable(), key.isAcceptable(), key.isReadable(), key.isWritable(), key.channel());
buffer.clear();
read = server.read(buffer);
buffer.flip();
System.err.printf("server read=%d '%s'%n",read,new String(buffer.array(),0,buffer.limit(),StandardCharsets.ISO_8859_1));
System.err.printf("server read=%d '%s'%n", read, new String(buffer.array(), 0, buffer.limit(), StandardCharsets.ISO_8859_1));
selected = clientSelector.selectNow();
System.err.printf("clientSelected=%d %s%n",selected,clientSelector.selectedKeys());
System.err.printf("clientSelected=%d %s%n", selected, clientSelector.selectedKeys());
wrote = server.write(ByteBuffer.wrap("Ciao!".getBytes(StandardCharsets.ISO_8859_1)));
System.err.printf("server wrote=%d%n",wrote);
System.err.printf("server wrote=%d%n", wrote);
selected = clientSelector.selectNow();
System.err.printf("clientSelected=%d %s%n",selected,clientSelector.selectedKeys());
System.err.printf("clientSelected=%d %s%n", selected, clientSelector.selectedKeys());
clientKey.interestOps(SelectionKey.OP_READ);
selected = clientSelector.selectNow();
System.err.printf("clientSelected=%d %s%n",selected,clientSelector.selectedKeys());
System.err.printf("clientSelected=%d %s%n", selected, clientSelector.selectedKeys());
key = clientSelector.selectedKeys().iterator().next();
clientSelector.selectedKeys().clear();
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n",key,key.attachment(),key.isConnectable(),key.isAcceptable(),key.isReadable(),key.isWritable(),key.channel());
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n", key, key.attachment(), key.isConnectable(), key.isAcceptable(), key.isReadable(), key.isWritable(), key.channel());
buffer.clear();
read = client.read(buffer);
buffer.flip();
System.err.printf("client read=%d '%s'%n",read,new String(buffer.array(),0,buffer.limit(),StandardCharsets.ISO_8859_1));
System.err.printf("client read=%d '%s'%n", read, new String(buffer.array(), 0, buffer.limit(), StandardCharsets.ISO_8859_1));
System.err.println("So far so good.... now it gets strange...");
// Let's write until flow control hit
int size = buffer.capacity();
Arrays.fill(buffer.array(),0,size,(byte)'X');
Arrays.fill(buffer.array(), 0, size, (byte)'X');
long written = 0;
while(true)
while (true)
{
buffer.position(0).limit(size);
wrote = server.write(buffer);
System.err.printf("server wrote %d/%d remaining=%d%n",wrote,size,buffer.remaining());
if (buffer.remaining()!=(size-wrote))
System.err.printf("server wrote %d/%d remaining=%d%n", wrote, size, buffer.remaining());
if (buffer.remaining() != (size - wrote))
System.err.printf("BUG!!!!!!!!!!!!!!!!%n");
if (wrote==0)
if (wrote == 0)
break;
written+=wrote;
written += wrote;
}
System.err.printf("server wrote %d before flow control%n",written);
System.err.printf("server wrote %d before flow control%n", written);
selected = clientSelector.selectNow();
System.err.printf("clientSelected=%d %s%n",selected,clientSelector.selectedKeys());
System.err.printf("clientSelected=%d %s%n", selected, clientSelector.selectedKeys());
key = clientSelector.selectedKeys().iterator().next();
clientSelector.selectedKeys().clear();
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n",key,key.attachment(),key.isConnectable(),key.isAcceptable(),key.isReadable(),key.isWritable(),key.channel());
System.err.printf("key=%s/%s c=%b a=%b r=%b w=%b ch=%s%n", key, key.attachment(), key.isConnectable(), key.isAcceptable(), key.isReadable(), key.isWritable(), key.channel());
buffer.clear();
buffer.limit(32);
read = client.read(buffer);
buffer.flip();
System.err.printf("client read=%d '%s'%n",read,new String(buffer.array(),0,buffer.limit(),StandardCharsets.ISO_8859_1));
System.err.printf("client read=%d '%s'%n", read, new String(buffer.array(), 0, buffer.limit(), StandardCharsets.ISO_8859_1));
server.close();
client.close();
}
}

View File

@ -24,12 +24,10 @@ import java.io.PrintWriter;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.eclipse.jetty.toolchain.test.IO;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import org.eclipse.jetty.toolchain.test.IO;
public class UnixSocketClient
{
@ -37,7 +35,7 @@ public class UnixSocketClient
{
java.io.File path = new java.io.File("/tmp/jetty.sock");
java.io.File content = new java.io.File("/tmp/data.txt");
String method = "GET";
int content_length = 0;
String body = null;
@ -47,12 +45,12 @@ public class UnixSocketClient
body = IO.readToString(content);
content_length = body.length();
}
String data = method+" / HTTP/1.1\r\n"
+ "Host: unixsock\r\n"
+ "Content-Length: "+content_length+"\r\n"
+ "Connection: close\r\n"
+ "\r\n";
if (body!=null)
String data = method + " / HTTP/1.1\r\n"
+ "Host: unixsock\r\n"
+ "Content-Length: " + content_length + "\r\n"
+ "Connection: close\r\n"
+ "\r\n";
if (body != null)
data += body;
while (true)
@ -61,18 +59,18 @@ public class UnixSocketClient
UnixSocketChannel channel = UnixSocketChannel.open(address);
System.out.println("connected to " + channel.getRemoteSocketAddress());
PrintWriter w = new PrintWriter(new OutputStreamWriter(Channels.newOutputStream(channel),StandardCharsets.ISO_8859_1));
PrintWriter w = new PrintWriter(new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.ISO_8859_1));
InputStreamReader r = new InputStreamReader(Channels.newInputStream(channel));
w.print(data);
w.flush();
CharBuffer result = CharBuffer.allocate(4096);
String total="";
String total = "";
int l = 0;
while (l>=0)
while (l >= 0)
{
if (l>0)
if (l > 0)
{
result.flip();
total += result.toString();

View File

@ -25,7 +25,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -37,32 +36,31 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
public class UnixSocketProxyServer
{
public static void main (String... args) throws Exception
public static void main(String... args) throws Exception
{
Server server = new Server();
HttpConnectionFactory http = new HttpConnectionFactory();
ProxyConnectionFactory proxy = new ProxyConnectionFactory(http.getProtocol());
UnixSocketConnector connector = new UnixSocketConnector(server,proxy,http);
UnixSocketConnector connector = new UnixSocketConnector(server, proxy, http);
server.addConnector(connector);
Path socket = Paths.get(connector.getUnixSocket());
if (Files.exists(socket))
Files.delete(socket);
server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
int l = 0;
if (request.getContentLength()!=0)
if (request.getContentLength() != 0)
{
InputStream in = request.getInputStream();
byte[] buffer = new byte[4096];
int r = 0;
while (r>=0)
while (r >= 0)
{
l += r;
r = in.read(buffer);
@ -70,21 +68,21 @@ public class UnixSocketProxyServer
}
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().write("Hello World "+new Date() + "\r\n");
response.getWriter().write("remote="+request.getRemoteAddr()+":"+request.getRemotePort()+"\r\n");
response.getWriter().write("local ="+request.getLocalAddr()+":"+request.getLocalPort()+"\r\n");
response.getWriter().write("read ="+l+"\r\n");
response.getWriter().write("Hello World " + new Date() + "\r\n");
response.getWriter().write("remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n");
response.getWriter().write("local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n");
response.getWriter().write("read =" + l + "\r\n");
}
});
server.start();
while (true)
{
Thread.sleep(5000);
connector.dumpStdErr();
}
// server.join();
}
}

View File

@ -25,43 +25,40 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class UnixSocketServer
{
public static void main (String... args) throws Exception
public static void main(String... args) throws Exception
{
Server server = new Server();
HttpConnectionFactory http = new HttpConnectionFactory();
UnixSocketConnector connector = new UnixSocketConnector(server,http);
UnixSocketConnector connector = new UnixSocketConnector(server, http);
server.addConnector(connector);
Path socket = Paths.get(connector.getUnixSocket());
if (Files.exists(socket))
Files.delete(socket);
server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
int l = 0;
if (request.getContentLength()!=0)
if (request.getContentLength() != 0)
{
InputStream in = request.getInputStream();
byte[] buffer = new byte[4096];
int r = 0;
while (r>=0)
while (r >= 0)
{
l += r;
r = in.read(buffer);
@ -69,15 +66,15 @@ public class UnixSocketServer
}
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().write("Hello World "+new Date() + "\r\n");
response.getWriter().write("remote="+request.getRemoteAddr()+":"+request.getRemotePort()+"\r\n");
response.getWriter().write("local ="+request.getLocalAddr()+":"+request.getLocalPort()+"\r\n");
response.getWriter().write("read ="+l+"\r\n");
response.getWriter().write("Hello World " + new Date() + "\r\n");
response.getWriter().write("remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n");
response.getWriter().write("local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n");
response.getWriter().write("read =" + l + "\r\n");
}
});
server.start();
while (true)
{
Thread.sleep(5000);
@ -85,7 +82,7 @@ public class UnixSocketServer
System.err.println("==============================");
connector.dumpStdErr();
}
// server.join();
}
}

View File

@ -18,23 +18,15 @@
package org.eclipse.jetty.unixsocket;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.LINUX;
import static org.junit.jupiter.api.condition.OS.MAC;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -44,7 +36,6 @@ import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
@ -54,6 +45,16 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.junit.jupiter.api.condition.OS.LINUX;
import static org.junit.jupiter.api.condition.OS.MAC;
@EnabledOnOs({LINUX, MAC})
public class UnixSocketTest
{
@ -68,94 +69,90 @@ public class UnixSocketTest
{
server = null;
httpClient = null;
String unixSocketTmp = System.getProperty( "unix.socket.tmp" );
if(StringUtil.isNotBlank( unixSocketTmp ) )
String unixSocketTmp = System.getProperty("unix.socket.tmp");
if (StringUtil.isNotBlank(unixSocketTmp))
sockFile = Files.createTempFile(Paths.get(unixSocketTmp), "unix", ".sock");
else
sockFile = Files.createTempFile("unix", ".sock");
if (sockFile.toAbsolutePath().toString().length() > UnixSocketConnector.MAX_UNIX_SOCKET_PATH_LENGTH)
{
sockFile = Files.createTempFile( Paths.get(unixSocketTmp), "unix", ".sock" );
} else {
sockFile = Files.createTempFile("unix", ".sock" );
Path tmp = Paths.get("/tmp");
assumeTrue(Files.exists(tmp) && Files.isDirectory(tmp));
sockFile = Files.createTempFile(tmp, "unix", ".sock");
}
assertTrue(Files.deleteIfExists(sockFile),"temp sock file cannot be deleted");
assertTrue(Files.deleteIfExists(sockFile), "temp sock file cannot be deleted");
}
@AfterEach
public void after() throws Exception
{
if (httpClient!=null)
if (httpClient != null)
httpClient.stop();
if (server!=null)
if (server != null)
server.stop();
// Force delete, this will fail if UnixSocket was not closed properly in the implementation
FS.delete( sockFile);
if (sockFile != null)
assertFalse(Files.exists(sockFile));
}
@Test
public void testUnixSocket() throws Exception
{
server = new Server();
HttpConnectionFactory http = new HttpConnectionFactory();
UnixSocketConnector connector = new UnixSocketConnector(server, http);
connector.setUnixSocket(sockFile.toString());
server.addConnector(connector);
UnixSocketConnector connector = new UnixSocketConnector( server, http );
connector.setUnixSocket( sockFile.toString() );
server.addConnector( connector );
server.setHandler( new AbstractHandler.ErrorDispatchHandler()
server.setHandler(new AbstractHandler.ErrorDispatchHandler()
{
@Override
protected void doNonErrorHandle( String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response )
throws IOException, ServletException
protected void doNonErrorHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
int l = 0;
if ( request.getContentLength() != 0 )
if (request.getContentLength() != 0)
{
InputStream in = request.getInputStream();
byte[] buffer = new byte[4096];
int r = 0;
while ( r >= 0 )
while (r >= 0)
{
l += r;
r = in.read( buffer );
r = in.read(buffer);
}
}
log.info( "UnixSocketTest: request received" );
baseRequest.setHandled( true );
response.setStatus( 200 );
response.getWriter().write( "Hello World " + new Date() + "\r\n" );
log.info("UnixSocketTest: request received");
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().write("Hello World " + new Date() + "\r\n");
response.getWriter().write(
"remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n" );
"remote=" + request.getRemoteAddr() + ":" + request.getRemotePort() + "\r\n");
response.getWriter().write(
"local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n" );
response.getWriter().write( "read =" + l + "\r\n" );
"local =" + request.getLocalAddr() + ":" + request.getLocalPort() + "\r\n");
response.getWriter().write("read =" + l + "\r\n");
}
} );
});
server.start();
httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
ContentResponse contentResponse = httpClient
.newRequest( "http://localhost" )
.newRequest("http://localhost")
.send();
log.debug( "response from server: {}", contentResponse.getContentAsString() );
log.debug("response from server: {}", contentResponse.getContentAsString());
assertThat(contentResponse.getContentAsString(), containsString( "Hello World" ));
assertThat(contentResponse.getContentAsString(), containsString("Hello World"));
}
@Test
public void testNotLocal() throws Exception
{
httpClient = new HttpClient( new HttpClientTransportOverUnixSockets( sockFile.toString() ), null );
{
httpClient = new HttpClient(new HttpClientTransportOverUnixSockets(sockFile.toString()), null);
httpClient.start();
ExecutionException e = assertThrows(ExecutionException.class, ()->{
httpClient.newRequest( "http://google.com" ).send();
});
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(),containsString("UnixSocket cannot connect to google.com"));
ExecutionException e = assertThrows(ExecutionException.class, () -> httpClient.newRequest("http://google.com").send());
assertThat(e.getCause(), instanceOf(ConnectException.class));
}
}

View File

@ -1051,7 +1051,7 @@
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
<version>0.20</version>
<version>0.22</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@ -18,20 +18,6 @@
package org.eclipse.jetty.http.client;
import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -91,11 +77,24 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTransportScenario>
{
@Override
@ -395,12 +394,8 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Unstable")
@Disabled // TODO fix this test! #2243
public void testAsyncWriteClosed(Transport transport) throws Exception
{
// TODO work out why this test fails for UNIX_SOCKET
Assumptions.assumeFalse(transport==Transport.UNIX_SOCKET);
init(transport);
String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n";
@ -466,8 +461,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ArgumentsSource(TransportProvider.class)
public void testAsyncWriteLessThanContentLengthFlushed(Transport transport) throws Exception
{
// TODO work out why this test fails for UNIX_SOCKET
Assumptions.assumeFalse(transport==Transport.UNIX_SOCKET);
init(transport);
CountDownLatch complete = new CountDownLatch(1);
@ -1076,7 +1069,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
// the server passes the response to encrypt and write, SSLEngine
// only generates the close alert back, without encrypting the
// response, so we need to skip the transports over TLS.
Assumptions.assumeFalse(scenario.isTransportSecure());
Assumptions.assumeFalse(scenario.transport.isTlsBased());
String content = "jetty";
int responseCode = HttpStatus.NO_CONTENT_204;

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
@ -68,6 +65,9 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTransportScenario>
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
@ -178,7 +178,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
for (String failure : failures)
logger.info("FAILED: {}", failure);
assertTrue(failures.isEmpty(),failures.toString());
assertTrue(failures.isEmpty(), failures.toString());
}
private void test(final CountDownLatch latch, final List<String> failures)
@ -364,20 +364,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
@Override
public Connector newServerConnector( Server server)
public Connector newServerConnector(Server server)
{
if (transport == Transport.UNIX_SOCKET)
{
UnixSocketConnector
unixSocketConnector = new UnixSocketConnector( server, provideServerConnectionFactory( transport ));
unixSocketConnector.setUnixSocket( sockFile.toString() );
return unixSocketConnector;
}
int cores = ProcessorUtils.availableProcessors();
int selectors = Math.min(1, ProcessorUtils.availableProcessors() / 2);
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
return new ServerConnector(server, null, null, byteBufferPool,
1, Math.min(1, cores / 2), provideServerConnectionFactory(transport));
if (transport == Transport.UNIX_SOCKET)
{
UnixSocketConnector unixSocketConnector = new UnixSocketConnector(server, null, null, byteBufferPool, selectors, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixSocket(sockFile.toString());
return unixSocketConnector;
}
return new ServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
}
@Override
@ -416,7 +414,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
case UNIX_SOCKET:
{
HttpClientTransportOverUnixSockets clientTransport = new HttpClientTransportOverUnixSockets( sockFile.toString() );
HttpClientTransportOverUnixSockets clientTransport = new HttpClientTransportOverUnixSockets(sockFile.toString());
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{
@Override

View File

@ -338,7 +338,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
{
init(transport);
// Only run this test for transports over TLS.
Assumptions.assumeTrue(scenario.isTransportSecure());
Assumptions.assumeTrue(scenario.transport.isTlsBased());
scenario.startServer(new EmptyServerHandler());

View File

@ -18,10 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -43,6 +39,10 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario>
{
@Override
@ -98,7 +98,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts.toString(), expected, Matchers.equalTo(candidate));
if (i > 0)
if (transport != Transport.UNIX_SOCKET && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ -109,7 +109,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{
init(transport);
int multiplex = 1;
if (scenario.isHttp2Based())
if (scenario.transport.isHttp2Based())
multiplex = 4;
int maxMultiplex = multiplex;
@ -188,7 +188,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts.toString(), expected, Matchers.equalTo(candidate));
if (i > 0)
if (transport != Transport.UNIX_SOCKET && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}

View File

@ -18,18 +18,6 @@
package org.eclipse.jetty.http.client;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
@ -69,6 +57,17 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
{
@Override
@ -552,8 +551,6 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
public void testAsyncWriteIdleTimeoutFires(Transport transport) throws Exception
{
init(transport);
// TODO work out why this test fails for UNIX_SOCKET
Assumptions.assumeFalse(scenario.transport == UNIX_SOCKET);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler.ErrorDispatchHandler()

View File

@ -20,10 +20,10 @@ package org.eclipse.jetty.http.client;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Stream;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
@ -35,21 +35,16 @@ public class TransportProvider implements ArgumentsProvider
String transports = System.getProperty(Transport.class.getName());
if (!StringUtil.isBlank(transports))
{
return Arrays.stream(transports.split("\\s*,\\s*"))
.map(Transport::valueOf);
}
return Arrays.stream(transports.split("\\s*,\\s*")).map(Transport::valueOf);
// TODO #2014 too many test failures, don't test unix socket client for now.
// if (OS.IS_UNIX)
// return Transport.values();
if (OS.LINUX.isCurrentOs())
return Arrays.stream(Transport.values());
return EnumSet.complementOf(EnumSet.of(Transport.UNIX_SOCKET))
.stream();
return EnumSet.complementOf(EnumSet.of(Transport.UNIX_SOCKET)).stream();
}
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception
public Stream<? extends Arguments> provideArguments(ExtensionContext context)
{
return getActiveTransports().map(Arguments::of);
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -60,11 +61,14 @@ import org.eclipse.jetty.unixsocket.UnixSocketConnector;
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class TransportScenario
{
private static final Logger LOG = Log.getLogger(TransportScenario.class);
@ -78,25 +82,35 @@ public class TransportScenario
protected String servletPath = "/servlet";
protected HttpClient client;
protected Path sockFile;
protected final BlockingQueue<String> requestLog= new BlockingArrayQueue<>();
protected final BlockingQueue<String> requestLog = new BlockingArrayQueue<>();
public TransportScenario(final Transport transport) throws IOException
{
this.transport = transport;
if(sockFile == null || !Files.exists( sockFile ))
Path unixSocketTmp;
String tmpProp = System.getProperty("unix.socket.tmp");
if (StringUtil.isBlank(tmpProp))
unixSocketTmp = MavenTestingUtils.getTargetPath();
else
unixSocketTmp = Paths.get(tmpProp);
sockFile = Files.createTempFile(unixSocketTmp, "unix", ".sock");
if (sockFile.toAbsolutePath().toString().length() > UnixSocketConnector.MAX_UNIX_SOCKET_PATH_LENGTH)
{
Path target = MavenTestingUtils.getTargetPath();
sockFile = Files.createTempFile(target,"unix", ".sock" );
Files.delete( sockFile );
Files.delete(sockFile);
Path tmp = Paths.get("/tmp");
assumeTrue(Files.exists(tmp) && Files.isDirectory(tmp));
sockFile = Files.createTempFile(tmp, "unix", ".sock");
}
Files.delete(sockFile);
}
public Optional<String> getNetworkConnectorLocalPort()
{
if (connector instanceof ServerConnector)
{
ServerConnector serverConnector = (ServerConnector) connector;
ServerConnector serverConnector = (ServerConnector)connector;
return Optional.of(Integer.toString(serverConnector.getLocalPort()));
}
@ -107,7 +121,7 @@ public class TransportScenario
{
if (connector instanceof ServerConnector)
{
ServerConnector serverConnector = (ServerConnector) connector;
ServerConnector serverConnector = (ServerConnector)connector;
return Optional.of(serverConnector.getLocalPort());
}
@ -116,7 +130,7 @@ public class TransportScenario
public String getScheme()
{
return isTransportSecure() ? "https" : "http";
return transport.isTlsBased() ? "https" : "http";
}
@Deprecated
@ -149,12 +163,12 @@ public class TransportScenario
return new HttpClient(transport, sslContextFactory);
}
public Connector newServerConnector(Server server) throws Exception
public Connector newServerConnector(Server server)
{
if (transport == Transport.UNIX_SOCKET)
{
UnixSocketConnector unixSocketConnector = new UnixSocketConnector(server, provideServerConnectionFactory( transport ));
unixSocketConnector.setUnixSocket( sockFile.toString() );
UnixSocketConnector unixSocketConnector = new UnixSocketConnector(server, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixSocket(sockFile.toString());
return unixSocketConnector;
}
return new ServerConnector(server, provideServerConnectionFactory(transport));
@ -166,10 +180,7 @@ public class TransportScenario
ret.append(getScheme());
ret.append("://localhost");
Optional<String> localPort = getNetworkConnectorLocalPort();
if (localPort.isPresent())
{
ret.append(':').append(localPort.get());
}
localPort.ifPresent(s -> ret.append(':').append(s));
return ret.toString();
}
@ -199,7 +210,7 @@ public class TransportScenario
}
case UNIX_SOCKET:
{
return new HttpClientTransportOverUnixSockets( sockFile.toString() );
return new HttpClientTransportOverUnixSockets(sockFile.toString());
}
default:
{
@ -254,13 +265,13 @@ public class TransportScenario
throw new IllegalArgumentException();
}
}
return result.toArray(new ConnectionFactory[result.size()]);
return result.toArray(new ConnectionFactory[0]);
}
public void setConnectionIdleTimeout(long idleTimeout)
{
if (connector instanceof AbstractConnector)
AbstractConnector.class.cast(connector).setIdleTimeout(idleTimeout);
((AbstractConnector)connector).setIdleTimeout(idleTimeout);
}
public void setServerIdleTimeout(long idleTimeout)
@ -271,9 +282,10 @@ public class TransportScenario
else
setConnectionIdleTimeout(idleTimeout);
}
public void start(Handler handler) throws Exception
{
start(handler,null);
start(handler, null);
}
public void start(Handler handler, Consumer<HttpClient> config) throws Exception
@ -303,7 +315,7 @@ public class TransportScenario
client.setExecutor(clientThreads);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
if (config!=null)
if (config != null)
config.accept(client);
client.start();
@ -336,7 +348,7 @@ public class TransportScenario
server.setRequestLog((request, response) ->
{
int status = response.getCommittedMetaData().getStatus();
requestLog.offer(String.format("%s %s %s %03d",request.getMethod(),request.getRequestURI(),request.getProtocol(),status));
requestLog.offer(String.format("%s %s %s %03d", request.getMethod(), request.getRequestURI(), request.getProtocol(), status));
});
server.setHandler(handler);
@ -345,7 +357,7 @@ public class TransportScenario
{
server.start();
}
catch ( Exception e )
catch (Exception e)
{
e.printStackTrace();
}
@ -394,25 +406,25 @@ public class TransportScenario
{
stopClient();
}
catch (Exception ignore)
catch (Exception x)
{
LOG.ignore(ignore);
LOG.ignore(x);
}
try
{
stopServer();
}
catch (Exception ignore)
catch (Exception x)
{
LOG.ignore(ignore);
LOG.ignore(x);
}
if (sockFile!=null)
if (sockFile != null)
{
try
{
Files.deleteIfExists( sockFile );
Files.deleteIfExists(sockFile);
}
catch (IOException e)
{
@ -420,6 +432,4 @@ public class TransportScenario
}
}
}
}