Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-09-27 18:13:33 +02:00
commit ce5518e257
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
8 changed files with 323 additions and 149 deletions

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.ee10.servlet.DefaultServlet;
import org.eclipse.jetty.ee10.servlet.ResourceServlet;
import org.eclipse.jetty.ee10.servlet.ResourceServlet;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.ee10.webapp.WebAppContext;
@ -61,6 +60,7 @@ import org.eclipse.jetty.rewrite.handler.RedirectRegexRule;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
import org.eclipse.jetty.rewrite.handler.RewriteRegexRule;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.FormFields;
@ -353,6 +353,28 @@ public class HTTPServerDocs
// end::sameRandomPort[]
}
public void connectionLimit()
{
// tag::connectionLimit[]
Server server = new Server();
// Limit connections to the server, across all connectors.
ConnectionLimit serverConnectionLimit = new ConnectionLimit(1024, server);
server.addBean(serverConnectionLimit);
ServerConnector connector1 = new ServerConnector(server);
connector1.setPort(8080);
server.addConnector(connector1);
ServerConnector connector2 = new ServerConnector(server);
connector2.setPort(9090);
server.addConnector(connector2);
// Limit connections for this connector only.
ConnectionLimit connectorConnectionLimit = new ConnectionLimit(64, connector2);
connector2.addBean(connectorConnectionLimit);
// end::connectionLimit[]
}
public void sslHandshakeListener() throws Exception
{
// tag::sslHandshakeListener[]

View File

@ -54,6 +54,22 @@ This property allows you to cap the max heap memory retained by the pool.
`jetty.byteBufferPool.maxDirectMemory`::
This property allows you to cap the max direct memory retained by the pool.
[[connectionlimit]]
== Module `connectionlimit`
The `connectionlimit` module limits the number of connections accepted by the server, across all connectors.
Once the configured maximum number of connections is reached, Jetty will not accept more connections.
Existing, established connections will work normally.
When existing connections are closed, accepting new connections will be resumed.
NOTE: The number of connections seen at the JVM level may be different from the number of connections seen at the OS level.
For more information, refer to xref:programming-guide:server/http.adoc#connector-limiting[this section].
The module file is `$JETTY_HOME/modules/connectionlimit.mod`:
include::{jetty-home}/modules/connectionlimit.mod[tags=documentation]
[[console-capture]]
== Module `console-capture`

View File

@ -394,6 +394,27 @@ For example:
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=sameRandomPort]
----
[[connector-limiting]]
=== Limiting Connections
It is possible to limit the number of connections accepted by the whole server (and therefore across all connectors), or by a specific connector.
This feature is implemented by class `org.eclipse.jetty.server.ConnectionLimit` and you can use it in this way:
[,java,indent=0,options=nowrap]
----
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=connectionLimit]
----
[NOTE]
====
When the maximum number of connections is reached, no more connections will be accepted _at the JVM level_ -- but they could be accepted at the OS level.
This means that if you are using OS tools (like Linux's `ss`) to count the number of established connections, you may find a number that may be greater than the maximum number of connections configured in a `ConnectionLimit`.
Note also that different operative systems may behave differently when Jetty is not accepting connections: some OS accepts connections at the TCP level anyway (but does not notify this event to the JVM), some other OS may not accept connections at the TCP level.
====
[[connector-protocol]]
=== Configuring Protocols

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@ -864,14 +865,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_selectorManager.onAccepting(channel);
}
@Override
public void close()
{
if (LOG.isDebugEnabled())
LOG.debug("closed accept of {}", channel);
IO.close(channel);
}
@Override
public void update(Selector selector)
{
@ -882,10 +875,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
}
catch (Throwable x)
{
IO.close(channel);
_selectorManager.onAcceptFailed(channel, x);
if (LOG.isDebugEnabled())
LOG.debug("Could not register channel after accept {}", channel, x);
failed(x);
}
}
@ -894,22 +886,28 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
try
{
createEndPoint(channel, key);
_selectorManager.onAccepted(channel);
createEndPoint(channel, key);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not process accepted channel {}", channel, x);
failed(x);
}
}
protected void failed(Throwable failure)
@Override
public void close()
{
if (LOG.isDebugEnabled())
LOG.debug("Closed accept of {}", channel);
failed(new ClosedChannelException());
}
private void failed(Throwable failure)
{
IO.close(channel);
if (LOG.isDebugEnabled())
LOG.warn("Could not accept {}", channel, failure);
else
LOG.warn("Could not accept {}: {}", channel, String.valueOf(failure));
_selectorManager.onAcceptFailed(channel, failure);
}
@ -1028,6 +1026,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
IO.close((Closeable)attachment);
}
_selector = null;
if (LOG.isDebugEnabled())
LOG.debug("Closing {} on {}", selector, ManagedSelector.this);
IO.close(selector);
}
finally

