Fixes #6372 - Review socket options configuration (#6610)

* Fixes #6372 - Review socket options configuration

Introduced in ClientConnector:

* tcpNoDelay
* reusePort
* receiveBufferSize
* sendBufferSize

Reworked configuration of socket options in ClientConnector.
JMX-ified ClientConnector.

Introduced reusePort in ServerConnector.
Updated server modules with the new reusePort property.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-08-13 17:39:52 +02:00 committed by GitHub
parent 2b0161e743
commit dbc0ce7c13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 307 additions and 21 deletions

View File

@ -923,8 +923,10 @@ public class HttpClient extends ContainerLifeCycle
/**
* @return whether TCP_NODELAY is enabled
* @deprecated use {@link ClientConnector#isTCPNoDelay()} instead
*/
@ManagedAttribute(value = "Whether the TCP_NODELAY option is enabled", name = "tcpNoDelay")
@Deprecated
public boolean isTCPNoDelay()
{
return tcpNoDelay;
@ -933,7 +935,9 @@ public class HttpClient extends ContainerLifeCycle
/**
* @param tcpNoDelay whether TCP_NODELAY is enabled
* @see java.net.Socket#setTcpNoDelay(boolean)
* @deprecated use {@link ClientConnector#setTCPNoDelay(boolean)} instead
*/
@Deprecated
public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;

View File

@ -73,6 +73,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
@ -1909,6 +1910,45 @@ public class HttpClientTest extends AbstractHttpClientServerTest
assertTrue(serverOnErrorLatch.await(5, TimeUnit.SECONDS), "serverOnErrorLatch didn't finish");
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testBindAddress(Scenario scenario) throws Exception
{
String bindAddress = "127.0.0.2";
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
assertEquals(bindAddress, request.getRemoteAddr());
}
});
client.setBindAddress(new InetSocketAddress(bindAddress, 0));
CountDownLatch latch = new CountDownLatch(1);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/1")
.onRequestBegin(r ->
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/2")
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
})
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
private void assertCopyRequest(Request original)
{
Request copy = client.copyRequest((HttpRequest)original, original.getURI());

View File

@ -93,6 +93,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
client.setInputBufferSize(httpClient.getResponseBufferSize());
client.setUseInputDirectByteBuffers(httpClient.isUseInputDirectByteBuffers());
client.setUseOutputDirectByteBuffers(httpClient.isUseOutputDirectByteBuffers());
client.setConnectBlocking(httpClient.isConnectBlocking());
client.setBindAddress(httpClient.getBindAddress());
}
addBean(client);
super.doStart();

View File

