Moving from ee9 to core more client transport tests.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-08-18 10:40:48 +02:00
parent d001d95d9f
commit 8e68505524
25 changed files with 1003 additions and 1337 deletions

View File

@ -180,13 +180,21 @@ public class HttpClient extends ContainerLifeCycle
}
/**
* @return the {@link SslContextFactory} that manages TLS encryption
* @return the {@link SslContextFactory.Client} that manages TLS encryption
*/
public SslContextFactory.Client getSslContextFactory()
{
return connector.getSslContextFactory();
}
/**
* @param sslContextFactory the {@link SslContextFactory.Client} that manages TLS encryption
*/
public void setSslContextFactory(SslContextFactory.Client sslContextFactory)
{
connector.setSslContextFactory(sslContextFactory);
}
@Override
protected void doStart() throws Exception
{

View File

@ -185,22 +185,14 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
addBean(_scheduler);
synchronized (server)
if (pool == null)
{
// Look for (and cache) a common pool on the server
pool = server.getBean(ByteBufferPool.class);
if (pool == null)
{
// Look for (and cache) a common pool on the server
pool = server.getBean(ByteBufferPool.class);
if (pool == null)
{
pool = new LogarithmicArrayByteBufferPool();
server.addBean(pool, true);
}
addBean(pool, false);
}
else
{
addBean(pool, true);
pool = new LogarithmicArrayByteBufferPool();
server.addBean(pool, true);
}
}
_byteBufferPool = pool;

View File

@ -27,15 +27,18 @@ import org.eclipse.jetty.fcgi.server.ServerFCGIConnectionFactory;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HostHeaderCustomizer;
import org.eclipse.jetty.server.HttpConfiguration;
@ -59,7 +62,7 @@ public class AbstractTest
protected final HttpConfiguration httpConfig = new HttpConfiguration();
protected SslContextFactory.Server sslContextFactoryServer;
protected Server server;
protected Connector connector;
protected AbstractConnector connector;
protected HttpClient client;
protected Path unixDomainPath;
@ -97,14 +100,20 @@ public class AbstractTest
Files.delete(unixDomainPath);
}
sslContextFactoryServer = newSslContextFactoryServer();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
if (server == null)
server = newServer();
connector = newConnector(transport, server);
server.addConnector(connector);
server.setHandler(handler);
}
protected Server newServer()
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
return new Server(serverThreads);
}
protected SslContextFactory.Server newSslContextFactoryServer()
{
SslContextFactory.Server ssl = new SslContextFactory.Server();
@ -125,7 +134,7 @@ public class AbstractTest
client.start();
}
public Connector newConnector(Transport transport, Server server)
public AbstractConnector newConnector(Transport transport, Server server)
{
return switch (transport)
{
@ -238,6 +247,37 @@ public class AbstractTest
return URI.create(uri);
}
protected void setStreamIdleTimeout(long idleTimeout)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
{
h2.setStreamIdleTimeout(idleTimeout);
}
else
{
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
if (h3 != null)
h3.getHTTP3Configuration().setStreamIdleTimeout(idleTimeout);
else
connector.setIdleTimeout(idleTimeout);
}
}
protected void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
{
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
}
else
{
if (connector instanceof QuicServerConnector)
((QuicServerConnector)connector).getQuicConfiguration().setMaxBidirectionalRemoteStreams(maxRequestsPerConnection);
}
}
public enum Transport
{
HTTP, HTTPS, H2C, H2, H3, FCGI, UNIX_DOMAIN;
@ -250,5 +290,14 @@ public class AbstractTest
case HTTPS, H2, H3 -> true;
};
}
public boolean isMultiplexed()
{
return switch (this)
{
case HTTP, HTTPS, FCGI, UNIX_DOMAIN -> false;
case H2C, H2, H3 -> true;
};
}
}
}

View File

@ -0,0 +1,19 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
// TODO: write 100 Continue tests for Handler (not Servlet) functionality.
public class Continue100Test
{
}

View File

@ -0,0 +1,28 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
public class EmptyServerHandler extends Handler.Processor
{
@Override
public void process(Request request, Response response, Callback callback)
{
callback.succeeded();
}
}

View File

@ -43,9 +43,6 @@ import org.eclipse.jetty.http3.client.transport.internal.HttpChannelOverHTTP3;
import org.eclipse.jetty.http3.client.transport.internal.HttpConnectionOverHTTP3;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.params.ParameterizedTest;
@ -59,14 +56,7 @@ public class HttpChannelAssociationTest extends AbstractTest
@MethodSource("transports")
public void testAssociationFailedAbortsRequest(Transport transport) throws Exception
{
startServer(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback)
{
callback.succeeded();
}
});
startServer(transport, new EmptyServerHandler());
client = new HttpClient(newHttpClientTransport(transport, exchange -> false));
QueuedThreadPool clientThreads = new QueuedThreadPool();
@ -89,14 +79,7 @@ public class HttpChannelAssociationTest extends AbstractTest
@MethodSource("transports")
public void testIdleTimeoutJustBeforeAssociation(Transport transport) throws Exception
{
startServer(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback)
{
callback.succeeded();
}
});
startServer(transport, new EmptyServerHandler());
long idleTimeout = 1000;
client = new HttpClient(newHttpClientTransport(transport, exchange ->

View File

@ -22,9 +22,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -45,17 +42,8 @@ public class HttpClientConnectTimeoutTest extends AbstractTest
int connectTimeout = 1000;
assumeTrue(connectTimeout(host, port, connectTimeout));
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.start();
CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
@ -78,17 +66,8 @@ public class HttpClientConnectTimeoutTest extends AbstractTest
int connectTimeout = 2000;
assumeTrue(connectTimeout(host, port, connectTimeout));
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.start();
AtomicInteger completes = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(2);
@ -114,17 +93,8 @@ public class HttpClientConnectTimeoutTest extends AbstractTest
int connectTimeout = 1000;
assumeTrue(connectTimeout(host, port, connectTimeout));
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.start();
CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);

View File

@ -454,14 +454,7 @@ public class HttpClientDemandTest extends AbstractTest
@MethodSource("transports")
public void testDelayedBeforeContentDemandWithNoResponseContent(Transport transport) throws Exception
{
start(transport, new Handler.Processor()
{
@Override
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
callback.succeeded();
}
});
start(transport, new EmptyServerHandler());
AtomicReference<LongConsumer> beforeContentDemandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);

View File

@ -11,57 +11,45 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
public class HttpClientIdleTimeoutTest extends AbstractTest
{
private final long idleTimeout = 1000;
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.startServer(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
if (target.equals("/timeout"))
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
}
// Do not succeed the callback if it's a timeout request.
if (!request.getPathInContext().equals("/timeout"))
callback.succeeded();
}
});
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
client.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
CountDownLatch latch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.path("/timeout")
.send(result ->
{
@ -72,31 +60,29 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Verify that after the timeout we can make another request.
ContentResponse response = scenario.client.newRequest(scenario.newURI()).send();
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
if (target.equals("/timeout"))
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
}
// Do not succeed the callback if it's a timeout request.
if (!request.getPathInContext().equals("/timeout"))
callback.succeeded();
}
});
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
CountDownLatch latch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.path("/timeout")
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(result ->
@ -108,46 +94,46 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Verify that after the timeout we can make another request.
ContentResponse response = scenario.client.newRequest(scenario.newURI()).send();
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIdleClientIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.startServer(new EmptyServerHandler());
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
start(transport, new EmptyServerHandler());
client.setIdleTimeout(idleTimeout);
// Make a first request to open a connection.
ContentResponse response = scenario.client.newRequest(scenario.newURI()).send();
ContentResponse response = client.newRequest(newURI(transport)).send();
assertEquals(HttpStatus.OK_200, response.getStatus());
// Let the connection idle timeout.
Thread.sleep(2 * idleTimeout);
// Verify that after the timeout we can make another request.
response = scenario.client.newRequest(scenario.newURI()).send();
response = client.newRequest(newURI(transport)).send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIdleServerIdleTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.setConnectionIdleTimeout(idleTimeout);
start(transport, new EmptyServerHandler());
connector.setIdleTimeout(idleTimeout);
ContentResponse response1 = scenario.client.newRequest(scenario.newURI()).send();
ContentResponse response1 = client.newRequest(newURI(transport)).send();
assertEquals(HttpStatus.OK_200, response1.getStatus());
// Let the server idle timeout.
Thread.sleep(2 * idleTimeout);
// Make sure we can make another request successfully.
ContentResponse response2 = scenario.client.newRequest(scenario.newURI()).send();
ContentResponse response2 = client.newRequest(newURI(transport)).send();
assertEquals(HttpStatus.OK_200, response2.getStatus());
}
}

View File

