Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-11.0.x

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-08-12 10:56:01 +02:00
commit 7e39d5434a
17 changed files with 212 additions and 239 deletions

View File

@ -132,6 +132,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-simplekdc</artifactId>

View File

@ -40,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -50,7 +49,6 @@ import java.util.function.LongConsumer;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletRequest;
@ -91,7 +89,6 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -688,48 +685,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testRequestIdleTimeout(Scenario scenario) throws Exception
{
long idleTimeout = 1000;
start(scenario, new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
try
{
baseRequest.setHandled(true);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
String host = "localhost";
int port = connector.getLocalPort();
assertThrows(TimeoutException.class, () ->
client.newRequest(host, port)
.scheme(scenario.getScheme())
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send());
// Make another request without specifying the idle timeout, should not fail
ContentResponse response = client.newRequest(host, port)
.scheme(scenario.getScheme())
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response);
assertEquals(200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testSendToIPv6Address(Scenario scenario) throws Exception

View File

@ -32,13 +32,12 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -213,8 +212,6 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testBadRequestWithSlowRequestRemovesConnection(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@ -423,8 +420,6 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@ -448,10 +443,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
connector.stop();
// Give the connection some time to process the remote close
TimeUnit.SECONDS.sleep(1);
assertEquals(0, idleConnections.size());
assertEquals(0, activeConnections.size());
await().atMost(5, TimeUnit.SECONDS).until(() -> idleConnections.size() == 0 && activeConnections.size() == 0);
}
@ParameterizedTest

View File

@ -35,8 +35,8 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -91,7 +91,6 @@ public class HttpSenderOverHTTPTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSendNoRequestContentIncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
@ -105,7 +104,7 @@ public class HttpSenderOverHTTPTest
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
TimeUnit.SECONDS.sleep(1);
await().atMost(5, TimeUnit.SECONDS).until(() -> endPoint.toEndPointString().contains(",flush=P,"));
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)

View File

@ -24,7 +24,6 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
@ -48,7 +47,6 @@ import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
@ -410,47 +408,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
assertArrayEquals(data, response.getContent());
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testRequestIdleTimeout() throws Exception
{
final long idleTimeout = 1000;
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
try
{
baseRequest.setHandled(true);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
final String host = "localhost";
final int port = connector.getLocalPort();
assertThrows(TimeoutException.class, () ->
client.newRequest(host, port)
.scheme(scheme)
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send());
// Make another request without specifying the idle timeout, should not fail
ContentResponse response = client.newRequest(host, port)
.scheme(scheme)
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response);
assertEquals(200, response.getStatus());
}
@Test
public void testConnectionIdleTimeout() throws Exception
{

View File

@ -22,7 +22,6 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -30,7 +29,6 @@ import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -55,23 +53,21 @@ public class SelectorManagerTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testConnectTimeoutBeforeSuccessfulConnect() throws Exception
{
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 0));
SocketAddress address = server.getLocalAddress();
final AtomicLong timeoutConnection = new AtomicLong();
final long connectTimeout = 1000;
CountDownLatch connectionFinishedLatch = new CountDownLatch(1);
CountDownLatch failedConnectionLatch = new CountDownLatch(1);
long connectTimeout = 1000;
SelectorManager selectorManager = new SelectorManager(executor, scheduler)
{
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
SocketChannelEndPoint endPoint = new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler());
endPoint.setIdleTimeout(connectTimeout / 2);
return endPoint;
return new SocketChannelEndPoint((SocketChannel)channel, selector, key, getScheduler());
}
@Override
@ -79,15 +75,17 @@ public class SelectorManagerTest
{
try
{
long timeout = timeoutConnection.get();
if (timeout > 0)
TimeUnit.MILLISECONDS.sleep(timeout);
assertTrue(failedConnectionLatch.await(connectTimeout * 2, TimeUnit.MILLISECONDS));
return super.doFinishConnect(channel);
}
catch (InterruptedException e)
{
return false;
}
finally
{
connectionFinishedLatch.countDown();
}
}
@Override
@ -116,40 +114,36 @@ public class SelectorManagerTest
{
SocketChannel client1 = SocketChannel.open();
client1.configureBlocking(false);
client1.connect(address);
long timeout = connectTimeout * 2;
timeoutConnection.set(timeout);
final CountDownLatch latch1 = new CountDownLatch(1);
assertFalse(client1.connect(address));
selectorManager.connect(client1, new Callback()
{
@Override
public void failed(Throwable x)
{
latch1.countDown();
failedConnectionLatch.countDown();
}
});
assertTrue(latch1.await(connectTimeout * 3, TimeUnit.MILLISECONDS));
assertTrue(failedConnectionLatch.await(connectTimeout * 2, TimeUnit.MILLISECONDS));
assertFalse(client1.isOpen());
// Wait for the first connect to finish, as the selector thread is waiting in finishConnect().
Thread.sleep(timeout);
// Wait for the first connect to finish, as the selector thread is waiting in doFinishConnect().
assertTrue(connectionFinishedLatch.await(5, TimeUnit.SECONDS));
// Verify that after the failure we can connect successfully.
try (SocketChannel client2 = SocketChannel.open())
{
client2.configureBlocking(false);
client2.connect(address);
timeoutConnection.set(0);
final CountDownLatch latch2 = new CountDownLatch(1);
assertFalse(client2.connect(address));
CountDownLatch successfulConnectionLatch = new CountDownLatch(1);
selectorManager.connect(client2, new Callback()
{
@Override
public void succeeded()
{
latch2.countDown();
successfulConnectionLatch.countDown();
}
});
assertTrue(latch2.await(connectTimeout * 5, TimeUnit.MILLISECONDS));
assertTrue(successfulConnectionLatch.await(connectTimeout * 2, TimeUnit.MILLISECONDS));
assertTrue(client2.isOpen());
}
}

View File

@ -34,9 +34,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@ -46,8 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConnectionOpenCloseTest extends AbstractHttpTest
{
@Test
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testOpenClose() throws Exception
{
server.setHandler(new AbstractHandler()
@ -97,8 +93,6 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
}
@Test
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testOpenRequestClose() throws Exception
{
server.setHandler(new AbstractHandler()
@ -153,15 +147,13 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// Wait some time to see if the callbacks are called too many times
TimeUnit.SECONDS.sleep(1);
TimeUnit.MILLISECONDS.sleep(200);
assertEquals(2, callbacks.get());
}
}
@Test
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSSLOpenRequestClose() throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
@ -223,7 +215,7 @@ public class ConnectionOpenCloseTest extends AbstractHttpTest
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// Wait some time to see if the callbacks are called too many times
TimeUnit.SECONDS.sleep(1);
TimeUnit.MILLISECONDS.sleep(200);
assertEquals(4, callbacks.get());
}

View File

@ -40,7 +40,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.eclipse.jetty.http.HttpHeader.CONTENT_LENGTH;
import static org.eclipse.jetty.http.HttpHeader.CONTENT_TYPE;
@ -277,7 +276,6 @@ public class ResourceHandlerTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSlowBiggest() throws Exception
{
_connector.setIdleTimeout(9000);
@ -307,7 +305,7 @@ public class ResourceHandlerTest
ByteBuffer buffer = null;
while (true)
{
Thread.sleep(25);
Thread.sleep(10);
int len = in.read(array);
if (len < 0)
break;

View File

@ -54,7 +54,6 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -71,10 +70,9 @@ public class ThreadStarvationTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testDefaultServletSuccess() throws Exception
{
int maxThreads = 10;
int maxThreads = 6;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
@ -86,11 +84,11 @@ public class ThreadStarvationTest
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
byte[] chunk = new byte[256 * 1024];
Arrays.fill(chunk, (byte)'X');
chunk[chunk.length - 2] = '\r';
chunk[chunk.length - 1] = '\n';
for (int i = 0; i < 256 * 1024; ++i)
for (int i = 0; i < 1024; ++i)
{
output.write(chunk);
}
@ -135,10 +133,9 @@ public class ThreadStarvationTest
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
// Wait for a thread on the servlet to block.
assertTrue(writePending.await(5, TimeUnit.SECONDS));
long expected = Files.size(resourcePath);

View File

@ -63,6 +63,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-perf-helper</artifactId>

View File

@ -13,13 +13,10 @@
package org.eclipse.jetty.util;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@ -30,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -528,35 +525,4 @@ public class BlockingArrayQueueTest
assertThat(queue.size(), Matchers.is(0));
assertThat(queue, Matchers.empty());
}
static class Await
{
private Duration duration;
public static Await await()
{
return new Await();
}
public Await atMost(long time, TimeUnit unit)
{
duration = Duration.ofMillis(unit.toMillis(time));
return this;
}
public void until(Callable<Boolean> condition) throws Exception
{
Objects.requireNonNull(duration);
long start = System.nanoTime();
while (true)
{
if (condition.call())
return;
if (duration.minus(Duration.ofNanos(System.nanoTime() - start)).isNegative())
throw new AssertionError("Duration expired");
Thread.sleep(10);
}
}
}
}

View File

@ -1172,6 +1172,11 @@
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>

View File

@ -46,6 +46,11 @@
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-java-client</artifactId>

View File

@ -60,11 +60,13 @@ import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
@ -74,12 +76,11 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
@ -398,18 +399,11 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Unstable")
@Disabled
public void testAsyncWriteClosed(Transport transport) throws Exception
{
init(transport);
String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n";
for (int i = 0; i < 10; i++)
{
text = text + text;
}
byte[] data = text.getBytes(StandardCharsets.UTF_8);
byte[] data = new byte[1024];
CountDownLatch errorLatch = new CountDownLatch(1);
scenario.start(new HttpServlet()
@ -431,9 +425,26 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
// Wait for the failure to arrive to
// the server while we are about to write.
sleep(2000);
out.write(data);
try
{
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
try
{
if (out.isReady())
((HttpOutput)out).write(ByteBuffer.wrap(data));
return false;
}
catch (EofException e)
{
return true;
}
});
}
catch (Exception e)
{
throw new AssertionError(e);
}
}
@Override

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -25,9 +26,11 @@ import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ContinueProtocolHandler;
@ -45,11 +48,10 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -319,36 +321,40 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithContentWithResponseFailureBefore100Continue(Transport transport) throws Exception
{
init(transport);
long idleTimeout = 1000;
AtomicReference<org.eclipse.jetty.client.api.Request> clientRequestRef = new AtomicReference<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
clientRequestRef.get().abort(new Exception("abort!"));
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
if (!clientLatch.await(5, TimeUnit.SECONDS))
throw new ServletException("Server timed out on client latch");
serverLatch.countDown();
}
catch (InterruptedException x)
catch (InterruptedException e)
{
throw new ServletException(x);
throw new ServletException(e);
}
}
});
scenario.startClient(httpClient -> httpClient.setIdleTimeout(2 * idleTimeout));
scenario.startClient();
byte[] content = new byte[1024];
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI());
clientRequestRef.set(clientRequest);
clientRequest
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.body(new BytesRequestContent(content))
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(new BufferingResponseListener()
{
@Override
@ -357,21 +363,22 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
assertTrue(result.isFailed());
assertNotNull(result.getRequestFailure());
assertNotNull(result.getResponseFailure());
latch.countDown();
clientLatch.countDown();
}
});
assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithContentWithResponseFailureAfter100Continue(Transport transport) throws Exception
{
init(transport);
long idleTimeout = 1000;
AtomicReference<org.eclipse.jetty.client.api.Request> clientRequestRef = new AtomicReference<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
{
@Override
@ -380,9 +387,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
clientRequestRef.get().abort(new Exception("abort!"));
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
if (!clientLatch.await(5, TimeUnit.SECONDS))
throw new ServletException("Server timed out on client latch");
serverLatch.countDown();
}
catch (InterruptedException x)
{
@ -390,11 +400,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
}
});
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
scenario.startClient();
byte[] content = new byte[1024];
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
org.eclipse.jetty.client.api.Request clientRequest = scenario.client.newRequest(scenario.newURI());
clientRequestRef.set(clientRequest);
clientRequest
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.body(new BytesRequestContent(content))
.send(new BufferingResponseListener()
@ -405,11 +416,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
assertTrue(result.isFailed());
assertNull(result.getRequestFailure());
assertNotNull(result.getResponseFailure());
latch.countDown();
clientLatch.countDown();
}
});
assertTrue(latch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ -474,10 +486,16 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception
{
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
CountDownLatch serverLatch = new CountDownLatch(1);
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@ -485,18 +503,22 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
ServletOutputStream outputStream = response.getOutputStream();
DataInputStream inputStream = new DataInputStream(request.getInputStream());
// Block until the 1st chunk is fully received.
byte[] buf1 = new byte[chunk1.length];
inputStream.readFully(buf1);
outputStream.write(buf1);
serverLatch.countDown();
IO.copy(inputStream, outputStream);
}
});
byte[] chunk1 = new byte[]{0, 1, 2, 3};
byte[] chunk2 = new byte[]{4, 5, 6, 7};
byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch requestLatch = new CountDownLatch(1);
AsyncRequestContent content = new AsyncRequestContent();
scenario.client.newRequest(scenario.newURI())
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
@ -507,28 +529,38 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void onComplete(Result result)
{
assertArrayEquals(data, getContent());
latch.countDown();
requestLatch.countDown();
}
});
Thread.sleep(1000);
// Wait for the handler thread to be blocked in the 1st IO.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk1));
Thread.sleep(1000);
// Wait for the handler thread to be blocked in the 2nd IO.
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithInitialAndDeferredContentRespond100Continue(Transport transport) throws Exception
{
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@ -536,6 +568,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
}
@ -562,7 +595,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
}
});
Thread.sleep(1000);
// Wait for the handler thread to be blocked in IO.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread thread = handlerThread.get();
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.close();

