From 5c9d796a49a70c89d70a1b0a6749aa87c8d98596 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Fri, 22 Sep 2023 16:01:12 +0200 Subject: [PATCH] #10543 handle review comments Signed-off-by: Ludovic Orban --- .../test/client/transport/AbstractTest.java | 8 +- .../transport/HttpClientStreamTest.java | 82 ++++++++++--------- 2 files changed, 48 insertions(+), 42 deletions(-) diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java index 0a5d8bd9cdf..d381e7a36ee 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java @@ -162,7 +162,7 @@ public class AbstractTest isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue); if (disabled) { - System.err.println("Not tracking leaks"); + System.err.println("Not tracking " + tagSubValue + " leaks"); return true; } @@ -172,7 +172,7 @@ public class AbstractTest isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + transportName); if (disabled) { - System.err.println("Not tracking leaks for transport " + transportName); + System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName); return true; } } @@ -181,7 +181,7 @@ public class AbstractTest isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue); if (disabled) { - System.err.println("Not tracking leaks for " + tagSubValue); + System.err.println("Not tracking " + tagSubValue + " leaks"); return true; } @@ -191,7 +191,7 @@ public class AbstractTest isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue + ":" + transportName); if (disabled) { - System.err.println("Not tracking leaks for " + tagSubValue + " using transport " + transportName); + System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName); return true; } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java index 71439c77c42..5df2cbbcce0 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientStreamTest.java @@ -32,7 +32,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -59,6 +58,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CompletableTask; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.component.LifeCycle; @@ -1200,81 +1200,87 @@ public class HttpClientStreamTest extends AbstractTest @ParameterizedTest @MethodSource("transports") - @Tag("DisableLeakTracking:server") + @Tag("DisableLeakTracking") public void testHttpStreamConsumeAvailableUponClientTimeout(Transport transport) throws Exception { + AtomicReference clientRequestRef = new AtomicReference<>(); + start(transport, new Handler.Abstract() { @Override public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) { - // Consume the uploaded data very slowly to make the client timeout. - new Runnable() + new CompletableTask<>() { @Override public void run() { while (true) { - try - { - Thread.sleep(100); - } - catch (InterruptedException e) - { - // ignore - } - Content.Chunk chunk = request.read(); + + Content.Chunk chunk = request.read(); if (chunk == null) { request.demand(this); return; } - if (Content.Chunk.isFailure(chunk)) { - callback.failed(chunk.getFailure()); + completeExceptionally(chunk.getFailure()); + return; + } + chunk.release(); + if (chunk.isLast()) + { + complete(null); return; } - chunk.release(); - - if (chunk.isLast()) + org.eclipse.jetty.client.Request r = clientRequestRef.getAndSet(null); + if (r != null) { - callback.succeeded(); - return; + // Abort the client request then give some time for the client's + // abort notification (e.g.: reset frame) to reach the server. + r.abort(new IllegalCallerException()); + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + completeExceptionally(e); + return; + } } } } - }.run(); + } + .start() + .whenComplete((result, failure) -> + { + if (failure == null) + callback.succeeded(); + else + callback.failed(failure); + }); return true; } }); - // Upload a large amount of data to the server with a timeout small enough - // that the client will timeout during the transfer. byte[] data = new byte[16 * 1024 * 1024]; new Random().nextBytes(data); ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data)); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference errorHolder = new AtomicReference<>(); - long timeoutMs = switch (transport) - { - case H2, H2C -> 100; - default -> 1000; - }; - new CompletableResponseListener(client.newRequest(newURI(transport)).body(content).timeout(timeoutMs, TimeUnit.MILLISECONDS)) + org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport)) + .body(content); + clientRequestRef.set(request); + Throwable throwable = new CompletableResponseListener(request) .send() - .whenComplete((r, t) -> - { - errorHolder.set(t); - latch.countDown(); - }); + .handle((r, t) -> t) + .get(5, TimeUnit.SECONDS); - assertTrue(latch.await(5, TimeUnit.SECONDS)); - assertInstanceOf(TimeoutException.class, errorHolder.get()); + assertInstanceOf(IllegalCallerException.class, throwable); } private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)