@ -11,10 +11,8 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -27,9 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
@ -39,62 +34,68 @@ import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTransportScenario>
public class HttpClientLoadTest extends AbstractTest
{
private final Logger logger = LoggerFactory.getLogger(HttpClientLoadTest.class);
private final AtomicLong requestCount = new AtomicLong();
private final AtomicLong connectionLeaks = new AtomicLong();
@Override
public void init(Transport transport) throws IOException
{
setScenario(new LoadTransportScenario(transport, connectionLeaks));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIterative(Transport transport) throws Exception
{
// TODO: cannot run HTTP/3 (or UDP) in Jenkins.
if ("ci".equals(System.getProperty("env")))
Assumptions.assumeTrue(transport != Transport.H3);
init(transport);
scenario.start(new LoadHandler(), client ->
server = newServer();
server.addBean(new LeakTrackingByteBufferPool(new LogarithmicArrayByteBufferPool()));
start(transport, new LoadHandler());
setStreamIdleTimeout(120000);
client.stop();
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
client.setIdleTimeout(120000);
switch (transport)
{
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
});
scenario.setConnectionIdleTimeout(120000);
scenario.setRequestIdleTimeout(120000);
scenario.client.setIdleTimeout(120000);
case HTTP, HTTPS, FCGI, UNIX_DOMAIN ->
{
// Track connection leaking only for non-multiplexed transports.
client.getTransport().setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{
@Override
protected void leaked(LeakDetector<Connection>.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
});
}
}
client.start();
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
int runs = 1;
@ -115,19 +116,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConcurrent(Transport transport) throws Exception
{
// TODO: cannot run HTTP/3 (or UDP) in Jenkins.
Assumptions.assumeTrue(transport != Transport.H3);
init(transport);
scenario.start(new LoadHandler(), client ->
{
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
});
start(transport, new LoadHandler());
client.stop();
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
client.start();
int runs = 1;
int iterations = 128;
@ -142,20 +142,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
{
System.gc();
ByteBufferPool byteBufferPool = scenario.connector.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool serverBufferPool)
{
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
byteBufferPool = scenario.client.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
byteBufferPool = client.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool clientBufferPool)
{
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", clientBufferPool.getLeakedRemoves(), Matchers.is(0L));
@ -175,18 +173,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
// Dumps the state of the client if the test takes too long
Thread testThread = Thread.currentThread();
long maxTime = Math.max(60000, (long)iterations * factor);
Scheduler.Task task = scenario.client.getScheduler().schedule(() ->
Scheduler.Task task = client.getScheduler().schedule(() ->
{
logger.warn("Interrupting test, it is taking too long (maxTime={} ms){}{}{}{}", maxTime,
System.lineSeparator(), scenario.server.dump(),
System.lineSeparator(), scenario.client.dump());
System.lineSeparator(), server.dump(),
System.lineSeparator(), client.dump());
testThread.interrupt();
}, maxTime, TimeUnit.MILLISECONDS);
long begin = NanoTime.now();
for (int i = 0; i < iterations; ++i)
{
test(latch, failures);
test(transport, latch, failures);
// test("http", "localhost", "GET", false, false, 64 * 1024, false, latch, failures);
}
long end = NanoTime.now();
@ -202,7 +200,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
assertTrue(failures.isEmpty(), failures.toString());
}
private void test(CountDownLatch latch, List<String> failures)
private void test(Transport transport, CountDownLatch latch, List<String> failures)
{
ThreadLocalRandom random = ThreadLocalRandom.current();
// Choose a random destination
@ -210,7 +208,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
// Choose a random method
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
boolean ssl = scenario.transport.isTlsBased();
boolean ssl = transport.isSecure();
// Choose randomly whether to close the connection on the client or on the server
boolean clientClose = !ssl && random.nextInt(100) < 5;
@ -223,14 +221,16 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
int maxContentLength = 64 * 1024;
int contentLength = random.nextInt(maxContentLength) + 1;
test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures);
String uri = (ssl ? "https" : "http") + "://" + host;
if (connector instanceof NetworkConnector networkConnector)
uri += ":" + networkConnector.getLocalPort();
test(uri, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures);
}
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, boolean checkContentLength, CountDownLatch latch, List<String> failures)
private void test(String uri, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, boolean checkContentLength, CountDownLatch latch, List<String> failures)
{
long requestId = requestCount.incrementAndGet();
Request request = scenario.client.newRequest(host, scenario.getServerPort().orElse(0))
.scheme(scheme)
Request request = client.newRequest(uri)
.path("/" + requestId)
.method(method);
@ -247,13 +247,12 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
switch (method)
{
case "GET":
request.headers(headers -> headers.put("X-Download", String.valueOf(contentLength)));
break;
case "POST":
case "GET" -> request.headers(headers -> headers.put("X-Download", String.valueOf(contentLength)));
case "POST" ->
{
request.headers(headers -> headers.put("X-Upload", String.valueOf(contentLength)));
request.body(new BytesRequestContent(new byte[contentLength]));
break;
}
}
CountDownLatch requestLatch = new CountDownLatch(1);
@ -302,19 +301,19 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
});
int maxTime = 30000;
if (!await(requestLatch, maxTime, TimeUnit.MILLISECONDS))
if (!await(requestLatch, maxTime))
{
logger.warn("Request {} took too long (maxTime={} ms){}{}{}{}", requestId, maxTime,
System.lineSeparator(), scenario.server.dump(),
System.lineSeparator(), scenario.client.dump());
System.lineSeparator(), server.dump(),
System.lineSeparator(), client.dump());
}
}
private boolean await(CountDownLatch latch, long time, TimeUnit unit)
private boolean await(CountDownLatch latch, long timeMs)
{
try
{
return latch.await(time, unit);
return latch.await(timeMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
@ -322,119 +321,38 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
}
private static class LoadHandler extends AbstractHandler
private static class LoadHandler extends Handler.Processor
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
String timeout = request.getHeader("X-Timeout");
String timeout = request.getHeaders().get("X-Timeout");
if (timeout != null)
sleep(2L * Integer.parseInt(timeout));
Thread.sleep(2L * Integer.parseInt(timeout));
String method = request.getMethod().toUpperCase(Locale.ENGLISH);
switch (method)
{
case "GET":
case "GET" ->
{
int contentLength = request.getIntHeader("X-Download");
if (contentLength > 0)
ByteBuffer content = BufferUtil.EMPTY_BUFFER;
int contentLength = (int)request.getHeaders().getLongField("X-Download");
if (contentLength >= 0)
{
response.setHeader("X-Content", String.valueOf(contentLength));
response.getOutputStream().write(new byte[contentLength]);
response.getHeaders().putLongField("X-Content", contentLength);
content = ByteBuffer.allocate(contentLength);
}
break;
response.write(true, content, callback);
}
case "POST":
case "POST" ->
{
response.setHeader("X-Content", request.getHeader("X-Upload"));
IO.copy(request.getInputStream(), response.getOutputStream());
break;
response.getHeaders().putLongField("X-Content", request.getHeaders().getLongField("X-Upload"));
Content.copy(request, response, callback);
}
}
if (Boolean.parseBoolean(request.getHeader("X-Close")))
response.setHeader("Connection", "close");
}
private void sleep(long time) throws InterruptedIOException
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}
public static class LoadTransportScenario extends TransportScenario
{
private final AtomicLong connectionLeaks;
public LoadTransportScenario(Transport transport, AtomicLong connectionLeaks) throws IOException
{
super(transport);
this.connectionLeaks = connectionLeaks;
}
@Override
public Connector newServerConnector(Server server)
{
int selectors = Math.min(1, ProcessorUtils.availableProcessors() / 2);
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
switch (transport)
{
case HTTP:
case HTTPS:
case H2C:
case H2:
case FCGI:
return new ServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
case H3:
return new HTTP3ServerConnector(server, null, null, byteBufferPool, sslContextFactory, provideServerConnectionFactory(transport));
case UNIX_DOMAIN:
UnixDomainServerConnector unixSocketConnector = new UnixDomainServerConnector(server, null, null, byteBufferPool, 1, selectors, provideServerConnectionFactory(transport));
unixSocketConnector.setUnixDomainPath(unixDomainPath);
return unixSocketConnector;
default:
throw new IllegalStateException();
}
}
@Override
public HttpClientTransport provideClientTransport(Transport transport, SslContextFactory.Client sslContextFactory)
{
HttpClientTransport clientTransport = super.provideClientTransport(transport, sslContextFactory);
switch (transport)
{
case HTTP:
case HTTPS:
case FCGI:
case UNIX_DOMAIN:
{
// Track connection leaking only for non-multiplexed transports.
clientTransport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination)
{
@Override
protected void leaked(LeakDetector<Connection>.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
});
break;
}
default:
{
break;
}
}
return clientTransport;
if (Boolean.parseBoolean(request.getHeaders().get("X-Close")))
response.getHeaders().put(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
}
}
}

View File