@ -18,6 +18,7 @@ import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
@ -33,6 +34,8 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -41,6 +44,32 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The client-side component that connects to server sockets.</p>
* <p>ClientConnector delegates the handling of {@link SocketChannel}s
* to a {@link SelectorManager}, and centralizes the configuration of
* necessary components such as the executor, the scheduler, etc.</p>
* <p>ClientConnector offers a low-level API that can be used to
* connect {@link SocketChannel}s to listening servers via the
* {@link #connect(SocketAddress, Map)} method.</p>
* <p>However, a ClientConnector instance is typically just configured
* and then passed to an HttpClient transport, so that applications
* can use high-level APIs to make HTTP requests to servers:</p>
* <pre>
* // Create a ClientConnector instance.
* ClientConnector connector = new ClientConnector();
*
* // Configure the ClientConnector.
* connector.setSelectors(1);
* connector.setSslContextFactory(new SslContextFactory.Client());
*
* // Pass it to the HttpClient transport.
* HttpClientTransport transport = new HttpClientTransportDynamic(connector);
* HttpClient httpClient = new HttpClient(transport);
* httpClient.start();
* </pre>
*/
@ManagedObject
public class ClientConnector extends ContainerLifeCycle
{
public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
@ -49,6 +78,12 @@ public class ClientConnector extends ContainerLifeCycle
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class);
/**
* <p>Creates a ClientConnector configured to connect via Unix-Domain sockets to the given Unix-Domain path</p>
*
* @param path the Unix-Domain path to connect to
* @return a ClientConnector that connects to the given Unix-Domain path
*/
public static ClientConnector forUnixDomain(Path path)
{
return new ClientConnector(SocketChannelWithAddress.Factory.forUnixDomain(path));
@ -65,7 +100,11 @@ public class ClientConnector extends ContainerLifeCycle
private Duration connectTimeout = Duration.ofSeconds(5);
private Duration idleTimeout = Duration.ofSeconds(30);
private SocketAddress bindAddress;
private boolean tcpNoDelay = true;
private boolean reuseAddress = true;
private boolean reusePort;
private int receiveBufferSize = -1;
private int sendBufferSize = -1;
public ClientConnector()
{
@ -129,6 +168,10 @@ public class ClientConnector extends ContainerLifeCycle
this.sslContextFactory = sslContextFactory;
}
/**
* @return the number of NIO selectors
*/
@ManagedAttribute("The number of NIO selectors")
public int getSelectors()
{
return selectors;
@ -141,6 +184,10 @@ public class ClientConnector extends ContainerLifeCycle
this.selectors = selectors;
}
/**
* @return whether {@link #connect(SocketAddress, Map)} operations are performed in blocking mode
*/
@ManagedAttribute("Whether connect operations are performed in blocking mode")
public boolean isConnectBlocking()
{
return connectBlocking;
@ -151,6 +198,10 @@ public class ClientConnector extends ContainerLifeCycle
this.connectBlocking = connectBlocking;
}
/**
* @return the timeout of {@link #connect(SocketAddress, Map)} operations
*/
@ManagedAttribute("The timeout of connect operations")
public Duration getConnectTimeout()
{
return connectTimeout;
@ -163,6 +214,10 @@ public class ClientConnector extends ContainerLifeCycle
selectorManager.setConnectTimeout(connectTimeout.toMillis());
}
/**
* @return the max duration for which a connection can be idle (that is, without traffic of bytes in either direction)
*/
@ManagedAttribute("The duration for which a connection can be idle")
public Duration getIdleTimeout()
{
return idleTimeout;
@ -173,26 +228,120 @@ public class ClientConnector extends ContainerLifeCycle
this.idleTimeout = idleTimeout;
}
/**
* @return the address to bind a socket to before the connect operation
*/
@ManagedAttribute("The socket address to bind sockets to before the connect operation")
public SocketAddress getBindAddress()
{
return bindAddress;
}
/**
* <p>Sets the bind address of sockets before the connect operation.</p>
* <p>In multi-homed hosts, you may want to connect from a specific address:</p>
* <pre>
* clientConnector.setBindAddress(new InetSocketAddress("127.0.0.2", 0));
* </pre>
* <p>Note the use of the port {@code 0} to indicate that a different ephemeral port
* should be used for each different connection.</p>
* <p>In the rare cases where you want to use the same port for all connections,
* you must also call {@link #setReusePort(boolean) setReusePort(true)}.</p>
*
* @param bindAddress the socket address to bind to before the connect operation
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
/**
* @return whether small TCP packets are sent without delay
*/
@ManagedAttribute("Whether small TCP packets are sent without delay")
public boolean isTCPNoDelay()
{
return tcpNoDelay;
}
public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;
}
/**
* @return whether rebinding is allowed with sockets in tear-down states
*/
@ManagedAttribute("Whether rebinding is allowed with sockets in tear-down states")
public boolean getReuseAddress()
{
return reuseAddress;
}
/**
* <p>Sets whether it is allowed to bind a socket to a socket address
* that may be in use by another socket in tear-down state, for example
* in TIME_WAIT state.</p>
* <p>This is useful when ClientConnector is restarted: an existing connection
* may still be using a network address (same host and same port) that is also
* chosen for a new connection.</p>
*
* @param reuseAddress whether rebinding is allowed with sockets in tear-down states
* @see #setReusePort(boolean)
*/
public void setReuseAddress(boolean reuseAddress)
{
this.reuseAddress = reuseAddress;
}
/**
* @return whether binding to same host and port is allowed
*/
@ManagedAttribute("Whether binding to same host and port is allowed")
public boolean isReusePort()
{
return reusePort;
}
/**
* <p>Sets whether it is allowed to bind multiple sockets to the same
* socket address (same host and same port).</p>
*
* @param reusePort whether binding to same host and port is allowed
*/
public void setReusePort(boolean reusePort)
{
this.reusePort = reusePort;
}
/**
* @return the receive buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The receive buffer size in bytes")
public int getReceiveBufferSize()
{
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize)
{
this.receiveBufferSize = receiveBufferSize;
}
/**
* @return the send buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The send buffer size in bytes")
public int getSendBufferSize()
{
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize)
{
this.sendBufferSize = sendBufferSize;
}
@Override
protected void doStart() throws Exception
{
@ -246,10 +395,12 @@ public class ClientConnector extends ContainerLifeCycle
SocketChannelWithAddress channelWithAddress = factory.newSocketChannelWithAddress(address, context);
channel = channelWithAddress.getSocketChannel();
address = channelWithAddress.getSocketAddress();
configure(channel);
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
bind(channel, bindAddress);
configure(channel);
boolean connected = true;
boolean blocking = isConnectBlocking() && address instanceof InetSocketAddress;
@ -306,33 +457,36 @@ public class ClientConnector extends ContainerLifeCycle
}
}
private void bind(SocketChannel channel, SocketAddress bindAddress)
private void bind(SocketChannel channel, SocketAddress bindAddress) throws IOException
{
try
{
boolean reuseAddress = getReuseAddress();
if (LOG.isDebugEnabled())
LOG.debug("Binding to {} reusing address {}", bindAddress, reuseAddress);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
channel.bind(bindAddress);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not bind {}", channel);
}
if (LOG.isDebugEnabled())
LOG.debug("Binding {} to {}", channel, bindAddress);
channel.bind(bindAddress);
}
protected void configure(SocketChannel channel) throws IOException
{
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, isTCPNoDelay());
setSocketOption(channel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(channel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
int receiveBufferSize = getReceiveBufferSize();
if (receiveBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
int sendBufferSize = getSendBufferSize();
if (sendBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, sendBufferSize);
}
private <T> void setSocketOption(SocketChannel channel, SocketOption<T> option, T value)
{
try
{
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(option, value);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not configure {}", channel);
LOG.debug("Could not configure {} to {} on {}", option, value, channel);
}
}

View File

@ -39,6 +39,7 @@
<Set name="acceptorPriorityDelta" property="jetty.http.acceptorPriorityDelta" />
<Set name="acceptQueueSize" property="jetty.http.acceptQueueSize" />
<Set name="reuseAddress"><Property name="jetty.http.reuseAddress" default="true"/></Set>
<Set name="reusePort"><Property name="jetty.http.reusePort" default="false"/></Set>
<Set name="acceptedTcpNoDelay"><Property name="jetty.http.acceptedTcpNoDelay" default="true"/></Set>
<Set name="acceptedReceiveBufferSize" property="jetty.http.acceptedReceiveBufferSize" />
<Set name="acceptedSendBufferSize" property="jetty.http.acceptedSendBufferSize" />

View File

@ -32,6 +32,7 @@
<Set name="acceptorPriorityDelta" property="jetty.ssl.acceptorPriorityDelta"/>
<Set name="acceptQueueSize" property="jetty.ssl.acceptQueueSize"/>
<Set name="reuseAddress"><Property name="jetty.ssl.reuseAddress" default="true"/></Set>
<Set name="reusePort"><Property name="jetty.ssl.reusePort" default="false"/></Set>
<Set name="acceptedTcpNoDelay"><Property name="jetty.ssl.acceptedTcpNoDelay" default="true"/></Set>
<Set name="acceptedReceiveBufferSize" property="jetty.ssl.acceptedReceiveBufferSize" />
<Set name="acceptedSendBufferSize" property="jetty.ssl.acceptedSendBufferSize" />

View File

@ -40,6 +40,9 @@ etc/jetty-http.xml
## Whether to enable the SO_REUSEADDR socket option.
# jetty.http.reuseAddress=true
## Whether to enable the SO_REUSEPORT socket option.
# jetty.http.reusePort=false
## Whether to enable the TCP_NODELAY socket option on accepted sockets.
# jetty.http.acceptedTcpNoDelay=true

View File

@ -42,6 +42,9 @@ etc/jetty-ssl-context.xml
## Whether to enable the SO_REUSEADDR socket option.
# jetty.ssl.reuseAddress=true
## Whether to enable the SO_REUSEPORT socket option.
# jetty.ssl.reusePort=false
## Whether to enable the TCP_NODELAY socket option on accepted sockets.
# jetty.ssl.acceptedTcpNoDelay=true

View File

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.StandardSocketOptions;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@ -77,6 +78,7 @@ public class ServerConnector extends AbstractNetworkConnector
private volatile int _localPort = -1;
private volatile int _acceptQueueSize = 0;
private volatile boolean _reuseAddress = true;
private volatile boolean _reusePort = false;
private volatile boolean _acceptedTcpNoDelay = true;
private volatile int _acceptedReceiveBufferSize = -1;
private volatile int _acceptedSendBufferSize = -1;
@ -332,8 +334,9 @@ public class ServerConnector extends AbstractNetworkConnector
serverChannel = ServerSocketChannel.open();
try
{
serverChannel.socket().setReuseAddress(getReuseAddress());
serverChannel.socket().bind(bindAddress, getAcceptQueueSize());
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
serverChannel.setOption(StandardSocketOptions.SO_REUSEPORT, isReusePort());
serverChannel.bind(bindAddress, getAcceptQueueSize());
}
catch (Throwable e)
{
@ -450,7 +453,7 @@ public class ServerConnector extends AbstractNetworkConnector
}
/**
* @return whether the server socket reuses addresses
* @return whether rebinding the server socket is allowed with sockets in tear-down states
* @see ServerSocket#getReuseAddress()
*/
@ManagedAttribute("Server Socket SO_REUSEADDR")
@ -460,7 +463,7 @@ public class ServerConnector extends AbstractNetworkConnector
}
/**
* @param reuseAddress whether the server socket reuses addresses
* @param reuseAddress whether rebinding the server socket is allowed with sockets in tear-down states
* @see ServerSocket#setReuseAddress(boolean)
*/
public void setReuseAddress(boolean reuseAddress)
@ -468,6 +471,23 @@ public class ServerConnector extends AbstractNetworkConnector
_reuseAddress = reuseAddress;
}
/**
* @return whether it is allowed to bind multiple server sockets to the same host and port
*/
@ManagedAttribute("Server Socket SO_REUSEPORT")
public boolean isReusePort()
{
return _reusePort;
}
/**
* @param reusePort whether it is allowed to bind multiple server sockets to the same host and port
*/
public void setReusePort(boolean reusePort)
{
_reusePort = reusePort;
}
/**
* @return whether the accepted socket gets {@link java.net.SocketOptions#TCP_NODELAY TCP_NODELAY} enabled.
* @see Socket#getTcpNoDelay()

View File

@ -25,6 +25,7 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
@ -32,6 +33,9 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.logging.StacklessLogging;
@ -50,6 +54,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -208,6 +213,59 @@ public class ServerConnectorTest
}
}
@Test
public void testReusePort() throws Exception
{
int port;
try (ServerSocket server = new ServerSocket())
{
server.setReuseAddress(true);
server.bind(new InetSocketAddress("localhost", 0));
port = server.getLocalPort();
}
Server server = new Server();
try
{
// Two connectors listening on the same port.
ServerConnector connector1 = new ServerConnector(server, 1, 1);
connector1.setReuseAddress(true);
connector1.setReusePort(true);
connector1.setPort(port);
server.addConnector(connector1);
ServerConnector connector2 = new ServerConnector(server, 1, 1);
connector2.setReuseAddress(true);
connector2.setReusePort(true);
connector2.setPort(port);
server.addConnector(connector2);
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
}
});
server.start();
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", port)))
{
HttpTester.Request request = HttpTester.newRequest();
request.put(HttpHeader.HOST, "localhost");
client.write(request.generate());
HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client));
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
finally
{
server.stop();
}
}
@Test
public void testAddFirstConnectionFactory()
{