Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-08-26 11:28:34 +02:00
commit 1e95e89ddc
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
5 changed files with 176 additions and 29 deletions

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
@ -45,8 +46,6 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
@ -55,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled // TODO fix this
public class BlockedWritesWithSmallThreadPoolTest
{
private Server server;
@ -102,7 +100,6 @@ public class BlockedWritesWithSmallThreadPoolTest
}
@Test
@Tag("flaky")
public void testServerThreadsBlockedInWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
@ -110,11 +107,12 @@ public class BlockedWritesWithSmallThreadPoolTest
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
// Write a large content to cause TCP congestion.
response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback);
// Blocking write a large content to cause TCP congestion.
Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength]));
callback.succeeded();
return true;
}
});
@ -140,21 +138,20 @@ public class BlockedWritesWithSmallThreadPoolTest
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
clientDataLatch.countDown();
else
stream.demand();
}
catch (InterruptedException x)
catch (InterruptedException ignored)
{
data.release();
}
}
});
@ -174,18 +171,139 @@ public class BlockedWritesWithSmallThreadPoolTest
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
long delaySeconds = 10;
CountDownLatch serverBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true)));
assertTrue(serverThreads.tryExecute(() ->
{
try
{
serverBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));
// No more threads are available on the server.
assertEquals(0, serverThreads.getReadyThreads());
// Unblock the client to read from the network, which should unblock the server write().
clientBlockLatch.countDown();
assertTrue(clientDataLatch.await(10, SECONDS), server.dump());
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
// Unblock blocked threads.
serverBlockLatch.countDown();
}
@Test
public void testServerThreadsInPendingWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
serverEndPointRef.set((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
// Large write that will TCP congest, but it is non-blocking.
response.write(true, ByteBuffer.allocate(contentLength), callback);
return true;
}
});
client = new HTTP2Client();
// Set large flow control windows so the server hits TCP congestion.
int window = 2 * contentLength;
client.setInitialSessionRecvWindow(window);
client.setInitialStreamRecvWindow(window);
client.start();
CountDownLatch clientBlockLatch = new CountDownLatch(1);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, SECONDS);
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
clientDataLatch.countDown();
else
stream.demand();
}
catch (InterruptedException ignored)
{
}
}
});
await().atMost(5, SECONDS).until(() ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
});
// Wait for NIO on the server to be OP_WRITE interested.
Thread.sleep(1000);
// Handler.handle() should have returned, make sure we block that thread.
long delaySeconds = 10;
await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1);
CountDownLatch serverBlockLatch = new CountDownLatch(1);
serverThreads.execute(() ->
{
try
{
serverBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
});
// Make sure there is a reserved thread.
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch reservedBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() ->
{
try
{
reservedBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));
// No more threads are available on the server.
assertEquals(0, serverThreads.getReadyThreads());
// Unblock the client to read from the network, which must unblock the server write() and send a response.
clientBlockLatch.countDown();
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());
// Unblock blocked threads.
serverBlockLatch.countDown();
reservedBlockLatch.countDown();
}
@Test
public void testClientThreadsBlockedInWrite() throws Exception
{
@ -202,12 +320,12 @@ public class BlockedWritesWithSmallThreadPoolTest
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
serverBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
{
@ -219,9 +337,8 @@ public class BlockedWritesWithSmallThreadPoolTest
stream.demand();
}
}
catch (InterruptedException x)
catch (InterruptedException ignored)
{
data.release();
}
}
};
@ -279,14 +396,27 @@ public class BlockedWritesWithSmallThreadPoolTest
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)));
long delaySeconds = 10;
assertTrue(clientThreads.tryExecute(() ->
{
try
{
clientBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));
// No more threads are available on the client.
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();
assertTrue(latch.await(10, SECONDS), client.dump());
assertTrue(latch.await(delaySeconds, SECONDS), client.dump());
// Unblock blocked threads.
clientBlockLatch.countDown();
}
}

View File

@ -135,6 +135,12 @@ public interface HttpStream extends Callback
return CONTENT_NOT_CONSUMED;
}
@Override
default InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
class Wrapper implements HttpStream
{
private final HttpStream _wrapped;

View File

@ -1636,7 +1636,6 @@ public class HttpChannelState implements HttpChannel, Components
@Override
public InvocationType getInvocationType()
{
// TODO review this as it is probably not correct
return _request.getHttpStream().getInvocationType();
}
}

View File

@ -1623,12 +1623,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
{
getEndPoint().close(failure);
}
@Override
public InvocationType getInvocationType()
{
return HttpStream.super.getInvocationType();
}
}
private class TunnelSupportOverHTTP1 implements TunnelSupport

View File

@ -102,10 +102,10 @@ public interface Callback extends Invocable
* with the given {@code blocking} characteristic.</p>
*
* @param completable the CompletableFuture to convert into a callback
* @param invocation whether the callback is blocking
* @param invocationType whether the callback is blocking
* @return a callback that when completed, completes the given CompletableFuture
*/
static Callback from(CompletableFuture<?> completable, InvocationType invocation)
static Callback from(CompletableFuture<?> completable, InvocationType invocationType)
{
if (completable instanceof Callback)
return (Callback)completable;
@ -135,7 +135,7 @@ public interface Callback extends Invocable
@Override
public InvocationType getInvocationType()
{
return invocation;
return invocationType;
}
};
}
@ -290,6 +290,12 @@ public interface Callback extends Invocable
{
Callback.failed(callback::failed, completed, x);
}
@Override
public InvocationType getInvocationType()
{
return callback.getInvocationType();
}
};
}
@ -320,15 +326,21 @@ public interface Callback extends Invocable
}
}
@Override
public void failed(Throwable x)
{
Callback.failed(this::completed, callback::failed, x);
}
private void completed(Throwable ignored)
{
completed.run();
}
@Override
public void failed(Throwable x)
public InvocationType getInvocationType()
{
Callback.failed(this::completed, callback::failed, x);
return callback.getInvocationType();
}
};
}
@ -357,6 +369,12 @@ public interface Callback extends Invocable
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
Callback.failed(callback, cause);
}
@Override
public InvocationType getInvocationType()
{
return callback.getInvocationType();
}
};
}