@ -11,11 +11,13 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
@ -27,12 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
@ -43,23 +39,23 @@ import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -70,32 +66,26 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class HttpClientTest extends AbstractTest<TransportScenario>
public class HttpClientTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithoutResponseContent(Transport transport) throws Exception
{
init(transport);
final int status = HttpStatus.NO_CONTENT_204;
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
response.setStatus(status);
callback.succeeded();
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
@ -104,37 +94,34 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithSmallResponseContent(Transport transport) throws Exception
{
init(transport);
testRequestWithResponseContent(1024);
testRequestWithResponseContent(transport, 1024);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithLargeResponseContent(Transport transport) throws Exception
{
init(transport);
testRequestWithResponseContent(1024 * 1024);
testRequestWithResponseContent(transport, 1024 * 1024);
}
private void testRequestWithResponseContent(int length) throws Exception
private void testRequestWithResponseContent(Transport transport, int length) throws Exception
{
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
response.setContentLength(length);
response.getOutputStream().write(bytes);
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, length);
response.write(true, ByteBuffer.wrap(bytes), callback);
}
});
org.eclipse.jetty.client.api.Request request = scenario.client.newRequest(scenario.newURI());
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
@ -144,22 +131,20 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithSmallResponseContentChunked(Transport transport) throws Exception
{
init(transport);
testRequestWithResponseContentChunked(512);
testRequestWithResponseContentChunked(transport, 512);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithLargeResponseContentChunked(Transport transport) throws Exception
{
init(transport);
testRequestWithResponseContentChunked(512 * 512);
testRequestWithResponseContentChunked(transport, 512 * 512);
}
private void testRequestWithResponseContentChunked(int length) throws Exception
private void testRequestWithResponseContentChunked(Transport transport, int length) throws Exception
{
final byte[] chunk1 = new byte[length];
final byte[] chunk2 = new byte[length];
@ -169,20 +154,17 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
byte[] bytes = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, bytes, 0, chunk1.length);
System.arraycopy(chunk2, 0, bytes, chunk1.length, chunk2.length);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
output.write(chunk2);
response.write(false, ByteBuffer.wrap(chunk1), Callback.NOOP);
response.write(true, ByteBuffer.wrap(chunk2), callback);
}
});
org.eclipse.jetty.client.api.Request request = scenario.client.newRequest(scenario.newURI());
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, 2 * length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
@ -192,49 +174,45 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testUploadZeroLengthWithoutResponseContent(Transport transport) throws Exception
{
init(transport);
testUploadWithoutResponseContent(0);
testUploadWithoutResponseContent(transport, 0);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testUploadSmallWithoutResponseContent(Transport transport) throws Exception
{
init(transport);
testUploadWithoutResponseContent(1024);
testUploadWithoutResponseContent(transport, 1024);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testUploadLargeWithoutResponseContent(Transport transport) throws Exception
{
init(transport);
testUploadWithoutResponseContent(1024 * 1024);
testUploadWithoutResponseContent(transport, 1024 * 1024);
}
private void testUploadWithoutResponseContent(int length) throws Exception
private void testUploadWithoutResponseContent(Transport transport, int length) throws Exception
{
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
InputStream input = Request.asInputStream(request);
for (byte b : bytes)
{
assertEquals(b & 0xFF, input.read());
Assertions.assertEquals(b & 0xFF, input.read());
}
assertEquals(-1, input.read());
Assertions.assertEquals(-1, input.read());
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.POST)
.body(new BytesRequestContent(bytes))
.timeout(15, TimeUnit.SECONDS)
@ -245,35 +223,43 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testClientManyWritesSlowServer(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
long sleep = 1024;
long total = 0;
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[1024];
while (true)
{
int read = input.read(buffer);
if (read < 0)
break;
total += read;
Content.Chunk chunk = request.read();
if (chunk == null)
{
try (Blocker.Runnable blocker = Blocker.runnable())
{
request.demand(blocker);
blocker.block();
continue;
}
}
if (chunk instanceof Content.Chunk.Error error)
throw IO.rethrow(error.getCause());
total += chunk.remaining();
if (total >= sleep)
{
sleep(250);
sleep += 256;
}
chunk.release();
if (chunk.isLast())
break;
}
response.getOutputStream().print(total);
Content.Sink.write(response, true, String.valueOf(total), callback);
}
});
@ -281,7 +267,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
int chunkSize = 16;
byte[][] bytes = IntStream.range(0, chunks).mapToObj(x -> new byte[chunkSize]).toArray(byte[][]::new);
BytesRequestContent content = new BytesRequestContent("application/octet-stream", bytes);
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.POST)
.body(content)
.timeout(15, TimeUnit.SECONDS)
@ -292,29 +278,22 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestAfterFailedRequest(Transport transport) throws Exception
{
init(transport);
int length = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
try
{
baseRequest.setHandled(true);
response.getOutputStream().write(new byte[length]);
}
catch (IOException ignored)
{
}
response.write(true, ByteBuffer.allocate(length), Callback.NOOP);
callback.succeeded();
}
});
// Make a request with a large enough response buffer.
org.eclipse.jetty.client.api.Request request = scenario.client.newRequest(scenario.newURI());
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, length);
request.send(listener);
ContentResponse response = listener.get(15, TimeUnit.SECONDS);
@ -323,7 +302,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
// Make a request with a small response buffer, should fail.
try
{
request = scenario.client.newRequest(scenario.newURI());
request = client.newRequest(newURI(transport));
listener = new FutureResponseListener(request, length / 10);
request.send(listener);
listener.get(15, TimeUnit.SECONDS);
@ -335,7 +314,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
// Verify that we can make another request.
request = scenario.client.newRequest(scenario.newURI());
request = client.newRequest(newURI(transport));
listener = new FutureResponseListener(request, length);
request.send(listener);
response = listener.get(15, TimeUnit.SECONDS);
@ -343,64 +322,58 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testClientCannotValidateServerCertificate(Transport transport) throws Exception
{
init(transport);
// Only run this test for transports over TLS.
Assumptions.assumeTrue(scenario.transport.isTlsBased());
assumeTrue(transport.isSecure());
scenario.startServer(new EmptyServerHandler());
start(transport, new EmptyServerHandler());
// Disable validations on the server to be sure
// that the test failure happens during the
// validation of the certificate on the client.
scenario.httpConfig.getCustomizer(SecureRequestCustomizer.class).setSniHostCheck(false);
httpConfig.getCustomizer(SecureRequestCustomizer.class).setSniHostCheck(false);
// Use a default SslContextFactory, requests should fail because the server certificate is unknown.
SslContextFactory.Client clientTLS = scenario.newClientSslContextFactory();
// Use a SslContextFactory.Client that verifies server certificates,
// requests should fail because the server certificate is unknown.
SslContextFactory.Client clientTLS = newSslContextFactoryClient();
clientTLS.setEndpointIdentificationAlgorithm("HTTPS");
scenario.client = scenario.newHttpClient(scenario.provideClientTransport(transport, clientTLS));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
scenario.client.setExecutor(clientThreads);
scenario.client.start();
client.stop();
client.setSslContextFactory(clientTLS);
client.start();
if (transport == Transport.H3)
{
Assumptions.assumeTrue(false, "certificate verification not yet supported in quic");
assumeTrue(false, "certificate verification not yet supported in quic");
// TODO: the lines below should be enough, but they don't work. To be investigated.
HttpClientTransportOverHTTP3 http3Transport = (HttpClientTransportOverHTTP3)scenario.client.getTransport();
HttpClientTransportOverHTTP3 http3Transport = (HttpClientTransportOverHTTP3)client.getTransport();
http3Transport.getHTTP3Client().getQuicConfiguration().setVerifyPeerCertificates(true);
}
assertThrows(ExecutionException.class, () ->
{
// Use an IP address not present in the certificate.
int serverPort = scenario.getServerPort().orElse(0);
scenario.client.newRequest("https://127.0.0.2:" + serverPort)
int serverPort = newURI(transport).getPort();
client.newRequest("https://127.0.0.2:" + serverPort)
.timeout(5, TimeUnit.SECONDS)
.send();
});
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testOPTIONS(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
assertTrue(HttpMethod.OPTIONS.is(request.getMethod()));
assertEquals("*", target);
assertEquals("*", request.getPathInfo());
assertEquals("*", request.getPathInContext());
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.OPTIONS)
.path("*")
.timeout(5, TimeUnit.SECONDS)
@ -410,28 +383,25 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testOPTIONSWithRelativeRedirect(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
if ("*".equals(target))
if ("*".equals(request.getPathInContext()))
{
// Be nasty and send a relative redirect.
// Code 303 will change the method to GET.
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader("Location", "/");
response.getHeaders().put(HttpHeader.LOCATION, "/");
}
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.OPTIONS)
.path("*")
.timeout(5, TimeUnit.SECONDS)
@ -441,25 +411,22 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testDownloadWithInputStreamResponseListener(Transport transport) throws Exception
{
init(transport);
String content = "hello world";
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
response.getOutputStream().print(content);
Content.Sink.write(response, true, content, callback);
}
});
CountDownLatch latch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener();
scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
client.newRequest(newURI(transport))
.onResponseSuccess(response -> latch.countDown())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
@ -475,17 +442,16 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConnectionListener(Transport transport) throws Exception
{
init(transport);
scenario.startServer(new EmptyServerHandler());
start(transport, new EmptyServerHandler());
long idleTimeout = 1000;
scenario.startClient(httpClient -> httpClient.setIdleTimeout(idleTimeout));
client.setIdleTimeout(idleTimeout);
CountDownLatch openLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
scenario.client.addBean(new org.eclipse.jetty.io.Connection.Listener()
client.addBean(new org.eclipse.jetty.io.Connection.Listener()
{
@Override
public void onOpened(org.eclipse.jetty.io.Connection connection)
@ -500,8 +466,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
@ -513,18 +478,16 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testAsyncResponseContentBackPressure(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
baseRequest.setHandled(true);
// Large write to generate multiple DATA frames.
response.getOutputStream().write(new byte[256 * 1024]);
response.write(true, ByteBuffer.allocate(256 * 1024), callback);
}
});
@ -532,8 +495,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
AtomicInteger counter = new AtomicInteger();
AtomicReference<Callback> callbackRef = new AtomicReference<>();
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(1));
scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
client.newRequest(newURI(transport))
.onResponseContentAsync((response, content, callback) ->
{
if (counter.incrementAndGet() == 1)
@ -558,21 +520,20 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testResponseWithContentCompleteListenerInvokedOnce(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.getWriter().write("Jetty");
Content.Sink.write(response, true, "Jetty", callback);
}
});
AtomicInteger completes = new AtomicInteger();
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send(result -> completes.incrementAndGet());
sleep(1000);
@ -581,35 +542,37 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testHEADResponds200(Transport transport) throws Exception
{
init(transport);
testHEAD(scenario.servletPath, HttpStatus.OK_200);
testHEAD(transport, "/", HttpStatus.OK_200);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testHEADResponds404(Transport transport) throws Exception
{
init(transport);
testHEAD("/notMapped", HttpStatus.NOT_FOUND_404);
testHEAD(transport, "/notMapped", HttpStatus.NOT_FOUND_404);
}
private void testHEAD(String path, int status) throws Exception
private void testHEAD(Transport transport, String path, int status) throws Exception
{
byte[] data = new byte[1024];
new Random().nextBytes(data);
scenario.start(new HttpServlet()
start(transport, new Handler.Processor()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.getOutputStream().write(data);
String target = request.getPathInContext();
if ("/notMapped".equals(target))
org.eclipse.jetty.server.Response.writeError(request, response, callback, HttpStatus.NOT_FOUND_404);
else
response.write(true, ByteBuffer.wrap(data), callback);
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.HEAD)
.path(path)
.send();
@ -619,23 +582,21 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testHEADWithAcceptHeaderAndSendError(Transport transport) throws Exception
{
init(transport);
int status = HttpStatus.BAD_REQUEST_400;
scenario.start(new HttpServlet()
start(transport, new Handler.Processor()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
resp.sendError(status);
org.eclipse.jetty.server.Response.writeError(request, response, callback, status);
}
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.method(HttpMethod.HEAD)
.path(scenario.servletPath)
.headers(headers -> headers.put(HttpHeader.ACCEPT, "*/*"))
.send();
@ -644,25 +605,22 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testHEADWithContentLengthGreaterThanMaxBufferingCapacity(Transport transport) throws Exception
{
int length = 1024;
init(transport);
scenario.start(new HttpServlet()
start(transport, new Handler.Processor()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.setContentLength(length);
response.getOutputStream().write(new byte[length]);
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, length);
response.write(true, ByteBuffer.allocate(length), callback);
}
});
org.eclipse.jetty.client.api.Request request = scenario.client
.newRequest(scenario.newURI())
.method(HttpMethod.HEAD)
.path(scenario.servletPath);
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI(transport))
.method(HttpMethod.HEAD);
FutureResponseListener listener = new FutureResponseListener(request, length / 2);
request.send(listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
@ -672,52 +630,50 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testOneDestinationPerUser(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
start(transport, new EmptyServerHandler());
int runs = 4;
int users = 16;
for (int i = 0; i < runs; ++i)
{
for (int j = 0; j < users; ++j)
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.tag(j)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
List<Destination> destinations = scenario.client.getDestinations();
List<Destination> destinations = client.getDestinations();
assertEquals(users, destinations.size());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIPv6Host(Transport transport) throws Exception
{
Assumptions.assumeTrue(Net.isIpv6InterfaceAvailable());
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
assumeTrue(Net.isIpv6InterfaceAvailable());
assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.setContentType("text/plain");
response.getOutputStream().print(request.getHeader("Host"));
response.getHeaders().put(HttpHeader.CONTENT_TYPE, "text/plain");
Content.Sink.write(response, true, request.getHeaders().get("Host"), callback);
}
});
// Test with a full URI.
String hostAddress = "::1";
String host = "[" + hostAddress + "]";
String uri = scenario.newURI().replace("localhost", host) + "/path";
ContentResponse response = scenario.client.newRequest(uri)
URI serverURI = newURI(transport);
String uri = serverURI.toString().replace("localhost", host) + "/path";
ContentResponse response = client.newRequest(uri)
.method(HttpMethod.PUT)
.timeout(5, TimeUnit.SECONDS)
.send();
@ -726,9 +682,9 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
assertThat(new String(response.getContent(), StandardCharsets.ISO_8859_1), Matchers.startsWith("[::1]:"));
// Test with host address.
int port = scenario.getServerPort().orElse(0);
response = scenario.client.newRequest(hostAddress, port)
.scheme(scenario.getScheme())
int port = serverURI.getPort();
response = client.newRequest(hostAddress, port)
.scheme(serverURI.getScheme())
.method(HttpMethod.PUT)
.timeout(5, TimeUnit.SECONDS)
.send();
@ -736,9 +692,9 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
assertEquals(200, response.getStatus());
assertThat(new String(response.getContent(), StandardCharsets.ISO_8859_1), Matchers.startsWith("[::1]:"));
// Test with host.
response = scenario.client.newRequest(host, port)
.scheme(scenario.getScheme())
// Test with host name.
response = client.newRequest(host, port)
.scheme(serverURI.getScheme())
.method(HttpMethod.PUT)
.timeout(5, TimeUnit.SECONDS)
.send();
@ -746,37 +702,35 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
assertEquals(200, response.getStatus());
assertThat(new String(response.getContent(), StandardCharsets.ISO_8859_1), Matchers.startsWith("[::1]:"));
assertEquals(1, scenario.client.getDestinations().size());
Assertions.assertEquals(1, client.getDestinations().size());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestWithDifferentDestination(Transport transport) throws Exception
{
init(transport);
String requestScheme = HttpScheme.HTTPS.is(scenario.getScheme()) ? "http" : "https";
String requestScheme = newURI(transport).getScheme();
String requestHost = "otherHost.com";
int requestPort = 8888;
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
HttpURI uri = jettyRequest.getHttpURI();
HttpURI uri = request.getHttpURI();
assertEquals(requestHost, uri.getHost());
assertEquals(requestPort, uri.getPort());
if (scenario.transport == Transport.H2C || scenario.transport == Transport.H2)
assertEquals(requestScheme, jettyRequest.getMetaData().getURI().getScheme());
if (transport == Transport.H2C || transport == Transport.H2)
assertEquals(requestScheme, uri.getScheme());
}
});
if (transport.isTlsBased())
scenario.httpConfig.getCustomizer(SecureRequestCustomizer.class).setSniHostCheck(false);
if (transport.isSecure())
httpConfig.getCustomizer(SecureRequestCustomizer.class).setSniHostCheck(false);
Origin origin = new Origin(scenario.getScheme(), "localhost", scenario.getServerPort().orElse(0));
HttpDestination destination = scenario.client.resolveDestination(origin);
Origin origin = new Origin(requestScheme, "localhost", requestPort);
HttpDestination destination = client.resolveDestination(origin);
org.eclipse.jetty.client.api.Request request = scenario.client.newRequest(requestHost, requestPort)
org.eclipse.jetty.client.api.Request request = client.newRequest(requestHost, requestPort)
.scheme(requestScheme)
.path("/path");
@ -792,38 +746,29 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestIdleTimeout(Transport transport) throws Exception
{
init(transport);
CountDownLatch latch = new CountDownLatch(1);
long idleTimeout = 500;
scenario.start(new AbstractHandler()
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
try
{
baseRequest.setHandled(true);
if (target.equals("/1"))
assertTrue(latch.await(5, TimeUnit.SECONDS));
else if (target.equals("/2"))
Thread.sleep(2 * idleTimeout);
else
fail("Unknown path: " + target);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
String target = request.getPathInContext();
if (target.equals("/1"))
assertTrue(latch.await(5, TimeUnit.SECONDS));
else if (target.equals("/2"))
Thread.sleep(2 * idleTimeout);
else
fail("Unknown path: " + target);
}
});
assertThrows(TimeoutException.class, () ->
scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
client.newRequest(newURI(transport))
.path("/1")
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
@ -831,8 +776,7 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
latch.countDown();
// Make another request without specifying the idle timeout, should not fail
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.scheme(scenario.getScheme())
ContentResponse response = client.newRequest(newURI(transport))
.path("/2")
.timeout(3 * idleTimeout, TimeUnit.MILLISECONDS)
.send();

View File

@ -11,11 +11,9 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@ -27,9 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
@ -42,20 +37,23 @@ import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.TestAbortedException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -65,40 +63,32 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
public class HttpClientTimeoutTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutOnFuture(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(2 * timeout));
start(transport, new TimeoutHandler(2 * timeout));
assertThrows(TimeoutException.class, () ->
{
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
});
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutOnListener(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(2 * timeout));
start(transport, new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
Request request = scenario.client.newRequest(scenario.newURI())
Request request = client.newRequest(newURI(transport))
.timeout(timeout, TimeUnit.MILLISECONDS);
request.send(result ->
{
@ -109,19 +99,18 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutOnQueuedRequest(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(3 * timeout));
start(transport, new TimeoutHandler(3 * timeout));
// Only one connection so requests get queued
scenario.client.setMaxConnectionsPerDestination(1);
client.setMaxConnectionsPerDestination(1);
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);
Request request = scenario.client.newRequest(scenario.newURI())
Request request = client.newRequest(newURI(transport))
.timeout(4 * timeout, TimeUnit.MILLISECONDS);
request.send(result ->
{
@ -131,7 +120,7 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
// Second request has a short timeout and should fail in the queue
final CountDownLatch secondLatch = new CountDownLatch(1);
request = scenario.client.newRequest(scenario.newURI())
request = client.newRequest(newURI(transport))
.timeout(timeout, TimeUnit.MILLISECONDS);
request.send(result ->
{
@ -146,16 +135,15 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutIsCancelledOnSuccess(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(timeout));
start(transport, new TimeoutHandler(timeout));
final CountDownLatch latch = new CountDownLatch(1);
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
Request request = scenario.client.newRequest(scenario.newURI())
Request request = client.newRequest(newURI(transport))
.body(new InputStreamRequestContent(new ByteArrayInputStream(content)))
.timeout(2 * timeout, TimeUnit.MILLISECONDS);
request.send(new BufferingResponseListener()
@ -177,17 +165,16 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutOnListenerWithExplicitConnection(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(2 * timeout));
start(transport, new TimeoutHandler(2 * timeout));
Request request = scenario.client.newRequest(scenario.newURI()).timeout(timeout, TimeUnit.MILLISECONDS);
Request request = client.newRequest(newURI(transport)).timeout(timeout, TimeUnit.MILLISECONDS);
CountDownLatch latch = new CountDownLatch(1);
Destination destination = scenario.client.resolveDestination(request);
Destination destination = client.resolveDestination(request);
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
@ -203,17 +190,16 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutIsCancelledOnSuccessWithExplicitConnection(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.start(new TimeoutHandler(timeout));
start(transport, new TimeoutHandler(timeout));
Request request = scenario.client.newRequest(scenario.newURI()).timeout(2 * timeout, TimeUnit.MILLISECONDS);
Request request = client.newRequest(newURI(transport)).timeout(2 * timeout, TimeUnit.MILLISECONDS);
CountDownLatch latch = new CountDownLatch(1);
Destination destination = scenario.client.resolveDestination(request);
Destination destination = client.resolveDestination(request);
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
@ -235,16 +221,16 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIdleTimeout(Transport transport) throws Exception
{
init(transport);
long timeout = 1000;
scenario.startServer(new TimeoutHandler(2 * timeout));
startServer(transport, new TimeoutHandler(2 * timeout));
AtomicBoolean sslIdle = new AtomicBoolean();
SslContextFactory.Client sslContextFactory = scenario.newClientSslContextFactory();
scenario.client = new HttpClient(scenario.provideClientTransport(transport, sslContextFactory))
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(newHttpClientTransport(transport))
{
@Override
public ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.Client sslContextFactory, ClientConnectionFactory connectionFactory)
@ -269,54 +255,50 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
};
}
};
scenario.client.setIdleTimeout(timeout);
scenario.client.start();
client.setExecutor(clientThreads);
client.setIdleTimeout(timeout);
client.start();
assertThrows(TimeoutException.class, () ->
{
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send();
});
assertFalse(sslIdle.get());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testBlockingConnectTimeoutFailsRequest(Transport transport) throws Exception
{
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
testConnectTimeoutFailsRequest(true);
testConnectTimeoutFailsRequest(transport, true);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testNonBlockingConnectTimeoutFailsRequest(Transport transport) throws Exception
{
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
testConnectTimeoutFailsRequest(false);
testConnectTimeoutFailsRequest(transport, false);
}
private void testConnectTimeoutFailsRequest(boolean blocking) throws Exception
private void testConnectTimeoutFailsRequest(Transport transport, boolean blocking) throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
scenario.start(new EmptyServerHandler());
HttpClient client = scenario.client;
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.setConnectBlocking(blocking);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.scheme(scenario.getScheme())
request.scheme(newURI(transport).getScheme())
.send(result ->
{
if (result.isFailed())
@ -328,29 +310,24 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConnectTimeoutIsCancelledByShorterRequestTimeout(Transport transport) throws Exception
{
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 2000;
assumeConnectTimeout(host, port, connectTimeout);
scenario.start(new EmptyServerHandler());
HttpClient client = scenario.client;
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.start();
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
Request request = client.newRequest(host, port);
request.scheme(scenario.getScheme())
request.scheme(newURI(transport).getScheme())
.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
.send(result ->
{
@ -364,35 +341,31 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRetryAfterConnectTimeout(Transport transport) throws Exception
{
// Failure to connect is based on InetSocket address failure, which Unix-Domain does not use.
Assumptions.assumeTrue(transport != Transport.UNIX_DOMAIN);
init(transport);
final String host = "10.255.255.1";
final int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
scenario.start(new EmptyServerHandler());
HttpClient client = scenario.client;
client.stop();
start(transport, new EmptyServerHandler());
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.scheme(scenario.getScheme())
String scheme = newURI(transport).getScheme();
request.scheme(scheme)
.send(result ->
{
if (result.isFailed())
{
// Retry
client.newRequest(host, port)
.scheme(scenario.getScheme())
.scheme(scheme)
.send(retryResult ->
{
if (retryResult.isFailed())
@ -406,14 +379,13 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testVeryShortTimeout(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
start(transport, new EmptyServerHandler());
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.timeout(1, TimeUnit.MILLISECONDS) // Very short timeout
.send(result -> latch.countDown());
@ -421,18 +393,13 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTimeoutCancelledWhenSendingThrowsException(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
start(transport, new EmptyServerHandler());
long timeout = 1000;
String uri = "badscheme://0.0.0.1";
if (scenario.getServerPort().isPresent())
uri += ":" + scenario.getServerPort().getAsInt();
Request request = scenario.client.newRequest(uri);
Request request = client.newRequest("badscheme://0.0.0.1/");
// TODO: assert a more specific Throwable
assertThrows(Exception.class, () ->
@ -450,32 +417,23 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testFirstRequestTimeoutAfterSecondRequestCompletes(Transport transport) throws Exception
{
init(transport);
long timeout = 2000;
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
if (request.getRequestURI().startsWith("/one"))
{
try
{
Thread.sleep(3 * timeout);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
if (request.getPathInContext().startsWith("/one"))
Thread.sleep(3 * timeout);
callback.succeeded();
}
});
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/one")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
@ -484,7 +442,7 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
latch.countDown();
});
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.path("/two")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
@ -494,55 +452,45 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception
{
init(transport);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
if (request.getRequestURI().startsWith("/one"))
{
try
{
serverLatch.await();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
if (request.getPathInContext().startsWith("/one"))
serverLatch.await();
callback.succeeded();
}
});
scenario.client.setMaxConnectionsPerDestination(1);
scenario.setMaxRequestsPerConnection(1);
setMaxRequestsPerConnection(1);
client.setMaxConnectionsPerDestination(1);
// Send the first request so that the others get queued.
CountDownLatch latch1 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/one")
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch1.countDown();
});
// Queue a second request, it should expire in the queue.
long timeout = 1000;
CountDownLatch latch2 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/two")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isFailed());
assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class));
Assertions.assertTrue(result.isFailed());
MatcherAssert.assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class));
latch2.countDown();
});
@ -550,13 +498,13 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
// Queue a third request, it should not reset the timeout of the second request.
CountDownLatch latch3 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/three")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch3.countDown();
});
@ -593,7 +541,7 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
}
private static class TimeoutHandler extends AbstractHandler
private static class TimeoutHandler extends Handler.Processor
{
private final long timeout;
@ -603,18 +551,10 @@ public class HttpClientTimeoutTest extends AbstractTest<TransportScenario>
}
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void process(org.eclipse.jetty.server.Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
try
{
TimeUnit.MILLISECONDS.sleep(timeout);
IO.copy(request.getInputStream(), response.getOutputStream());
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
TimeUnit.MILLISECONDS.sleep(timeout);
Content.copy(request, response, callback);
}
}
}

View File

@ -11,9 +11,8 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -22,8 +21,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient;
@ -50,15 +47,17 @@ import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
@ -397,13 +396,13 @@ public class HttpClientTransportDynamicTest
// client :1234 <-> :8888 proxy :5678 <-> server :8080
// client :2345 <-> :8888 proxy :6789 <-> server :8080
startServer(this::proxyH1H2C, new EmptyServerHandler()
startServer(this::proxyH1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, Response response, Callback callback)
{
response.setContentType(MimeTypes.Type.TEXT_PLAIN.asString());
response.getOutputStream().print(request.getRemotePort());
response.getHeaders().put(HttpHeader.CONTENT_TYPE, MimeTypes.Type.TEXT_PLAIN.asString());
Content.Sink.write(response, true, String.valueOf(Request.getRemotePort(request)), callback);
}
});
startClient(HttpClientConnectionFactory.HTTP11);
@ -494,13 +493,13 @@ public class HttpClientTransportDynamicTest
public void testHTTP11UpgradeToH2C() throws Exception
{
String content = "upgrade";
startServer(this::h1H2C, new EmptyServerHandler()
startServer(this::h1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, Response response, Callback callback)
{
response.setContentType("text/plain; charset=UTF-8");
response.getOutputStream().print(content);
response.getHeaders().put(HttpHeader.CONTENT_TYPE, MimeTypes.Type.TEXT_PLAIN_UTF_8.asString());
Content.Sink.write(response, true, content, callback);
}
});
ClientConnector clientConnector = new ClientConnector();
@ -569,13 +568,13 @@ public class HttpClientTransportDynamicTest
public void testHTTP11UpgradeToH2CWithForwardProxy() throws Exception
{
String content = "upgrade";
startServer(this::h1H2C, new EmptyServerHandler()
startServer(this::h1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, Response response, Callback callback)
{
response.setContentType("text/plain; charset=UTF-8");
response.getOutputStream().print(content);
response.getHeaders().put(HttpHeader.CONTENT_TYPE, MimeTypes.Type.TEXT_PLAIN_UTF_8.asString());
Content.Sink.write(response, true, content, callback);
}
});
ClientConnector clientConnector = new ClientConnector();
@ -608,13 +607,13 @@ public class HttpClientTransportDynamicTest
public void testHTTP11UpgradeToH2COverTLS() throws Exception
{
String content = "upgrade";
startServer(this::sslH1H2C, new EmptyServerHandler()
startServer(this::h1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, Response response, Callback callback)
{
response.setContentType("text/plain; charset=UTF-8");
response.getOutputStream().print(content);
response.getHeaders().put(HttpHeader.CONTENT_TYPE, MimeTypes.Type.TEXT_PLAIN_UTF_8.asString());
Content.Sink.write(response, true, content, callback);
}
});
ClientConnector clientConnector = new ClientConnector();
@ -641,12 +640,12 @@ public class HttpClientTransportDynamicTest
@Test
public void testHTTP11UpgradeToH2CWithRequestContentDoesNotUpgrade() throws Exception
{
startServer(this::h1H2C, new EmptyServerHandler()
startServer(this::h1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, Response response, Callback callback)
{
IO.copy(request.getInputStream(), response.getOutputStream());
Content.copy(request, response, callback);
}
});
ClientConnector clientConnector = new ClientConnector();
@ -708,12 +707,13 @@ public class HttpClientTransportDynamicTest
@Test
public void testHTTP11UpgradeToH2CFailedServerClose() throws Exception
{
startServer(this::h1H2C, new EmptyServerHandler()
startServer(this::h1H2C, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback)
{
jettyRequest.getHttpChannel().getEndPoint().close();
request.getConnectionMetaData().getConnection().getEndPoint().close();
callback.succeeded();
}
});
ClientConnector clientConnector = new ClientConnector();

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -24,8 +24,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient;
@ -36,10 +34,6 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.ee9.proxy.AsyncProxyServlet;
import org.eclipse.jetty.ee9.proxy.ConnectHandler;
import org.eclipse.jetty.ee9.servlet.ServletContextHandler;
import org.eclipse.jetty.ee9.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
@ -61,6 +55,7 @@ import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectHandler;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;

View File

@ -11,9 +11,8 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -27,50 +26,45 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.RoundRobinConnectionPool;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario>
public class RoundRobinConnectionPoolTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testRoundRobin(Transport transport) throws Exception
{
init(transport);
AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback)
{
if (record.get())
remotePorts.add(request.getRemotePort());
remotePorts.add(Request.getRemotePort(request));
callback.succeeded();
}
});
int maxConnections = 3;
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
@ -80,7 +74,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
@ -89,7 +83,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int requests = 2 * maxConnections - 1;
for (int i = 0; i < requests; ++i)
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
@ -101,19 +95,18 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int base = i % maxConnections;
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts, expected, Matchers.equalTo(candidate));
assertThat(client.dump() + System.lineSeparator() + remotePorts, expected, Matchers.equalTo(candidate));
if (transport != Transport.UNIX_DOMAIN && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testMultiplex(Transport transport) throws Exception
{
init(transport);
int multiplex = 1;
if (scenario.transport.isMultiplexed())
if (transport.isMultiplexed())
multiplex = 4;
int maxMultiplex = multiplex;
@ -125,30 +118,31 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count);
CyclicBarrier barrier = new CyclicBarrier(count + 1);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback)
{
try
{
if (record.get())
{
remotePorts.add(request.getRemotePort());
remotePorts.add(Request.getRemotePort(request));
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
callback.succeeded();
}
catch (Exception x)
{
throw new RuntimeException(x);
callback.failed(x);
}
}
});
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
@ -158,7 +152,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
@ -170,7 +164,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{
CountDownLatch latch = new CountDownLatch(1);
requestLatch.set(latch);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/" + i)
.onRequestQueued(request -> requests.incrementAndGet())
.onRequestBegin(request -> requests.decrementAndGet())
@ -195,20 +189,18 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int base = i % maxConnections;
int expected = remotePorts.get(base);
int candidate = remotePorts.get(i);
assertThat(scenario.client.dump() + System.lineSeparator() + remotePorts, expected, Matchers.equalTo(candidate));
assertThat(client.dump() + System.lineSeparator() + remotePorts, expected, Matchers.equalTo(candidate));
if (transport != Transport.UNIX_DOMAIN && i > 0)
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
{
init(transport);
int multiplex = 1;
if (scenario.transport.isMultiplexed())
if (transport.isMultiplexed())
multiplex = 2;
int maxMultiplex = multiplex;
@ -217,27 +209,28 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int count = 2 * maxConnections * maxMultiplex * maxUsage;
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
public void process(Request request, Response response, Callback callback)
{
remotePorts.add(request.getRemotePort());
remotePorts.add(Request.getRemotePort(request));
callback.succeeded();
}
});
if (transport == Transport.H3)
((QuicServerConnector)scenario.connector).getQuicConfiguration().setMaxBidirectionalRemoteStreams(maxUsage);
scenario.client.getTransport().setConnectionPoolFactory(destination ->
((QuicServerConnector)connector).getQuicConfiguration().setMaxBidirectionalRemoteStreams(maxUsage);
client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
pool.setMaxUsageCount(maxUsage);
pool.setMaxUsage(maxUsage);
return pool;
});
CountDownLatch clientLatch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->

View File

@ -56,7 +56,7 @@
</build>
</profile>
<profile>
<id>enable-foreign-and-virtual-threads-preview</id>
<id>enable-preview</id>
<activation>
<jdk>[19,)</jdk>
</activation>

View File

@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
@ -51,9 +50,11 @@ import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.OutputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.ee9.nested.HttpInput;
import org.eclipse.jetty.ee9.nested.HttpOutput;
import org.eclipse.jetty.ee9.servlet.ServletContextHandler;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
@ -65,7 +66,6 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.internal.HttpChannelState;
@ -73,6 +73,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -1106,8 +1107,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
while (input.isReady() && !input.isFinished())
{
int read = input.read();
// System.err.printf("%x%n", read);
input.read();
readLatch.countDown();
}
}
@ -1677,15 +1677,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
private ByteBuffer gzipToBuffer(String s) throws IOException
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzos = new GZIPOutputStream(baos);
gzos.write(s.getBytes(StandardCharsets.ISO_8859_1));
gzos.close();
return BufferUtil.toBuffer(baos.toByteArray());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testWriteListenerFromOtherThread(Transport transport) throws Exception
@ -1745,6 +1736,79 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertThat(failures, empty());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testClientDefersContentServerIdleTimeout(Transport transport) throws Exception
{
CountDownLatch dataLatch = new CountDownLatch(1);
CountDownLatch errorLatch = new CountDownLatch(1);
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
dataLatch.countDown();
}
@Override
public void onAllDataRead()
{
dataLatch.countDown();
}
@Override
public void onError(Throwable t)
{
errorLatch.countDown();
response.setStatus(HttpStatus.REQUEST_TIMEOUT_408);
asyncContext.complete();
}
});
}
});
long idleTimeout = 1000;
scenario.setRequestIdleTimeout(idleTimeout);
CountDownLatch latch = new CountDownLatch(1);
byte[] bytes = "[{\"key\":\"value\"}]".getBytes(StandardCharsets.UTF_8);
OutputStreamRequestContent content = new OutputStreamRequestContent("application/json;charset=UTF-8")
{
@Override
public long getLength()
{
return bytes.length;
}
};
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.body(content)
.onResponseSuccess(response ->
{
Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus());
latch.countDown();
})
.send(null);
// Wait for the server to idle timeout.
Thread.sleep(2 * idleTimeout);
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
// Do not send the content to the server.
assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private static class Listener implements ReadListener, WriteListener
{
private final Executor executor = Executors.newFixedThreadPool(32);
@ -1833,29 +1897,26 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
@Override
public void startServer(Handler handler) throws Exception
protected void prepareServer(ServletContextHandler handler)
{
if (handler == context)
// Add this listener before the context is started, so it's durable.
handler.addEventListener(new ContextHandler.ContextScopeListener()
{
// Add this listener before the context is started, so it's durable.
context.addEventListener(new ContextHandler.ContextScopeListener()
@Override
public void enterScope(org.eclipse.jetty.server.Context context, Request request)
{
@Override
public void enterScope(org.eclipse.jetty.server.Context context, Request request)
{
checkScope();
scope.set(new RuntimeException());
}
checkScope();
scope.set(new RuntimeException());
}
@Override
public void exitScope(org.eclipse.jetty.server.Context context, Request request)
{
assertScope();
scope.set(null);
}
});
}
super.startServer(handler);
@Override
public void exitScope(org.eclipse.jetty.server.Context context, Request request)
{
assertScope();
scope.set(null);
}
});
super.prepareServer(handler);
}
private void assertScope()

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ContinueProtocolHandler;
@ -40,13 +41,13 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.ee9.nested.Request;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -88,12 +89,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
private void testExpect100ContinueRespond100Continue(Transport transport, byte[]... contents) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and copy the content back
IO.copy(request.getInputStream(), response.getOutputStream());
}
@ -124,12 +124,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithChunkedContentRespond100Continue(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and copy the content back
ServletInputStream input = request.getInputStream();
// Make sure we chunk the response too
@ -185,12 +184,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
private void testExpect100ContinueWithContentRespondError(Transport transport, int error) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.sendError(error);
}
});
@ -226,12 +224,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{
init(transport);
String data = "success";
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
if (request.getRequestURI().endsWith("/done"))
{
response.getOutputStream().print(data);
@ -276,12 +273,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
// A request with Expect: 100-Continue cannot receive non-final responses like 3xx
String data = "success";
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
if (request.getRequestURI().endsWith("/done"))
{
// Send 100-Continue and consume the content
@ -328,12 +324,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
clientRequestRef.get().abort(new Exception("abort!"));
try
{
@ -379,12 +374,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
AtomicReference<org.eclipse.jetty.client.api.Request> clientRequestRef = new AtomicReference<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.startServer(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
clientRequestRef.get().abort(new Exception("abort!"));
@ -429,12 +423,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithContentWithResponseFailureDuring100Continue(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and consume the content
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
}
@ -497,12 +490,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
CountDownLatch serverLatch = new CountDownLatch(1);
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
@ -540,7 +532,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk1));
content.write(ByteBuffer.wrap(chunk1), Callback.NOOP);
// Wait for the handler thread to be blocked in the 2nd IO.
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
@ -550,7 +542,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.write(ByteBuffer.wrap(chunk2), Callback.NOOP);
content.close();
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
@ -562,12 +554,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{
AtomicReference<Thread> handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
@ -602,7 +593,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
return thread != null && thread.getState() == Thread.State.WAITING;
});
content.offer(ByteBuffer.wrap(chunk2));
content.write(ByteBuffer.wrap(chunk2), Callback.NOOP);
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -613,12 +604,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
}
@ -632,7 +622,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
.onRequestHeaders(request ->
{
content.offer(ByteBuffer.wrap(data));
content.write(ByteBuffer.wrap(data), Callback.NOOP);
content.close();
})
.body(content)
@ -654,12 +644,11 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testExpect100ContinueWithInitialAndConcurrentDeferredContentRespond100Continue(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
}
@ -684,7 +673,7 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void onHeaders(Response response)
{
super.onHeaders(response);
content.offer(ByteBuffer.wrap(chunk2));
content.write(ByteBuffer.wrap(chunk2), Callback.NOOP);
content.close();
}
};
@ -742,24 +731,26 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
readRequestHeaders(socket.getInputStream());
OutputStream output = socket.getOutputStream();
String responses =
"HTTP/1.1 100 Continue\r\n" +
"\r\n" +
"HTTP/1.1 200 OK\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"10\r\n" +
"0123456789ABCDEF\r\n";
String responses = """
HTTP/1.1 100 Continue\r
\r
HTTP/1.1 200 OK\r
Transfer-Encoding: chunked\r
\r
10\r
0123456789ABCDEF\r
""";
output.write(responses.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(1000);
String content =
"10\r\n" +
"0123456789ABCDEF\r\n" +
"0\r\n" +
"\r\n";
String content = """
10\r
0123456789ABCDEF\r
0\r
\r
""";
output.write(content.getBytes(StandardCharsets.UTF_8));
output.flush();
@ -773,12 +764,12 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
public void testNoExpectRespond100Continue(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
Request jettyRequest = (Request)request;
// Force a 100 Continue response.
jettyRequest.getHttpChannel().sendResponse(HttpGenerator.CONTINUE_100_INFO, null, false);
// Echo the content.
@ -826,24 +817,26 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
OutputStream output = socket.getOutputStream();
readRequestHeaders(input);
String response1 =
"HTTP/1.1 100 Continue\r\n" +
"\r\n" +
"HTTP/1.1 303 See Other\r\n" +
"Location: /redirect\r\n" +
"Content-Length: 0\r\n" +
"\r\n";
String response1 = """
HTTP/1.1 100 Continue\r
\r
HTTP/1.1 303 See Other\r
Location: /redirect\r
Content-Length: 0\r
\r
""";
output.write(response1.getBytes(StandardCharsets.UTF_8));
output.flush();
readRequestHeaders(input);
String response2 =
"HTTP/1.1 100 Continue\r\n" +
"\r\n" +
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n" +
"Connection: close\r\n" +
"\r\n";
String response2 = """
HTTP/1.1 100 Continue\r
\r
HTTP/1.1 200 OK\r
Content-Length: 0\r
Connection: close\r
\r
""";
output.write(response2.getBytes(StandardCharsets.UTF_8));
output.flush();
}

View File

@ -17,14 +17,16 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpRequest;
@ -32,12 +34,9 @@ import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -78,13 +77,13 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
{
String trailerName = "Trailer";
String trailerValue = "value";
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Read the content first.
ServletInputStream input = jettyRequest.getInputStream();
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
@ -92,8 +91,10 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
break;
}
assertTrue(request.isTrailerFieldsReady());
// Now the trailers can be accessed.
HttpFields trailers = jettyRequest.getTrailerHttpFields();
Map<String, String> trailers = request.getTrailerFields();
assertNotNull(trailers);
assertEquals(trailerValue, trailers.get(trailerName));
}
@ -114,13 +115,13 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
public void testEmptyRequestTrailers(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Read the content first.
ServletInputStream input = jettyRequest.getInputStream();
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
@ -128,9 +129,12 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
break;
}
assertTrue(request.isTrailerFieldsReady());
// Now the trailers can be accessed.
HttpFields trailers = jettyRequest.getTrailerHttpFields();
assertNull(trailers);
Map<String, String> trailers = request.getTrailerFields();
assertNotNull(trailers);
assertTrue(trailers.isEmpty());
}
});
@ -162,19 +166,16 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
AtomicBoolean once = new AtomicBoolean();
String trailerName = "Trailer";
String trailerValue = "value";
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
Response jettyResponse = jettyRequest.getResponse();
if (once.compareAndSet(false, true))
{
HttpFields trailers = HttpFields.build().put(trailerName, trailerValue);
jettyResponse.setTrailers(() -> trailers);
Map<String, String> trailers = Map.of(trailerName, trailerValue);
response.setTrailerFields(() -> trailers);
}
if (content != null)
response.getOutputStream().write(content);
}
@ -228,14 +229,12 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
public void testEmptyResponseTrailers(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
HttpFields trailers = HttpFields.build();
response.setTrailerFields(() ->
trailers.stream().collect(Collectors.toMap(HttpField::getName, HttpField::getValue)));
response.setTrailerFields(Map::of);
}
});
@ -270,15 +269,12 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
String trailerName = "Trailer";
String trailerValue = "value";
init(transport);
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
HttpFields trailers = HttpFields.build().put(trailerName, trailerValue);
response.setTrailerFields(() ->
trailers.stream().collect(Collectors.toMap(HttpField::getName, HttpField::getValue)));
response.setTrailerFields(() -> Map.of(trailerName, trailerValue));
// Write a large content
response.getOutputStream().write(content);
}
@ -318,14 +314,12 @@ public class HttpTrailersTest extends AbstractTest<TransportScenario>
public void testResponseResetAlsoResetsTrailers(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
Response jettyResponse = jettyRequest.getResponse();
HttpFields trailers = HttpFields.build().put("name", "value");
jettyResponse.setTrailers(() -> trailers);
response.setTrailerFields(() -> Map.of("name", "value"));
// Fill some other response field.
response.setHeader("name", "value");
response.setStatus(HttpServletResponse.SC_NOT_ACCEPTABLE);

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.http.client;
package org.eclipse.jetty.ee9.http.client;
import java.io.IOException;
import java.util.List;
@ -20,7 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpConversation;
@ -33,13 +33,10 @@ import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
@ -51,7 +48,7 @@ public class InformationalResponseTest extends AbstractTest<TransportScenario>
public void init(Transport transport) throws IOException
{
// Skip FCGI for now, not much interested in its server-side behavior.
Assumptions.assumeTrue(transport != FCGI);
Assumptions.assumeTrue(transport != Transport.FCGI);
setScenario(new TransportScenario(transport));
}
@ -60,12 +57,11 @@ public class InformationalResponseTest extends AbstractTest<TransportScenario>
public void test102Processing(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
response.sendError(HttpStatus.PROCESSING_102);
response.sendError(HttpStatus.PROCESSING_102);
response.setStatus(200);
@ -144,12 +140,11 @@ public class InformationalResponseTest extends AbstractTest<TransportScenario>
public void test103EarlyHint(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
response.setHeader("Hint", "one");
response.sendError(HttpStatus.EARLY_HINT_103);
response.setHeader("Hint", "two");

View File

@ -17,13 +17,13 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -43,10 +43,10 @@ public class RequestReaderTest extends AbstractTest<TransportScenario>
public void testRecyclingWhenUsingReader(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Must be a Reader and not an InputStream.
BufferedReader br = request.getReader();
@ -58,9 +58,8 @@ public class RequestReaderTest extends AbstractTest<TransportScenario>
}
// Paranoid check.
assertThat(br.read(), is(-1));
baseRequest.setHandled(true);
}
}, client -> {});
});
ContentResponse response1 = scenario.client.newRequest(scenario.newURI())
.method("POST")

