Fixes #9121 - Flaky BlockedWritesWithSmallThreadPoolTest.testServerThreadsBlockedInWrites(). (#12178)
The test uncovered a larger problem detailed in the issue: the Handler Callback should be non-blocking. Since all implementations of HttpStream are non-blocking, overridden HttpStream.getInvocationType() to return NON_BLOCKING. This guarantees that even in case of all server threads blocked, blocked/pending writes can be completed. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
21f2f2acea
commit
930d91568a
|
@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
||||||
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
||||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||||
|
import org.eclipse.jetty.io.Content;
|
||||||
import org.eclipse.jetty.server.Handler;
|
import org.eclipse.jetty.server.Handler;
|
||||||
import org.eclipse.jetty.server.HttpConfiguration;
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
import org.eclipse.jetty.server.Request;
|
import org.eclipse.jetty.server.Request;
|
||||||
|
@ -45,7 +46,6 @@ import org.eclipse.jetty.util.Promise;
|
||||||
import org.eclipse.jetty.util.component.LifeCycle;
|
import org.eclipse.jetty.util.component.LifeCycle;
|
||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Tag;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
@ -100,7 +100,6 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Tag("flaky")
|
|
||||||
public void testServerThreadsBlockedInWrites() throws Exception
|
public void testServerThreadsBlockedInWrites() throws Exception
|
||||||
{
|
{
|
||||||
int contentLength = 16 * 1024 * 1024;
|
int contentLength = 16 * 1024 * 1024;
|
||||||
|
@ -108,11 +107,12 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
start(new Handler.Abstract()
|
start(new Handler.Abstract()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean handle(Request request, Response response, Callback callback)
|
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||||
{
|
{
|
||||||
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
|
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
|
||||||
// Write a large content to cause TCP congestion.
|
// Blocking write a large content to cause TCP congestion.
|
||||||
response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback);
|
Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength]));
|
||||||
|
callback.succeeded();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -138,21 +138,20 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
@Override
|
@Override
|
||||||
public void onDataAvailable(Stream stream)
|
public void onDataAvailable(Stream stream)
|
||||||
{
|
{
|
||||||
Stream.Data data = stream.readData();
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Block here to stop reading from the network
|
// Block here to stop reading from the network
|
||||||
// to cause the server to TCP congest.
|
// to cause the server to TCP congest.
|
||||||
clientBlockLatch.await(5, SECONDS);
|
clientBlockLatch.await(5, SECONDS);
|
||||||
|
Stream.Data data = stream.readData();
|
||||||
data.release();
|
data.release();
|
||||||
if (data.frame().isEndStream())
|
if (data.frame().isEndStream())
|
||||||
clientDataLatch.countDown();
|
clientDataLatch.countDown();
|
||||||
else
|
else
|
||||||
stream.demand();
|
stream.demand();
|
||||||
}
|
}
|
||||||
catch (InterruptedException x)
|
catch (InterruptedException ignored)
|
||||||
{
|
{
|
||||||
data.release();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -172,18 +171,139 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
||||||
}
|
}
|
||||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||||
|
long delaySeconds = 10;
|
||||||
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||||
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true)));
|
assertTrue(serverThreads.tryExecute(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
serverBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
// No more threads are available on the server.
|
||||||
assertEquals(0, serverThreads.getReadyThreads());
|
assertEquals(0, serverThreads.getReadyThreads());
|
||||||
|
|
||||||
// Unblock the client to read from the network, which should unblock the server write().
|
// Unblock the client to read from the network, which should unblock the server write().
|
||||||
clientBlockLatch.countDown();
|
clientBlockLatch.countDown();
|
||||||
|
|
||||||
assertTrue(clientDataLatch.await(10, SECONDS), server.dump());
|
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
|
||||||
|
|
||||||
|
// Unblock blocked threads.
|
||||||
serverBlockLatch.countDown();
|
serverBlockLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerThreadsInPendingWrites() throws Exception
|
||||||
|
{
|
||||||
|
int contentLength = 16 * 1024 * 1024;
|
||||||
|
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
|
||||||
|
start(new Handler.Abstract()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean handle(Request request, Response response, Callback callback)
|
||||||
|
{
|
||||||
|
serverEndPointRef.set((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
|
||||||
|
// Large write that will TCP congest, but it is non-blocking.
|
||||||
|
response.write(true, ByteBuffer.allocate(contentLength), callback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client = new HTTP2Client();
|
||||||
|
// Set large flow control windows so the server hits TCP congestion.
|
||||||
|
int window = 2 * contentLength;
|
||||||
|
client.setInitialSessionRecvWindow(window);
|
||||||
|
client.setInitialStreamRecvWindow(window);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
CountDownLatch clientBlockLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientDataLatch = new CountDownLatch(1);
|
||||||
|
Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
|
||||||
|
.get(5, SECONDS);
|
||||||
|
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
|
||||||
|
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
|
||||||
|
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onDataAvailable(Stream stream)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Block here to stop reading from the network
|
||||||
|
// to cause the server to TCP congest.
|
||||||
|
clientBlockLatch.await(5, SECONDS);
|
||||||
|
Stream.Data data = stream.readData();
|
||||||
|
data.release();
|
||||||
|
if (data.frame().isEndStream())
|
||||||
|
clientDataLatch.countDown();
|
||||||
|
else
|
||||||
|
stream.demand();
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await().atMost(5, SECONDS).until(() ->
|
||||||
|
{
|
||||||
|
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
|
||||||
|
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
|
||||||
|
});
|
||||||
|
// Wait for NIO on the server to be OP_WRITE interested.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Handler.handle() should have returned, make sure we block that thread.
|
||||||
|
long delaySeconds = 10;
|
||||||
|
await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1);
|
||||||
|
CountDownLatch serverBlockLatch = new CountDownLatch(1);
|
||||||
|
serverThreads.execute(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
serverBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Make sure there is a reserved thread.
|
||||||
|
if (serverThreads.getAvailableReservedThreads() != 1)
|
||||||
|
{
|
||||||
|
assertFalse(serverThreads.tryExecute(() -> {}));
|
||||||
|
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
|
||||||
|
}
|
||||||
|
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||||
|
CountDownLatch reservedBlockLatch = new CountDownLatch(1);
|
||||||
|
assertTrue(serverThreads.tryExecute(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
reservedBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
// No more threads are available on the server.
|
||||||
|
assertEquals(0, serverThreads.getReadyThreads());
|
||||||
|
|
||||||
|
// Unblock the client to read from the network, which must unblock the server write() and send a response.
|
||||||
|
clientBlockLatch.countDown();
|
||||||
|
|
||||||
|
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
|
||||||
|
|
||||||
|
// Unblock blocked threads.
|
||||||
|
serverBlockLatch.countDown();
|
||||||
|
reservedBlockLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientThreadsBlockedInWrite() throws Exception
|
public void testClientThreadsBlockedInWrite() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -200,12 +320,12 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
@Override
|
@Override
|
||||||
public void onDataAvailable(Stream stream)
|
public void onDataAvailable(Stream stream)
|
||||||
{
|
{
|
||||||
Stream.Data data = stream.readData();
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Block here to stop reading from the network
|
// Block here to stop reading from the network
|
||||||
// to cause the client to TCP congest.
|
// to cause the client to TCP congest.
|
||||||
serverBlockLatch.await(5, SECONDS);
|
serverBlockLatch.await(5, SECONDS);
|
||||||
|
Stream.Data data = stream.readData();
|
||||||
data.release();
|
data.release();
|
||||||
if (data.frame().isEndStream())
|
if (data.frame().isEndStream())
|
||||||
{
|
{
|
||||||
|
@ -217,9 +337,8 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
stream.demand();
|
stream.demand();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (InterruptedException x)
|
catch (InterruptedException ignored)
|
||||||
{
|
{
|
||||||
data.release();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -277,14 +396,27 @@ public class BlockedWritesWithSmallThreadPoolTest
|
||||||
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
|
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
|
||||||
}
|
}
|
||||||
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
// Use the reserved thread for a blocking operation, simulating another blocking write.
|
||||||
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)));
|
long delaySeconds = 10;
|
||||||
|
assertTrue(clientThreads.tryExecute(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
clientBlockLatch.await(2 * delaySeconds, SECONDS);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
// No more threads are available on the client.
|
||||||
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
|
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
|
||||||
|
|
||||||
// Unblock the server to read from the network, which should unblock the client.
|
// Unblock the server to read from the network, which should unblock the client.
|
||||||
serverBlockLatch.countDown();
|
serverBlockLatch.countDown();
|
||||||
|
|
||||||
assertTrue(latch.await(10, SECONDS), client.dump());
|
assertTrue(latch.await(delaySeconds, SECONDS), client.dump());
|
||||||
|
|
||||||
|
// Unblock blocked threads.
|
||||||
clientBlockLatch.countDown();
|
clientBlockLatch.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,12 @@ public interface HttpStream extends Callback
|
||||||
return CONTENT_NOT_CONSUMED;
|
return CONTENT_NOT_CONSUMED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
default InvocationType getInvocationType()
|
||||||
|
{
|
||||||
|
return InvocationType.NON_BLOCKING;
|
||||||
|
}
|
||||||
|
|
||||||
class Wrapper implements HttpStream
|
class Wrapper implements HttpStream
|
||||||
{
|
{
|
||||||
private final HttpStream _wrapped;
|
private final HttpStream _wrapped;
|
||||||
|
|
|
@ -1636,7 +1636,6 @@ public class HttpChannelState implements HttpChannel, Components
|
||||||
@Override
|
@Override
|
||||||
public InvocationType getInvocationType()
|
public InvocationType getInvocationType()
|
||||||
{
|
{
|
||||||
// TODO review this as it is probably not correct
|
|
||||||
return _request.getHttpStream().getInvocationType();
|
return _request.getHttpStream().getInvocationType();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1610,12 +1610,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
||||||
{
|
{
|
||||||
getEndPoint().close(failure);
|
getEndPoint().close(failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public InvocationType getInvocationType()
|
|
||||||
{
|
|
||||||
return HttpStream.super.getInvocationType();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TunnelSupportOverHTTP1 implements TunnelSupport
|
private class TunnelSupportOverHTTP1 implements TunnelSupport
|
||||||
|
|
Loading…
Reference in New Issue