Backport from 10.0.x of the changes using Awaitility.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-08-25 15:46:37 +02:00
parent 1b79fcee94
commit 05c08e1602
11 changed files with 135 additions and 125 deletions

View File

@ -135,5 +135,9 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -44,6 +44,7 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -430,8 +431,6 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@ -457,10 +456,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
connector.stop();
// Give the connection some time to process the remote close
TimeUnit.SECONDS.sleep(1);
assertEquals(0, idleConnections.size());
assertEquals(0, activeConnections.size());
await().atMost(5, TimeUnit.SECONDS).until(() -> idleConnections.size() == 0 && activeConnections.size() == 0);
}
@ParameterizedTest

View File

@ -38,8 +38,8 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -94,7 +94,6 @@ public class HttpSenderOverHTTPTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSendNoRequestContentIncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
@ -108,7 +107,7 @@ public class HttpSenderOverHTTPTest
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
TimeUnit.SECONDS.sleep(1);
await().atMost(5, TimeUnit.SECONDS).until(() -> endPoint.toEndPointString().contains(",flush=P,"));
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)

View File

@ -118,6 +118,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -56,10 +55,10 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BlockedWritesWithSmallThreadPoolTest
{
@ -143,16 +142,23 @@ public class BlockedWritesWithSmallThreadPoolTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
clientDataLatch.countDown();
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, TimeUnit.SECONDS);
callback.succeeded();
if (frame.isEndStream())
clientDataLatch.countDown();
}
catch (InterruptedException x)
{
callback.failed(x);
}
}
});
awaitUntil(5000, () ->
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
@ -164,11 +170,11 @@ public class BlockedWritesWithSmallThreadPoolTest
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1);
await().atMost(5, TimeUnit.SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch serverBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS))));
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, TimeUnit.SECONDS).until(() -> serverBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));
assertEquals(0, serverThreads.getReadyThreads());
@ -194,14 +200,21 @@ public class BlockedWritesWithSmallThreadPoolTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
try
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
// Block here to stop reading from the network
// to cause the client to TCP congest.
serverBlockLatch.await(5, TimeUnit.SECONDS);
callback.succeeded();
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
catch (InterruptedException x)
{
callback.failed(x);
}
}
};
@ -241,7 +254,7 @@ public class BlockedWritesWithSmallThreadPoolTest
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
awaitUntil(5000, () ->
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
return clientEndPoint.getWriteFlusher().isPending();
@ -251,17 +264,17 @@ public class BlockedWritesWithSmallThreadPoolTest
CountDownLatch clientBlockLatch = new CountDownLatch(1);
// Make sure the application thread is blocked.
clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS)));
clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true));
// Make sure the reserved thread is blocked.
if (clientThreads.getAvailableReservedThreads() != 1)
{
assertFalse(clientThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1);
await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));
awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0);
await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();
@ -269,35 +282,4 @@ public class BlockedWritesWithSmallThreadPoolTest
assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump());
clientBlockLatch.countDown();
}
private void awaitUntil(long millis, Callable<Boolean> test)
{
try
{
if (millis == 0)
{
if (test.call())
return;
}
else
{
long begin = System.nanoTime();
while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis))
{
if (test.call())
return;
Thread.sleep(10);
}
}
fail("Await elapsed: " + millis + "ms");
}
catch (RuntimeException | Error x)
{
throw x;
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
}

View File

@ -96,5 +96,9 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -18,13 +18,10 @@
package org.eclipse.jetty.util;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@ -35,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -533,35 +530,4 @@ public class BlockingArrayQueueTest
assertThat(queue.size(), Matchers.is(0));
assertThat(queue, Matchers.empty());
}
static class Await
{
private Duration duration;
public static Await await()
{
return new Await();
}
public Await atMost(long time, TimeUnit unit)
{
duration = Duration.ofMillis(unit.toMillis(time));
return this;
}
public void until(Callable<Boolean> condition) throws Exception
{
Objects.requireNonNull(duration);
long start = System.nanoTime();
while (true)
{
if (condition.call())
return;
if (duration.minus(Duration.ofNanos(System.nanoTime() - start)).isNegative())
throw new AssertionError("Duration expired");
Thread.sleep(10);
}
}
}
}

View File

@ -1045,6 +1045,12 @@
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
<!-- Test Container Deps -->
<dependency>
<groupId>org.testcontainers</groupId>

View File

@ -163,6 +163,10 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -75,12 +75,11 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
@ -398,8 +397,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Unstable")
@Disabled
public void testAsyncWriteClosed(Transport transport) throws Exception
{
init(transport);
@ -431,7 +428,19 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
// Wait for the failure to arrive to
// the server while we are about to write.
sleep(2000);
try
{
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
out.write(new byte[0]);
// Extract HttpOutput._apiState value from toString.
return !out.toString().split(",")[1].split("=")[1].equals("READY");
});
}
catch (Exception e)
{
throw new AssertionError(e);
}
out.write(data);
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -30,8 +31,10 @@ import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -55,6 +58,7 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -481,10 +485,16 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception
{
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
CountDownLatch serverLatch = new CountDownLatch(1);
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@ -492,18 +502,22 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
ServletOutputStream outputStream = response.getOutputStream();
DataInputStream inputStream = new DataInputStream(request.getInputStream());
// Block until the 1st chunk is fully received.
byte[] buf1 = new byte[chunk1.length];
inputStream.readFully(buf1);
outputStream.write(buf1);
serverLatch.countDown();
IO.copy(inputStream, outputStream);
}
});
final byte[] chunk1 = new byte[]{0, 1, 2, 3};
final byte[] chunk2 = new byte[]{4, 5, 6, 7};
final byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch requestLatch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
@ -514,20 +528,31 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void onComplete(Result result)
{
assertArrayEquals(data, getContent());
latch.countDown();
requestLatch.countDown();
}
});
Thread.sleep(1000);
// Wait for the handler thread to be blocked in the 1st IO.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk1));
Thread.sleep(1000);
// Wait for the handler thread to be blocked in the 2nd IO.
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ -581,6 +606,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
@ArgumentsSource(TransportProvider.class)
public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue(Transport transport) throws Exception
{
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@ -588,22 +614,22 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
final DeferredContentProvider content = new DeferredContentProvider();
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(chunk1));
scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.onRequestHeaders(request ->
{
content.offer(ByteBuffer.wrap(data));
content.close();
})
.content(content)
.send(new BufferingResponseListener()
{
@ -615,6 +641,16 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
});
// Wait for the handler thread to be blocked in IO.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}