View File

@ -58,7 +58,6 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -666,7 +665,6 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testUploadWithDeferredContentProviderFromInputStream(Transport transport) throws Exception
{
init(transport);
@ -680,20 +678,22 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
}
});
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch requestSentLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
try (AsyncRequestContent content = new AsyncRequestContent())
{
scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
.body(content)
.onRequestCommit((request) -> requestSentLatch.countDown())
.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown();
responseLatch.countDown();
});
// Make sure we provide the content *after* the request has been "sent".
Thread.sleep(1000);
assertTrue(requestSentLatch.await(5, TimeUnit.SECONDS));
try (ByteArrayInputStream input = new ByteArrayInputStream(new byte[1024]))
{
@ -705,7 +705,7 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
}
}
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest

View File

@ -22,10 +22,12 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServlet;
@ -781,6 +783,58 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testRequestIdleTimeout(Transport transport) throws Exception
{
init(transport);
CountDownLatch latch = new CountDownLatch(1);
long idleTimeout = 500;
scenario.start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
try
{
baseRequest.setHandled(true);
if (target.equals("/1"))
assertTrue(latch.await(5, TimeUnit.SECONDS));
else if (target.equals("/2"))
Thread.sleep(2 * idleTimeout);
else
fail("Unknown path: " + target);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
String host = "localhost";
int port = scenario.getNetworkConnectorLocalPortInt().get();
assertThrows(TimeoutException.class, () ->
scenario.client.newRequest(host, port)
.scheme(scenario.getScheme())
.path("/1")
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
.send());
latch.countDown();
// Make another request without specifying the idle timeout, should not fail
ContentResponse response = scenario.client.newRequest(host, port)
.scheme(scenario.getScheme())
.path("/2")
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
assertNotNull(response);
assertEquals(200, response.getStatus());
}
private void sleep(long time) throws IOException
{
try