View File

@ -1,7 +1,7 @@
# DO NOT EDIT THIS FILE - See: https://jetty.org/docs/
[description]
Enables a server-wide connection limit.
Enables a server-wide limit on TCP connections.
[tags]
connector
@ -13,9 +13,10 @@ server
etc/jetty-connectionlimit.xml
[ini-template]
## The limit of connections to apply
#tag::documentation[]
## The maximum number of TCP connections allowed across all connectors.
#jetty.connectionlimit.maxConnections=1000
## The idle timeout to apply (in milliseconds) when connections are limited
## The idle timeout to apply (in milliseconds) to existing connections when the connection limit is reached.
#jetty.connectionlimit.idleTimeout=1000
#end::documentation[]

View File

@ -15,14 +15,13 @@ package org.eclipse.jetty.server;
import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Connection.Listener;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
@ -36,18 +35,23 @@ import org.slf4j.LoggerFactory;
* <p>A Listener that limits the number of Connections.</p>
* <p>This listener applies a limit to the number of connections, which when
* exceeded results in a call to {@link AbstractConnector#setAccepting(boolean)}
* to prevent further connections being received. It can be applied to an
* entire server or to a specific connector by adding it via {@link Container#addBean(Object)}
* to prevent further connections being received.
* This listener can be applied to an entire {@link Server} or to a specific
* {@link Connector} by adding it via {@link Container#addBean(Object)}.
* </p>
* <p>When the number of connections is exceeded, the idle timeout of existing
* connections is changed with the value configured in this listener (typically
* a shorter value).</p>
* <p>
* <b>Usage:</b>
* </p>
* <pre>
* <pre>{@code
* Server server = new Server();
* server.addBean(new ConnectionLimit(5000,server));
* ...
* server.start();
* </pre>
* }</pre>
*
*
* @see LowResourceMonitor
* @see Connection.Listener
@ -61,7 +65,7 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
private final AutoLock _lock = new AutoLock();
private final Server _server;
private final List<AbstractConnector> _connectors = new ArrayList<>();
private final Set<SelectableChannel> _accepting = new HashSet<>();
private int _accepting;
private int _connections;
private int _maxConnections;
private long _idleTimeout;
@ -81,12 +85,12 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnection. Connections not limited", c);
LOG.warn("Connector {} is not an AbstractConnector: connections will not be limited", c);
}
}
/**
* @return If &gt;= 0, the endpoint idle timeout in ms to apply when the connection limit is reached
* @return the endpoint idle timeout in ms to apply when the connection limit is reached
*/
@ManagedAttribute("The endpoint idle timeout in ms to apply when the connection limit is reached")
public long getIdleTimeout()
@ -95,7 +99,10 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
}
/**
* @param idleTimeout If &gt;= 0 the endpoint idle timeout in ms to apply when the connection limit is reached
* <p>Sets the endpoint idle timeout in ms to apply when the connection limit is reached.</p>
* <p>A value less than or equal to zero will not change the existing idle timeout.</p>
*
* @param idleTimeout the endpoint idle timeout in ms to apply when the connection limit is reached
*/
public void setIdleTimeout(long idleTimeout)
{
@ -105,7 +112,7 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
@ManagedAttribute("The maximum number of connections allowed")
public int getMaxConnections()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
return _maxConnections;
}
@ -113,25 +120,34 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
public void setMaxConnections(int max)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_maxConnections = max;
}
}
@ManagedAttribute("The current number of connections ")
@ManagedAttribute(value = "The current number of connections", readonly = true)
public int getConnections()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
return _connections;
}
}
@ManagedAttribute(value = "The current number of pending connections", readonly = true)
public int getPendingConnections()
{
try (AutoLock ignored = _lock.lock())
{
return _accepting;
}
}
@Override
protected void doStart() throws Exception
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_server != null)
{
@ -144,7 +160,7 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
}
}
if (LOG.isDebugEnabled())
LOG.debug("ConnectionLimit {} for {}", _maxConnections, _connectors);
LOG.debug("Connection limit {} for {}", _maxConnections, _connectors);
_connections = 0;
_limiting = false;
for (AbstractConnector c : _connectors)
@ -157,7 +173,7 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
@Override
protected void doStop() throws Exception
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
for (AbstractConnector c : _connectors)
{
@ -169,25 +185,29 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
}
}
protected void check()
private boolean check()
{
if ((_accepting.size() + _connections) >= _maxConnections)
assert _lock.isHeldByCurrentThread();
int total = _accepting + _connections;
if (total >= _maxConnections)
{
if (!_limiting)
{
_limiting = true;
LOG.info("Connection Limit({}) reached for {}", _maxConnections, _connectors);
LOG.info("Connection limit {} reached for {}", _maxConnections, _connectors);
limit();
}
return total > _maxConnections;
}
else
{
if (_limiting)
{
_limiting = false;
LOG.info("Connection Limit({}) cleared for {}", _maxConnections, _connectors);
LOG.info("Connection limit {} cleared for {}", _maxConnections, _connectors);
unlimit();
}
return false;
}
}
@ -226,23 +246,24 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
@Override
public void onAccepting(SelectableChannel channel)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_accepting.add(channel);
_accepting++;
if (LOG.isDebugEnabled())
LOG.debug("onAccepting ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, channel);
check();
LOG.debug("Accepting ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, channel);
if (check())
IO.close(channel);
}
}
@Override
public void onAcceptFailed(SelectableChannel channel, Throwable cause)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_accepting.remove(channel);
_accepting--;
if (LOG.isDebugEnabled())
LOG.debug("onAcceptFailed ({}+{}) < {} {} {}", _accepting.size(), _connections, _maxConnections, channel, cause);
LOG.debug("Accept failed ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, channel, cause);
check();
}
}
@ -255,12 +276,12 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
@Override
public void onOpened(Connection connection)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_accepting.remove(connection.getEndPoint().getTransport());
_accepting--;
_connections++;
if (LOG.isDebugEnabled())
LOG.debug("onOpened ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, connection);
LOG.debug("Opened ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, connection);
check();
}
}
@ -268,11 +289,11 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener, Sele
@Override
public void onClosed(Connection connection)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_connections--;
if (LOG.isDebugEnabled())
LOG.debug("onClosed ({}+{}) < {} {}", _accepting.size(), _connections, _maxConnections, connection);
LOG.debug("Closed ({}+{}) <= {} {}", _accepting, _connections, _maxConnections, connection);
check();
}
}

