#6327 stop relying on timeouts to abort tested requests

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-08-04 10:42:48 +02:00
parent 8766bddb50
commit 72505af846
1 changed files with 59 additions and 36 deletions

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -322,31 +324,37 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithContentWithResponseFailureBefore100Continue(Transport transport) throws Exception
{
init(transport);
long idleTimeout = 100;
AtomicReference<org.eclipse.jetty.client.api.Request> clientRequestRef = new AtomicReference<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
clientRequestRef.get().abort(new Exception("abort!"));
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
if (!clientLatch.await(5, TimeUnit.SECONDS))
throw new ServletException("Server timed out on client latch");
serverLatch.countDown();
}
catch (InterruptedException x)
catch (InterruptedException e)
{
throw new ServletException(x);
throw new ServletException(e);
}
}
});
scenario.startClient(httpClient -> httpClient.setIdleTimeout(2 * idleTimeout));
scenario.startClient();
byte[] content = new byte[1024];
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI());
clientRequestRef.set(clientRequest);
clientRequest
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.body(new BytesRequestContent(content))
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(new BufferingResponseListener()
{
@Override
@ -355,11 +363,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
assertTrue(result.isFailed());
assertNotNull(result.getRequestFailure());
assertNotNull(result.getResponseFailure());
latch.countDown();
clientLatch.countDown();
}
});
assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ -367,7 +376,9 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithContentWithResponseFailureAfter100Continue(Transport transport) throws Exception
{
init(transport);
long idleTimeout = 100;
AtomicReference<org.eclipse.jetty.client.api.Request> clientRequestRef = new AtomicReference<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
{
@Override
@ -376,9 +387,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
clientRequestRef.get().abort(new Exception("abort!"));
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
if (!clientLatch.await(5, TimeUnit.SECONDS))
throw new ServletException("Server timed out on client latch");
serverLatch.countDown();
}
catch (InterruptedException x)
{
@ -386,11 +400,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
}
});
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
scenario.startClient();
byte[] content = new byte[1024];
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI());
clientRequestRef.set(clientRequest);
clientRequest
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.body(new BytesRequestContent(content))
.send(new BufferingResponseListener()
@ -401,11 +416,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
assertTrue(result.isFailed());
assertNull(result.getRequestFailure());
assertNotNull(result.getResponseFailure());
latch.countDown();
clientLatch.countDown();
}
});
assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ -472,8 +488,14 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception
{
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
CountDownLatch serverLatch = new CountDownLatch(1);
AtomicReference<Thread> handlerThread = new AtomicReference<>();
CountDownLatch demandLatch = new CountDownLatch(3);
init(transport);
scenario.start(new AbstractHandler()
{
@ -483,26 +505,21 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
ServletOutputStream outputStream = response.getOutputStream();
DataInputStream inputStream = new DataInputStream(request.getInputStream());
// Block until the 1st chunk is fully received.
byte[] buf1 = new byte[chunk1.length];
inputStream.readFully(buf1);
outputStream.write(buf1);
serverLatch.countDown();
IO.copy(inputStream, outputStream);
}
});
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
CountDownLatch requestLatch = new CountDownLatch(1);
AsyncRequestContent content = new AsyncRequestContent()
{
@Override
public void demand()
{
super.demand();
demandLatch.countDown();
}
};
AsyncRequestContent content = new AsyncRequestContent();
scenario.client.newRequest(scenario.newURI())
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.body(content)
@ -516,7 +533,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
});
// Wait for the handler thread to be blocked in IO.
// Wait for the handler thread to be blocked in the 1st IO.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
@ -525,7 +542,13 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
content.offer(ByteBuffer.wrap(chunk1));
assertTrue(demandLatch.await(5, TimeUnit.SECONDS));
// Wait for the handler thread to be blocked in the 2nd IO.
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.close();