View File

@ -42,9 +42,6 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.internal.HttpChannelState;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
@ -106,10 +103,10 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
private void testBlockingReadWithDelayedFirstContentIdleTimeoutFires(TransportScenario scenario, boolean delayDispatch) throws Exception
{
testReadWithDelayedFirstContentIdleTimeoutFires(scenario, new EmptyServerHandler()
testReadWithDelayedFirstContentIdleTimeoutFires(scenario, new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// The client did not send the content,
// idle timeout should result in IOException.
@ -120,10 +117,10 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
private void testAsyncReadWithDelayedFirstContentIdleTimeoutFires(TransportScenario scenario, boolean delayDispatch) throws Exception
{
testReadWithDelayedFirstContentIdleTimeoutFires(scenario, new EmptyServerHandler()
testReadWithDelayedFirstContentIdleTimeoutFires(scenario, new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
@ -152,18 +149,18 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
}, delayDispatch);
}
private void testReadWithDelayedFirstContentIdleTimeoutFires(TransportScenario scenario, Handler handler, boolean delayDispatch) throws Exception
private void testReadWithDelayedFirstContentIdleTimeoutFires(TransportScenario scenario, HttpServlet servlet, boolean delayDispatch) throws Exception
{
scenario.httpConfig.setDelayDispatchUntilContent(delayDispatch);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
handler.handle(target, jettyRequest, request, response);
servlet.service(request, response);
}
finally
{
@ -198,12 +195,11 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
{
init(transport);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
@ -262,12 +258,11 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
init(transport);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletOutputStream output = response.getOutputStream();
@ -330,14 +325,13 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
scenario.requestLog.clear();
scenario.httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
@ -375,7 +369,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
for (int i = 0; i < 3; ++i)
{
content.offer(ByteBuffer.allocate(bytesPerSecond / 2));
content.write(ByteBuffer.allocate(bytesPerSecond / 2), Callback.NOOP);
Thread.sleep(2500);
}
content.close();
@ -398,12 +392,11 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
int bytesPerSecond = 20;
scenario.httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
@ -427,7 +420,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
for (int i = 0; i < 3; ++i)
{
content.offer(ByteBuffer.allocate(bytesPerSecond * 2));
content.write(ByteBuffer.allocate(bytesPerSecond * 2), Callback.NOOP);
Thread.sleep(2500);
}
content.close();
@ -445,7 +438,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
long idleTimeout = 3 * httpIdleTimeout;
scenario.httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new BlockingReadHandler(handlerLatch));
scenario.start(new BlockingReadServlet(handlerLatch));
scenario.setRequestIdleTimeout(idleTimeout);
try (StacklessLogging ignore = new StacklessLogging(HttpChannelState.class))
@ -477,12 +470,11 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
long idleTimeout = 3 * httpIdleTimeout;
scenario.httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
@ -582,7 +574,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
// Wait for the server application to block reading.
Thread.sleep(2 * idleTimeout);
content.offer(ByteBuffer.wrap(data2));
content.write(ByteBuffer.wrap(data2), Callback.NOOP);
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -610,10 +602,10 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
int bytesPerSecond = 16 * 1024;
scenario.httpConfig.setMinResponseDataRate(bytesPerSecond);
CountDownLatch serverLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
scenario.start(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
@ -661,19 +653,18 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
assertTrue(clientLatch.await(15, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler
private static class BlockingReadServlet extends HttpServlet
{
private final CountDownLatch handlerLatch;
public BlockingReadHandler(CountDownLatch handlerLatch)
public BlockingReadServlet(CountDownLatch handlerLatch)
{
this.handlerLatch = handlerLatch;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
assertEquals(0, input.read());
try

View File

@ -45,11 +45,9 @@ import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HostHeaderCustomizer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -80,7 +78,6 @@ public class TransportScenario
protected SslContextFactory.Server sslContextFactory;
protected Server server;
protected Connector connector;
protected ServletContextHandler context;
protected String servletPath = "/servlet";
protected HttpClient client;
protected Path unixDomainPath;
@ -145,6 +142,7 @@ public class TransportScenario
ret.append(getScheme());
ret.append("://localhost");
getServerPort().ifPresent(s -> ret.append(':').append(s));
ret.append(servletPath);
return ret.toString();
}
@ -278,31 +276,6 @@ public class TransportScenario
}
}
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
{
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
}
else
{
if (connector instanceof QuicServerConnector)
((QuicServerConnector)connector).getQuicConfiguration().setMaxBidirectionalRemoteStreams(maxRequestsPerConnection);
}
}
public void start(Handler handler) throws Exception
{
start(handler, null);
}
public void start(Handler handler, Consumer<HttpClient> config) throws Exception
{
startServer(handler);
startClient(config);
}
public void start(HttpServlet servlet) throws Exception
{
startServer(servlet);
@ -334,21 +307,21 @@ public class TransportScenario
public void startServer(HttpServlet servlet) throws Exception
{
context = new ServletContextHandler();
prepareServer(servlet);
server.start();
}
protected void prepareServer(HttpServlet servlet)
{
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
ServletHolder holder = new ServletHolder(servlet);
holder.setAsyncSupported(true);
context.addServlet(holder, servletPath);
startServer(context);
prepareServer(context);
}
public void startServer(Handler handler) throws Exception
{
prepareServer(handler);
server.start();
}
protected void prepareServer(Handler handler)
protected void prepareServer(ServletContextHandler handler)
{
sslContextFactory = newServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
@ -360,10 +333,13 @@ public class TransportScenario
connector = newServerConnector(server);
server.addConnector(connector);
server.setRequestLog((request, response) ->
{
int status = response.getCommittedMetaData().getStatus();
requestLog.offer(String.format("%s %s %s %03d", request.getMethod(), request.getRequestURI(), request.getProtocol(), status));
});
requestLog.offer(String.format("%s %s %s %03d",
request.getMethod(),
request.getHttpURI(),
request.getConnectionMetaData().getProtocol(),
response.getStatus())
)
);
server.setHandler(handler);
}

View File

@ -20,17 +20,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.jupiter.api.Assumptions;
@ -59,10 +58,10 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
Assumptions.assumeTrue(transport != Transport.FCGI);
init(transport);
scenario.prepareServer(new EmptyServerHandler()
scenario.prepareServer(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response)
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
@ -90,10 +89,10 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
init(transport);
byte[] data = new byte[128 * 1024 * 1024];
scenario.prepareServer(new EmptyServerHandler()
scenario.prepareServer(new HttpServlet()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);