diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java index 7bfdc6e2531..55cd8ae642a 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.http.client; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -322,31 +324,37 @@ public class HttpClientContinueTest extends AbstractTest public void testExpect100ContinueWithContentWithResponseFailureBefore100Continue(Transport transport) throws Exception { init(transport); - long idleTimeout = 100; + AtomicReference clientRequestRef = new AtomicReference<>(); + CountDownLatch clientLatch = new CountDownLatch(1); + CountDownLatch serverLatch = new CountDownLatch(1); + scenario.startServer(new AbstractHandler() { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException { baseRequest.setHandled(true); + clientRequestRef.get().abort(new Exception("abort!")); try { - TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); + if (!clientLatch.await(5, TimeUnit.SECONDS)) + throw new ServletException("Server timed out on client latch"); + serverLatch.countDown(); } - catch (InterruptedException x) + catch (InterruptedException e) { - throw new ServletException(x); + throw new ServletException(e); } } }); - scenario.startClient(httpClient -> httpClient.setIdleTimeout(2 * idleTimeout)); + scenario.startClient(); byte[] content = new byte[1024]; - CountDownLatch latch = new CountDownLatch(1); - scenario.client.newRequest(scenario.newURI()) + org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI()); + clientRequestRef.set(clientRequest); + clientRequest .headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE)) .body(new BytesRequestContent(content)) - .idleTimeout(idleTimeout, TimeUnit.MILLISECONDS) .send(new BufferingResponseListener() { @Override @@ -355,11 +363,12 @@ public class HttpClientContinueTest extends AbstractTest assertTrue(result.isFailed()); assertNotNull(result.getRequestFailure()); assertNotNull(result.getResponseFailure()); - latch.countDown(); + clientLatch.countDown(); } }); - assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); } @ParameterizedTest @@ -367,7 +376,9 @@ public class HttpClientContinueTest extends AbstractTest public void testExpect100ContinueWithContentWithResponseFailureAfter100Continue(Transport transport) throws Exception { init(transport); - long idleTimeout = 100; + AtomicReference clientRequestRef = new AtomicReference<>(); + CountDownLatch clientLatch = new CountDownLatch(1); + CountDownLatch serverLatch = new CountDownLatch(1); scenario.startServer(new AbstractHandler() { @Override @@ -376,9 +387,12 @@ public class HttpClientContinueTest extends AbstractTest baseRequest.setHandled(true); // Send 100-Continue and consume the content IO.copy(request.getInputStream(), new ByteArrayOutputStream()); + clientRequestRef.get().abort(new Exception("abort!")); try { - TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); + if (!clientLatch.await(5, TimeUnit.SECONDS)) + throw new ServletException("Server timed out on client latch"); + serverLatch.countDown(); } catch (InterruptedException x) { @@ -386,11 +400,12 @@ public class HttpClientContinueTest extends AbstractTest } } }); - scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout)); + scenario.startClient(); byte[] content = new byte[1024]; - CountDownLatch latch = new CountDownLatch(1); - scenario.client.newRequest(scenario.newURI()) + org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI()); + clientRequestRef.set(clientRequest); + clientRequest .headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE)) .body(new BytesRequestContent(content)) .send(new BufferingResponseListener() @@ -401,11 +416,12 @@ public class HttpClientContinueTest extends AbstractTest assertTrue(result.isFailed()); assertNull(result.getRequestFailure()); assertNotNull(result.getResponseFailure()); - latch.countDown(); + clientLatch.countDown(); } }); - assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); } @ParameterizedTest @@ -472,8 +488,14 @@ public class HttpClientContinueTest extends AbstractTest @ArgumentsSource(TransportProvider.class) public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception { + byte[] chunk1 = new byte[]{0, 1, 2, 3}; + byte[] chunk2 = new byte[]{4, 5, 6, 7}; + byte[] data = new byte[chunk1.length + chunk2.length]; + System.arraycopy(chunk1, 0, data, 0, chunk1.length); + System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); + + CountDownLatch serverLatch = new CountDownLatch(1); AtomicReference handlerThread = new AtomicReference<>(); - CountDownLatch demandLatch = new CountDownLatch(3); init(transport); scenario.start(new AbstractHandler() { @@ -483,26 +505,21 @@ public class HttpClientContinueTest extends AbstractTest baseRequest.setHandled(true); handlerThread.set(Thread.currentThread()); // Send 100-Continue and echo the content - IO.copy(request.getInputStream(), response.getOutputStream()); + + ServletOutputStream outputStream = response.getOutputStream(); + DataInputStream inputStream = new DataInputStream(request.getInputStream()); + // Block until the 1st chunk is fully received. + byte[] buf1 = new byte[chunk1.length]; + inputStream.readFully(buf1); + outputStream.write(buf1); + + serverLatch.countDown(); + IO.copy(inputStream, outputStream); } }); - byte[] chunk1 = new byte[]{0, 1, 2, 3}; - byte[] chunk2 = new byte[]{4, 5, 6, 7}; - byte[] data = new byte[chunk1.length + chunk2.length]; - System.arraycopy(chunk1, 0, data, 0, chunk1.length); - System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); - CountDownLatch requestLatch = new CountDownLatch(1); - AsyncRequestContent content = new AsyncRequestContent() - { - @Override - public void demand() - { - super.demand(); - demandLatch.countDown(); - } - }; + AsyncRequestContent content = new AsyncRequestContent(); scenario.client.newRequest(scenario.newURI()) .headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE)) .body(content) @@ -516,7 +533,7 @@ public class HttpClientContinueTest extends AbstractTest } }); - // Wait for the handler thread to be blocked in IO. + // Wait for the handler thread to be blocked in the 1st IO. await().atMost(5, TimeUnit.SECONDS).until(() -> { Thread thread = handlerThread.get(); @@ -525,7 +542,13 @@ public class HttpClientContinueTest extends AbstractTest content.offer(ByteBuffer.wrap(chunk1)); - assertTrue(demandLatch.await(5, TimeUnit.SECONDS)); + // Wait for the handler thread to be blocked in the 2nd IO. + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread thread = handlerThread.get(); + return thread != null && thread.getState() == Thread.State.WAITING; + }); content.offer(ByteBuffer.wrap(chunk2)); content.close();