View File

@ -0,0 +1,188 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class ConnectionLimitTest
{
private Server server;
private ServerConnector connector;
private void prepare(int acceptors, Handler handler)
{
if (server == null)
server = new Server();
connector = new ServerConnector(server, acceptors, 1);
server.addConnector(connector);
server.setHandler(handler);
}
@AfterEach
public void dispose()
{
LifeCycle.stop(server);
}
@ParameterizedTest
@ValueSource(ints = {0, 1})
public void testConnectionLimitWithConnector(int acceptors) throws Exception
{
prepare(acceptors, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
int maxConnections = 2;
ConnectionLimit limiter = new ConnectionLimit(maxConnections, connector);
connector.addBean(limiter);
server.start();
List<SocketChannel> channels = new ArrayList<>();
for (int i = 0; i < maxConnections; ++i)
{
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()));
channels.add(channel);
}
// On the client connections may be accepted, but on server not yet.
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
// The limit was reached.
assertFalse(connector.isAccepting());
// An extra connection is accepted at the TCP level, but not notified to the JVM yet:
// it remains in the connector accept queue, which cannot be configured to be zero.
List<SocketChannel> extraChannels = new ArrayList<>();
for (int i = 0; i < 2; ++i)
{
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()));
extraChannels.add(channel);
}
await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
// Closing one existing connection may accept
// all the extra connections when acceptors=0.
channels.remove(0).close();
// Verify that we are still correctly limited
// and that we have accepted a pending connection.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
extraChannels.forEach(IO::close);
channels.forEach(IO::close);
}
@Test
public void testConnectionLimitWithServer() throws Exception
{
prepare(1, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
ServerConnector connector2 = new ServerConnector(server, 0, 1);
server.addConnector(connector2);
int maxConnections = 2;
ConnectionLimit limiter = new ConnectionLimit(maxConnections, server);
server.addBean(limiter);
server.start();
// Max out the connections.
SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()));
SocketChannel.open(new InetSocketAddress("localhost", connector2.getLocalPort()));
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
// Try to create more, should not be possible.
SocketChannel extraChannel1 = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()));
await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
SocketChannel extraChannel2 = SocketChannel.open(new InetSocketAddress("localhost", connector2.getLocalPort()));
await().during(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxConnections, limiter.getConnections()));
extraChannel2.close();
extraChannel1.close();
}
@Test
public void testAcceptRejectedByExecutor() throws Exception
{
// One acceptor, one selector, one application.
int maxThreads = 3;
int maxQueue = 1;
QueuedThreadPool serverThreads = new QueuedThreadPool(maxThreads, 0, new ArrayBlockingQueue<>(maxQueue));
serverThreads.setReservedThreads(0);
serverThreads.setDetailedDump(true);
server = new Server(serverThreads);
prepare(1, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
int maxConnections = 2;
ConnectionLimit limiter = new ConnectionLimit(maxConnections, connector);
connector.addBean(limiter);
server.start();
// Block the last thread.
CompletableFuture<Void> blocker = new CompletableFuture<>();
serverThreads.execute(blocker::join);
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(maxThreads, serverThreads.getThreads()));
// Fill the thread pool queue.
IntStream.range(0, maxQueue).forEach(i -> serverThreads.execute(() ->
{
}));
// Try to connect, the accept task should be rejected.
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
assertEquals(-1, channel.read(byteBuffer));
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(0, limiter.getPendingConnections()));
}
// Release the blocked task.
blocker.complete(null);
}
}

View File

@ -17,21 +17,16 @@ import java.net.Socket;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.HelloHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@ -40,7 +35,6 @@ import static org.junit.jupiter.api.Assertions.fail;
@Disabled // TODO
public class NotAcceptingTest
{
private static final Logger LOG = LoggerFactory.getLogger(NotAcceptingTest.class);
private final long idleTimeout = 2000;
Server server;
LocalConnector localConnector;
@ -362,93 +356,4 @@ public class NotAcceptingTest
assertThat(blockingConnector.isAccepting(), is(true));
assertThat(asyncConnector.isAccepting(), is(true));
}
@Test
public void testConnectionLimit() throws Exception
{
server.addBean(new ConnectionLimit(9, server));
server.setHandler(new HelloHandler());
server.start();
LOG.debug("CONNECT:");
try (
LocalEndPoint local0 = localConnector.connect();
LocalEndPoint local1 = localConnector.connect();
LocalEndPoint local2 = localConnector.connect();
Socket blocking0 = new Socket("localhost", blockingConnector.getLocalPort());
Socket blocking1 = new Socket("localhost", blockingConnector.getLocalPort());
Socket blocking2 = new Socket("localhost", blockingConnector.getLocalPort());
Socket async0 = new Socket("localhost", asyncConnector.getLocalPort());
Socket async1 = new Socket("localhost", asyncConnector.getLocalPort());
Socket async2 = new Socket("localhost", asyncConnector.getLocalPort());
)
{
String expectedContent = "Hello" + System.lineSeparator();
LOG.debug("LOCAL:");
for (LocalEndPoint client : new LocalEndPoint[]{local0, local1, local2})
{
client.addInputAndExecute(BufferUtil.toBuffer("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n"));
HttpTester.Response response = HttpTester.parseResponse(client.getResponse());
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), is(expectedContent));
}
LOG.debug("NETWORK:");
for (Socket client : new Socket[]{blocking0, blocking1, blocking2, async0, async1, async2})
{
HttpTester.Input in = HttpTester.from(client.getInputStream());
client.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), is(expectedContent));
}
assertThat(localConnector.isAccepting(), is(false));
assertThat(blockingConnector.isAccepting(), is(false));
assertThat(asyncConnector.isAccepting(), is(false));
{
// Close a async connection
HttpTester.Input in = HttpTester.from(async1.getInputStream());
async1.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\nConnection: close\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(), is(200));
assertThat(response.getContent(), is(expectedContent));
}
}
waitFor(localConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS);
waitFor(blockingConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS);
waitFor(asyncConnector::isAccepting, is(true), 2 * idleTimeout, TimeUnit.MILLISECONDS);
}
public static <T> void waitFor(Supplier<T> value, Matcher<T> matcher, long wait, TimeUnit units)
{
long start = NanoTime.now();
while (true)
{
try
{
matcher.matches(value.get());
return;
}
catch (Throwable e)
{
if (NanoTime.since(start) > units.toNanos(wait))
throw e;
}
try
{
TimeUnit.MILLISECONDS.sleep(50);
}
catch (InterruptedException e)
{
// no op
}
}
}
}