Moving from ee9 to core the client transport tests that are generic enough to work in core.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-08-17 09:20:32 +02:00
parent 4fe414a762
commit d001d95d9f
17 changed files with 788 additions and 401 deletions

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.io.content;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -30,7 +31,7 @@ import org.eclipse.jetty.util.IO;
* </p>
* @see AsyncContent
*/
public class OutputStreamContentSource implements Content.Source
public class OutputStreamContentSource implements Content.Source, Closeable
{
private final AsyncContent async = new AsyncContent();
private final AsyncOutputStream output = new AsyncOutputStream();
@ -64,6 +65,12 @@ public class OutputStreamContentSource implements Content.Source
async.fail(failure);
}
@Override
public void close()
{
output.close();
}
private class AsyncOutputStream extends OutputStream
{
@Override

View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-tests</artifactId>
<version>12.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-test-client-transports</artifactId>
<name>Jetty Core :: Tests :: Client Transports</name>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-unixdomain-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-java-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.fcgi</groupId>
<artifactId>jetty-fcgi-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>jetty-http2-client-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>jetty-http2-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>jetty-http3-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>jetty-http3-client-transport</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>enable-virtual-threads-preview</id>
<activation>
<jdk>[19,)</jdk>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>
@{argLine} --enable-preview
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,254 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.server.ServerFCGIConnectionFactory;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HostHeaderCustomizer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AbstractTest
{
protected final HttpConfiguration httpConfig = new HttpConfiguration();
protected SslContextFactory.Server sslContextFactoryServer;
protected Server server;
protected Connector connector;
protected HttpClient client;
protected Path unixDomainPath;
public static List<Transport> transports()
{
return List.of(Transport.values());
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
protected void start(Transport transport, Handler handler) throws Exception
{
startServer(transport, handler);
startClient(transport);
}
protected void startServer(Transport transport, Handler handler) throws Exception
{
prepareServer(transport, handler);
server.start();
}
protected void prepareServer(Transport transport, Handler handler) throws Exception
{
if (transport == Transport.UNIX_DOMAIN)
{
String unixDomainDir = System.getProperty("jetty.unixdomain.dir", System.getProperty("java.io.tmpdir"));
unixDomainPath = Files.createTempFile(Path.of(unixDomainDir), "unix_", ".sock");
assertTrue(unixDomainPath.toAbsolutePath().toString().length() < UnixDomainServerConnector.MAX_UNIX_DOMAIN_PATH_LENGTH, "Unix-Domain path too long");
Files.delete(unixDomainPath);
}
sslContextFactoryServer = newSslContextFactoryServer();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = newConnector(transport, server);
server.addConnector(connector);
server.setHandler(handler);
}
protected SslContextFactory.Server newSslContextFactoryServer()
{
SslContextFactory.Server ssl = new SslContextFactory.Server();
ssl.setKeyStorePath("src/test/resources/keystore.p12");
ssl.setKeyStorePassword("storepwd");
ssl.setUseCipherSuitesOrder(true);
ssl.setCipherComparator(HTTP2Cipher.COMPARATOR);
return ssl;
}
protected void startClient(Transport transport) throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(newHttpClientTransport(transport));
client.setExecutor(clientThreads);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
client.start();
}
public Connector newConnector(Transport transport, Server server)
{
return switch (transport)
{
case HTTP:
case HTTPS:
case H2C:
case H2:
case FCGI:
yield new ServerConnector(server, 1, 1, newServerConnectionFactory(transport));
case H3:
yield new HTTP3ServerConnector(server, sslContextFactoryServer, newServerConnectionFactory(transport));
case UNIX_DOMAIN:
UnixDomainServerConnector connector = new UnixDomainServerConnector(server, 1, 1, newServerConnectionFactory(transport));
connector.setUnixDomainPath(unixDomainPath);
yield connector;
};
}
protected ConnectionFactory[] newServerConnectionFactory(Transport transport)
{
List<ConnectionFactory> list = switch (transport)
{
case HTTP, UNIX_DOMAIN -> List.of(new HttpConnectionFactory(httpConfig));
case HTTPS ->
{
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactoryServer, http.getProtocol());
yield List.of(ssl, http);
}
case H2C ->
{
httpConfig.addCustomizer(new HostHeaderCustomizer());
yield List.of(new HTTP2CServerConnectionFactory(httpConfig));
}
case H2 ->
{
httpConfig.addCustomizer(new SecureRequestCustomizer());
httpConfig.addCustomizer(new HostHeaderCustomizer());
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpConfig);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2");
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactoryServer, alpn.getProtocol());
yield List.of(ssl, alpn, h2);
}
case H3 ->
{
httpConfig.addCustomizer(new SecureRequestCustomizer());
httpConfig.addCustomizer(new HostHeaderCustomizer());
yield List.of(new HTTP3ServerConnectionFactory(httpConfig));
}
case FCGI -> List.of(new ServerFCGIConnectionFactory(httpConfig));
};
return list.toArray(ConnectionFactory[]::new);
}
protected SslContextFactory.Client newSslContextFactoryClient()
{
SslContextFactory.Client ssl = new SslContextFactory.Client();
ssl.setKeyStorePath("src/test/resources/keystore.p12");
ssl.setKeyStorePassword("storepwd");
ssl.setEndpointIdentificationAlgorithm(null);
return ssl;
}
protected HttpClientTransport newHttpClientTransport(Transport transport)
{
return switch (transport)
{
case HTTP, HTTPS ->
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(newSslContextFactoryClient());
yield new HttpClientTransportOverHTTP(clientConnector);
}
case H2C, H2 ->
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(newSslContextFactoryClient());
HTTP2Client http2Client = new HTTP2Client(clientConnector);
yield new HttpClientTransportOverHTTP2(http2Client);
}
case H3 ->
{
HTTP3Client http3Client = new HTTP3Client();
ClientConnector clientConnector = http3Client.getClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(newSslContextFactoryClient());
http3Client.getQuicConfiguration().setVerifyPeerCertificates(false);
yield new HttpClientTransportOverHTTP3(http3Client);
}
case FCGI -> new HttpClientTransportOverFCGI(1, "");
case UNIX_DOMAIN ->
{
ClientConnector clientConnector = ClientConnector.forUnixDomain(unixDomainPath);
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(newSslContextFactoryClient());
yield new HttpClientTransportOverHTTP(clientConnector);
}
};
}
protected URI newURI(Transport transport)
{
String scheme = transport.isSecure() ? "https" : "http";
String uri = scheme + "://localhost";
if (connector instanceof NetworkConnector networkConnector)
uri += ":" + networkConnector.getLocalPort();
return URI.create(uri);
}
public enum Transport
{
HTTP, HTTPS, H2C, H2, H3, FCGI, UNIX_DOMAIN;
public boolean isSecure()
{
return switch (this)
{
case HTTP, H2C, FCGI, UNIX_DOMAIN -> false;
case HTTPS, H2, H3 -> true;
};
}
}
}

View File

@ -11,10 +11,9 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -22,41 +21,32 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.OutputStreamRequestContent;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
public class AsyncRequestContentTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testEmptyAsyncContent(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
AsyncRequestContent content = new AsyncRequestContent();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -70,15 +60,14 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testAsyncContent(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
AsyncRequestContent content = new AsyncRequestContent();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -86,23 +75,22 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
content.offer(ByteBuffer.wrap(new byte[1]));
content.write(ByteBuffer.wrap(new byte[1]), Callback.NOOP);
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testEmptyInputStream(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
InputStreamRequestContent content =
new InputStreamRequestContent(new ByteArrayInputStream(new byte[0]));
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -115,16 +103,15 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testInputStream(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
InputStreamRequestContent content =
new InputStreamRequestContent(new ByteArrayInputStream(new byte[1]));
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -137,15 +124,14 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testEmptyOutputStream(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
OutputStreamRequestContent content = new OutputStreamRequestContent();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -159,15 +145,14 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testOutputStream(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
OutputStreamRequestContent content = new OutputStreamRequestContent();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.body(content)
.send(result ->
{
@ -184,17 +169,16 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testBufferReuseAfterCallbackCompleted(Transport transport) throws Exception
{
init(transport);
scenario.start(new ConsumeInputHandler());
start(transport, new ConsumeInputHandler());
AsyncRequestContent content = new AsyncRequestContent();
CountDownLatch latch = new CountDownLatch(1);
List<Byte> requestContent = new ArrayList<>();
scenario.client.POST(scenario.newURI())
client.POST(newURI(transport))
.onRequestContent(((request, buffer) -> requestContent.add(buffer.get())))
.body(content)
.send(result ->
@ -209,10 +193,10 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
byte[] bytes = new byte[1];
bytes[0] = first;
ByteBuffer buffer = ByteBuffer.wrap(bytes);
content.offer(buffer, Callback.from(() ->
content.write(buffer, Callback.from(() ->
{
bytes[0] = second;
content.offer(ByteBuffer.wrap(bytes), Callback.from(content::close));
content.write(ByteBuffer.wrap(bytes), Callback.from(content::close));
}));
assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -221,20 +205,14 @@ public class AsyncRequestContentTest extends AbstractTest<TransportScenario>
assertEquals(second, requestContent.get(1));
}
private static class ConsumeInputHandler extends AbstractHandler
private static class ConsumeInputHandler extends Handler.Processor
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void process(Request request, Response response, Callback callback) throws Exception
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
Content.Source.consumeAll(request);
response.setStatus(HttpStatus.OK_200);
callback.succeeded();
}
}
}

View File

@ -11,15 +11,11 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpHeader;
@ -27,40 +23,36 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.eclipse.jetty.ee9.http.client.Transport.H2C;
import static org.eclipse.jetty.ee9.http.client.Transport.HTTP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class ConnectionStatisticsTest extends AbstractTest<TransportScenario>
public class ConnectionStatisticsTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
Assumptions.assumeTrue(scenario.transport == HTTP || scenario.transport == H2C);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConnectionStatistics(Transport transport) throws Exception
{
init(transport);
scenario.start(new AbstractHandler()
// Counting SslConnection opening/closing makes the test more complicated.
assumeTrue(!transport.isSecure());
// FastCGI server does not have statistics.
assumeTrue(transport != Transport.FCGI);
start(transport, new Handler.Processor()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void process(Request request, Response response, Callback callback)
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
Content.copy(request, response, callback);
}
});
@ -80,21 +72,21 @@ public class ConnectionStatisticsTest extends AbstractTest<TransportScenario>
};
ConnectionStatistics serverStats = new ConnectionStatistics();
scenario.connector.addBean(serverStats);
scenario.connector.addBean(closer);
connector.addBean(serverStats);
connector.addBean(closer);
serverStats.start();
ConnectionStatistics clientStats = new ConnectionStatistics();
scenario.client.addBean(clientStats);
scenario.client.addBean(closer);
client.addBean(clientStats);
client.addBean(closer);
clientStats.start();
long idleTimeout = 1000;
scenario.client.setIdleTimeout(idleTimeout);
client.setIdleTimeout(idleTimeout);
byte[] content = new byte[3072];
long contentLength = content.length;
ContentResponse response = scenario.client.newRequest(scenario.newURI())
ContentResponse response = client.newRequest(newURI(transport))
.headers(headers -> headers.put(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE))
.body(new BytesRequestContent(content))
.timeout(5, TimeUnit.SECONDS)

View File

@ -11,9 +11,8 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -38,42 +37,45 @@ import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.client.transport.internal.HttpChannelOverHTTP2;
import org.eclipse.jetty.http2.client.transport.internal.HttpConnectionOverHTTP2;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.http3.client.transport.internal.HttpChannelOverHTTP3;
import org.eclipse.jetty.http3.client.transport.internal.HttpConnectionOverHTTP3;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
public class HttpChannelAssociationTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testAssociationFailedAbortsRequest(Transport transport) throws Exception
{
init(transport);
scenario.startServer(new EmptyServerHandler());
startServer(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback)
{
callback.succeeded();
}
});
scenario.client = new HttpClient(newHttpClientTransport(scenario, exchange -> false));
client = new HttpClient(newHttpClientTransport(transport, exchange -> false));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
scenario.client.setExecutor(clientThreads);
scenario.client.start();
client.setExecutor(clientThreads);
client.start();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send(result ->
{
if (result.isFailed())
@ -84,14 +86,20 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testIdleTimeoutJustBeforeAssociation(Transport transport) throws Exception
{
init(transport);
scenario.startServer(new EmptyServerHandler());
startServer(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback)
{
callback.succeeded();
}
});
long idleTimeout = 1000;
scenario.client = new HttpClient(newHttpClientTransport(scenario, exchange ->
client = new HttpClient(newHttpClientTransport(transport, exchange ->
{
// We idle timeout just before the association,
// we must be able to send the request successfully.
@ -100,12 +108,12 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
scenario.client.setExecutor(clientThreads);
scenario.client.setIdleTimeout(idleTimeout);
scenario.client.start();
client.setExecutor(clientThreads);
client.setIdleTimeout(idleTimeout);
client.start();
CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send(result ->
{
if (result.isSucceeded())
@ -115,17 +123,17 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
private HttpClientTransport newHttpClientTransport(TransportScenario scenario, Predicate<HttpExchange> code)
private HttpClientTransport newHttpClientTransport(Transport transport, Predicate<HttpExchange> code)
{
switch (scenario.transport)
return switch (transport)
{
case HTTP:
case HTTPS:
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(scenario.newClientSslContextFactory());
return new HttpClientTransportOverHTTP(clientConnector)
clientConnector.setSslContextFactory(newSslContextFactoryClient());
yield new HttpClientTransportOverHTTP(clientConnector)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
@ -153,9 +161,9 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(scenario.newClientSslContextFactory());
clientConnector.setSslContextFactory(newSslContextFactoryClient());
HTTP2Client http2Client = new HTTP2Client(clientConnector);
return new HttpClientTransportOverHTTP2(http2Client)
yield new HttpClientTransportOverHTTP2(http2Client)
{
@Override
protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination, Session session)
@ -182,9 +190,9 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
{
HTTP3Client http3Client = new HTTP3Client();
http3Client.getClientConnector().setSelectors(1);
http3Client.getClientConnector().setSslContextFactory(scenario.newClientSslContextFactory());
http3Client.getClientConnector().setSslContextFactory(newSslContextFactoryClient());
http3Client.getQuicConfiguration().setVerifyPeerCertificates(false);
return new HttpClientTransportOverHTTP3(http3Client)
yield new HttpClientTransportOverHTTP3(http3Client)
{
@Override
protected HttpConnection newHttpConnection(HttpDestination destination, HTTP3SessionClient session)
@ -211,8 +219,8 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(scenario.newClientSslContextFactory());
return new HttpClientTransportOverFCGI(clientConnector, "")
clientConnector.setSslContextFactory(newSslContextFactoryClient());
yield new HttpClientTransportOverFCGI(clientConnector, "")
{
@Override
protected org.eclipse.jetty.io.Connection newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
@ -237,10 +245,10 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
case UNIX_DOMAIN:
{
ClientConnector clientConnector = ClientConnector.forUnixDomain(scenario.unixDomainPath);
ClientConnector clientConnector = ClientConnector.forUnixDomain(unixDomainPath);
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(scenario.newClientSslContextFactory());
return new HttpClientTransportOverHTTP(clientConnector)
clientConnector.setSslContextFactory(newSslContextFactoryClient());
yield new HttpClientTransportOverHTTP(clientConnector)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
@ -263,11 +271,7 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
};
}
default:
{
throw new IllegalArgumentException();
}
}
};
}
private void sleep(long time)

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -22,46 +22,43 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Request;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
// TODO: these tests seems to fail spuriously, figure out why.
@Disabled
@Tag("Unstable")
public class HttpClientConnectTimeoutTest extends AbstractTest<TransportScenario>
public class HttpClientConnectTimeoutTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConnectTimeout(Transport transport) throws Exception
{
init(transport);
final String host = "10.255.255.1";
final int port = 80;
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
assumeTrue(connectTimeout(host, port, connectTimeout));
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client.setConnectTimeout(connectTimeout);
scenario.client.start();
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = scenario.client.newRequest(host, port);
CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.send(result ->
{
if (result.isFailed())
@ -73,23 +70,29 @@ public class HttpClientConnectTimeoutTest extends AbstractTest<TransportScenario
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testConnectTimeoutIsCancelledByShorterRequestTimeout(Transport transport) throws Exception
{
init(transport);
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 2000;
assumeConnectTimeout(host, port, connectTimeout);
assumeTrue(connectTimeout(host, port, connectTimeout));
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client.setConnectTimeout(connectTimeout);
scenario.client.start();
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
Request request = scenario.client.newRequest(host, port);
AtomicInteger completes = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(2);
Request request = client.newRequest(host, port);
request.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
.send(result ->
{
@ -103,28 +106,34 @@ public class HttpClientConnectTimeoutTest extends AbstractTest<TransportScenario
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void retryAfterConnectTimeout(Transport transport) throws Exception
{
init(transport);
final String host = "10.255.255.1";
final int port = 80;
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
assumeTrue(connectTimeout(host, port, connectTimeout));
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client.setConnectTimeout(connectTimeout);
scenario.client.start();
start(transport, new Handler.Processor()
{
@Override
public void process(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
callback.succeeded();
}
});
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = scenario.client.newRequest(host, port);
CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.send(result1 ->
{
if (result1.isFailed())
{
// Retry
scenario.client.newRequest(host, port).send(result2 ->
client.newRequest(host, port).send(result2 ->
{
if (result2.isFailed())
latch.countDown();
@ -136,7 +145,7 @@ public class HttpClientConnectTimeoutTest extends AbstractTest<TransportScenario
assertNotNull(request.getAbortCause());
}
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
private boolean connectTimeout(String host, int port, int connectTimeout) throws IOException
{
try (Socket socket = new Socket())
{
@ -145,18 +154,12 @@ public class HttpClientConnectTimeoutTest extends AbstractTest<TransportScenario
// connect to them will hang the connection attempt, which is
// what we want to simulate in this test.
socket.connect(new InetSocketAddress(host, port), connectTimeout);
// Abort the test if we can connect.
Assumptions.assumeTrue(false, "Should not have been able to connect to " + host + ":" + port);
return false;
}
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
return;
}
catch (Throwable x)
{
// Abort if any other exception happens.
fail(x);
return true;
}
}
}

View File

@ -11,9 +11,8 @@
// ========================================================================
//
package org.eclipse.jetty.ee9.http.client;
package org.eclipse.jetty.test.client.transport;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Queue;
@ -28,20 +27,19 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.GZIPOutputStream;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -50,37 +48,27 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientDemandTest extends AbstractTest<TransportScenario>
public class HttpClientDemandTest extends AbstractTest
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testDemandInTwoChunks(Transport transport) throws Exception
{
init(transport);
// Tests a special case where the first chunk is automatically
// delivered, and the second chunk is explicitly demanded and
// completes the response content.
CountDownLatch contentLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
try
{
response.setContentLength(2);
ServletOutputStream out = response.getOutputStream();
out.write('A');
out.flush();
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, 2);
response.write(false, ByteBuffer.wrap(new byte[]{'A'}), Callback.NOOP);
contentLatch.await();
out.write('B');
response.write(true, ByteBuffer.wrap(new byte[]{'B'}), callback);
}
catch (InterruptedException x)
{
@ -90,7 +78,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
});
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send(new BufferingResponseListener()
{
private final AtomicInteger chunks = new AtomicInteger();
@ -121,36 +109,34 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testDemand(Transport transport) throws Exception
{
init(transport);
// A small buffer size so the response content is
// read in multiple buffers, but big enough for HTTP/3.
int bufferSize = 1536;
byte[] content = new byte[10 * bufferSize];
new Random().nextBytes(content);
scenario.startServer(new EmptyServerHandler()
startServer(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.setContentLength(content.length);
response.getOutputStream().write(content);
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, content.length);
response.write(true, ByteBuffer.wrap(content), callback);
}
});
scenario.startClient(client ->
{
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
client.setResponseBufferSize(bufferSize);
});
startClient(transport);
client.stop();
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
client.setResponseBufferSize(bufferSize);
client.start();
Queue<LongConsumer> demandQueue = new ConcurrentLinkedQueue<>();
Queue<ByteBuffer> contentQueue = new ConcurrentLinkedQueue<>();
Queue<Callback> callbackQueue = new ConcurrentLinkedQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.send(new BufferingResponseListener()
{
@Override
@ -209,25 +195,21 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testContentWhileStalling(Transport transport) throws Exception
{
init(transport);
CountDownLatch serverContentLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
try
{
response.setContentLength(2);
ServletOutputStream out = response.getOutputStream();
out.write('A');
out.flush();
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, 2);
response.write(false, ByteBuffer.wrap(new byte[]{'A'}), Callback.NOOP);
serverContentLatch.await();
out.write('B');
response.write(true, ByteBuffer.wrap(new byte[]{'B'}), callback);
}
catch (InterruptedException x)
{
@ -240,7 +222,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch clientContentLatch = new CountDownLatch(2);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.onResponseContentDemanded((response, demand, content, callback) ->
{
try
@ -264,7 +246,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown();
@ -283,28 +265,26 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testTwoListenersWithDifferentDemand(Transport transport) throws Exception
{
init(transport);
int bufferSize = 1536;
byte[] content = new byte[10 * bufferSize];
new Random().nextBytes(content);
scenario.startServer(new EmptyServerHandler()
startServer(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.setContentLength(content.length);
response.getOutputStream().write(content);
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, content.length);
response.write(true, ByteBuffer.wrap(content), callback);
}
});
scenario.startClient(client ->
{
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
client.setResponseBufferSize(bufferSize);
});
startClient(transport);
client.stop();
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
client.setResponseBufferSize(bufferSize);
client.start();
AtomicInteger chunks = new AtomicInteger();
Response.DemandedContentListener listener1 = (response, demand, content1, callback) ->
@ -319,18 +299,18 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
AtomicReference<CountDownLatch> demandLatch = new AtomicReference<>(new CountDownLatch(1));
Response.DemandedContentListener listener2 = (response, demand, content12, callback) ->
{
contentQueue.offer(content12);
assertTrue(contentQueue.offer(content12));
demandRef.set(demand);
demandLatch.get().countDown();
};
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.onResponseContentDemanded(listener1)
.onResponseContentDemanded(listener2)
.send(result ->
{
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown();
@ -361,23 +341,21 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testGZippedResponseContentWithAsyncDemand(Transport transport) throws Exception
{
init(transport);
int chunks = 64;
byte[] content = new byte[chunks * 1024];
new Random().nextBytes(content);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()))
try (GZIPOutputStream gzip = new GZIPOutputStream(Content.Sink.asOutputStream(response)))
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
response.getHeaders().put(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
for (int i = 0; i < chunks; ++i)
{
Thread.sleep(10);
@ -394,7 +372,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
byte[] bytes = new byte[content.length];
ByteBuffer received = ByteBuffer.wrap(bytes);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.onResponseContentDemanded((response, demand, buffer, callback) ->
{
received.put(buffer);
@ -403,8 +381,8 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
@ -412,20 +390,18 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testDelayedBeforeContentDemand(Transport transport) throws Exception
{
init(transport);
byte[] content = new byte[1024];
new Random().nextBytes(content);
scenario.start(new EmptyServerHandler()
start(transport, new Handler.Processor()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.setContentLength(content.length);
response.getOutputStream().write(content);
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, content.length);
response.write(true, ByteBuffer.wrap(content), callback);
}
});
@ -435,7 +411,7 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
CountDownLatch beforeContentLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
@ -457,8 +433,8 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
@ -475,18 +451,23 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@MethodSource("transports")
public void testDelayedBeforeContentDemandWithNoResponseContent(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
start(transport, new Handler.Processor()
{
@Override
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
callback.succeeded();
}
});
AtomicReference<LongConsumer> beforeContentDemandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
client.newRequest(newURI(transport))
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
@ -507,8 +488,8 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});

View File

@ -0,0 +1,66 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
@DisabledForJreRange(max = JRE.JAVA_18)
public class VirtualThreadsTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transports")
public void testHandlerInvokedOnVirtualThread(Transport transport) throws Exception
{
// No virtual thread support in FCGI server-side.
Assumptions.assumeTrue(transport != Transport.FCGI);
prepareServer(transport, new Handler.Processor()
{
@Override
public void process(Request request, Response response, Callback callback)
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
callback.succeeded();
}
});
ThreadPool threadPool = server.getThreadPool();
if (threadPool instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
server.start();
startClient(transport);
ContentResponse response = client.newRequest(newURI(transport))
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transport);
}
}

View File

@ -0,0 +1,14 @@
#org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.jmx.LEVEL=INFO
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.fcgi.LEVEL=DEBUG
#org.eclipse.jetty.proxy.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.http3.LEVEL=DEBUG
org.eclipse.jetty.http3.qpack.LEVEL=INFO
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO
#org.eclipse.jetty.util.VirtualThreads.LEVEL=DEBUG
#org.eclipse.jetty.util.thread.strategy.LEVEL=DEBUG

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-core</artifactId>
<version>12.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-tests</artifactId>
<name>Jetty Core :: Tests</name>
<packaging>pom</packaging>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<modules>
<module>jetty-test-client-transports</module>
</modules>
</project>

View File

@ -15,10 +15,10 @@ package org.eclipse.jetty.unixdomain.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.net.UnixDomainSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@ -42,7 +42,6 @@ import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.thread.Scheduler;
@ -238,34 +237,10 @@ public class UnixDomainServerConnector extends AbstractConnector
private ServerSocketChannel bindServerSocketChannel() throws IOException
{
Path unixDomainPath = getUnixDomainPath();
ServerSocketChannel serverChannel;
SocketAddress socketAddress;
try
{
ProtocolFamily family = Enum.valueOf(StandardProtocolFamily.class, "UNIX");
Class<?> channelClass = Class.forName("java.nio.channels.ServerSocketChannel");
serverChannel = (ServerSocketChannel)channelClass.getMethod("open", ProtocolFamily.class).invoke(null, family);
// Unix-Domain does not support SO_REUSEADDR.
Class<?> addressClass = Class.forName("java.net.UnixDomainSocketAddress");
socketAddress = (SocketAddress)addressClass.getMethod("of", Path.class).invoke(null, unixDomainPath);
}
catch (Throwable x)
{
String message = "Unix-Domain SocketChannels are available starting from Java 16, your Java version is: " + JavaVersion.VERSION;
throw new UnsupportedOperationException(message, x);
}
try
{
serverChannel.bind(socketAddress, getAcceptQueueSize());
return serverChannel;
}
catch (IOException x)
{
String message = String.format("Could not bind %s to %s", UnixDomainServerConnector.class.getSimpleName(), unixDomainPath);
throw new IOException(message, x);
}
ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
SocketAddress socketAddress = UnixDomainSocketAddress.of(unixDomainPath);
serverChannel.bind(socketAddress, getAcceptQueueSize());
return serverChannel;
}
@Override

View File

@ -35,6 +35,7 @@
<module>jetty-session</module>
<module>jetty-slf4j-impl</module>
<module>jetty-start</module>
<module>jetty-tests</module>
<module>jetty-unixdomain-server</module>
<module>jetty-util</module>
<module>jetty-util-ajax</module>

View File

@ -52,6 +52,8 @@ import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.ee9.nested.HttpInput;
import org.eclipse.jetty.ee9.nested.HttpOutput;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
@ -60,20 +62,16 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.transport.internal.HttpConnectionOverHTTP2;
import org.eclipse.jetty.http2.internal.HTTP2Session;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.server.internal.HttpChannelState;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
@ -255,7 +253,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
String data = "0123456789";
AsyncRequestContent content = new AsyncRequestContent();
content.offer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
content.write(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch clientLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
@ -746,7 +744,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
});
sleep(100);
content.offer(ByteBuffer.wrap(data));
content.write(ByteBuffer.wrap(data), Callback.NOOP);
content.close();
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
@ -852,7 +850,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
});
sleep(100);
content.offer(ByteBuffer.wrap(data));
content.write(ByteBuffer.wrap(data), Callback.NOOP);
content.close();
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
@ -1049,7 +1047,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
String content = "0123456789ABCDEF";
AsyncRequestContent requestContent = new AsyncRequestContent();
requestContent.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
requestContent.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
CountDownLatch clientLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
@ -1132,7 +1130,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch responseLatch = new CountDownLatch(1);
AsyncRequestContent requestContent = new AsyncRequestContent();
requestContent.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
requestContent.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
var request = scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
@ -1211,71 +1209,75 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
HttpInput httpInput = ((org.eclipse.jetty.ee9.nested.Request)request).getHttpInput();
httpInput.addInterceptor(new HttpInput.Interceptor()
{
int state = 0;
Content saved;
Content.Chunk saved;
@Override
public Content readFrom(Content content)
public Content.Chunk readFrom(Content.Chunk chunk)
{
// System.err.printf("readFrom s=%d saved=%b %s%n",state,saved!=null,content);
switch (state)
{
case 0:
// null transform
content.skip(content.remaining());
chunk.skip(chunk.remaining());
chunk.release();
state++;
return null;
case 1:
{
// copy transform
if (content.isEmpty())
if (!chunk.hasRemaining())
{
state++;
return content;
return chunk;
}
ByteBuffer copy = wrap(toArray(content.getByteBuffer()));
content.skip(copy.remaining());
return new Content(copy);
ByteBuffer copy = wrap(toArray(chunk.getByteBuffer()));
chunk.skip(copy.remaining());
chunk.release();
return Content.Chunk.from(copy, false);
}
case 2:
// byte by byte
if (content.isEmpty())
if (!chunk.hasRemaining())
{
state++;
return content;
return chunk;
}
byte[] b = new byte[1];
int l = content.get(b, 0, 1);
return new Content(wrap(b, 0, l));
int l = chunk.get(b, 0, 1);
if (!chunk.hasRemaining())
chunk.release();
return Content.Chunk.from(wrap(b, 0, l), false);
case 3:
{
// double vision
if (content.isEmpty())
if (!chunk.hasRemaining())
{
if (saved == null)
{
state++;
return content;
return chunk;
}
Content copy = saved;
Content.Chunk ref = saved;
saved = null;
return copy;
return ref;
}
byte[] data = toArray(content.getByteBuffer());
content.skip(data.length);
saved = new Content(wrap(data));
return new Content(wrap(data));
byte[] data = toArray(chunk.getByteBuffer());
chunk.skip(data.length);
chunk.release();
saved = Content.Chunk.from(wrap(data), false);
return Content.Chunk.from(wrap(data), false);
}
default:
return content;
return chunk;
}
}
});
@ -1347,19 +1349,19 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
});
content.offer(BufferUtil.toBuffer("S0"));
content.write(BufferUtil.toBuffer("S0"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S1"));
content.write(BufferUtil.toBuffer("S1"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S2"));
content.write(BufferUtil.toBuffer("S2"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S3"));
content.write(BufferUtil.toBuffer("S3"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S4"));
content.write(BufferUtil.toBuffer("S4"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S5"));
content.write(BufferUtil.toBuffer("S5"), Callback.NOOP);
content.flush();
content.offer(BufferUtil.toBuffer("S6"));
content.write(BufferUtil.toBuffer("S6"), Callback.NOOP);
content.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
@ -1434,7 +1436,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
for (int i = 0; i < 1_000_000; i++)
{
contentProvider.offer(BufferUtil.toBuffer("S" + i));
contentProvider.write(BufferUtil.toBuffer("S" + i), Callback.NOOP);
}
contentProvider.close();
@ -1443,6 +1445,8 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertThat(resultRef.get().getResponse().getStatus(), Matchers.equalTo(HttpStatus.OK_200));
}
/*
// TODO: there is no GzipHttpInputInterceptor anymore, use something else.
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testAsyncInterceptedTwice(Transport transport) throws Exception
@ -1455,17 +1459,17 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(new GzipHttpInputInterceptor(new InflaterPool(-1, true), ((Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(content ->
HttpInput httpInput = ((org.eclipse.jetty.ee9.nested.Request)request).getHttpInput();
httpInput.addInterceptor(new GzipHttpInputInterceptor(new InflaterPool(-1, true), ((org.eclipse.jetty.ee9.nested.Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(chunk ->
{
if (content.isSpecial())
return content;
ByteBuffer byteBuffer = content.getByteBuffer();
if (chunk.isTerminal())
return chunk;
ByteBuffer byteBuffer = chunk.getByteBuffer();
byte[] bytes = new byte[2];
bytes[1] = byteBuffer.get();
bytes[0] = byteBuffer.get();
return new Content(wrap(bytes));
return Content.Chunk.from(wrap(bytes), false);
});
AsyncContext asyncContext = request.startAsync();
@ -1538,13 +1542,14 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
for (int i = 0; i < 7; i++)
{
contentProvider.offer(gzipToBuffer("S" + i));
contentProvider.write(gzipToBuffer("S" + i), Callback.NOOP);
contentProvider.flush();
}
contentProvider.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
*/
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@ -1558,34 +1563,35 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(content ->
HttpInput httpInput = ((org.eclipse.jetty.ee9.nested.Request)request).getHttpInput();
httpInput.addInterceptor(chunk ->
{
if (content.isEmpty())
return content;
if (!chunk.hasRemaining())
return chunk;
// skip contents with odd numbers
ByteBuffer duplicate = content.getByteBuffer().duplicate();
ByteBuffer duplicate = chunk.getByteBuffer().duplicate();
duplicate.get();
byte integer = duplicate.get();
int idx = Character.getNumericValue(integer);
Content contentCopy = new Content(content.getByteBuffer().duplicate());
content.skip(content.remaining());
Content.Chunk chunkCopy = Content.Chunk.from(chunk.getByteBuffer().duplicate(), false);
chunk.skip(chunk.remaining());
chunk.release();
if (idx % 2 == 0)
return contentCopy;
return chunkCopy;
return null;
});
httpInput.addInterceptor(content ->
httpInput.addInterceptor(chunk ->
{
if (content.isEmpty())
return content;
if (!chunk.hasRemaining())
return chunk;
// reverse the bytes
ByteBuffer byteBuffer = content.getByteBuffer();
ByteBuffer byteBuffer = chunk.getByteBuffer();
byte[] bytes = new byte[2];
bytes[1] = byteBuffer.get();
bytes[0] = byteBuffer.get();
return new Content(wrap(bytes));
return Content.Chunk.from(wrap(bytes), false);
});
AsyncContext asyncContext = request.startAsync();
@ -1653,19 +1659,19 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
});
contentProvider.offer(BufferUtil.toBuffer("S0"));
contentProvider.write(BufferUtil.toBuffer("S0"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S1"));
contentProvider.write(BufferUtil.toBuffer("S1"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S2"));
contentProvider.write(BufferUtil.toBuffer("S2"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S3"));
contentProvider.write(BufferUtil.toBuffer("S3"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S4"));
contentProvider.write(BufferUtil.toBuffer("S4"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S5"));
contentProvider.write(BufferUtil.toBuffer("S5"), Callback.NOOP);
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S6"));
contentProvider.write(BufferUtil.toBuffer("S6"), Callback.NOOP);
contentProvider.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
@ -1835,14 +1841,14 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
context.addEventListener(new ContextHandler.ContextScopeListener()
{
@Override
public void enterScope(Context context, Request request, Object reason)
public void enterScope(org.eclipse.jetty.server.Context context, Request request)
{
checkScope();
scope.set(new RuntimeException());
}
@Override
public void exitScope(Context context, Request request)
public void exitScope(org.eclipse.jetty.server.Context context, Request request)
{
assertScope();
scope.set(null);

View File

@ -19,12 +19,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -49,25 +49,26 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AtomicReference<Throwable> rereadException = new AtomicReference<>();
AtomicReference<Throwable> reReadException = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
scenario.start(new HttpServlet()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
int b = request.getInputStream().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
request.getInputStream().read();
// The read() above should block since the client does
// not send more data, and then throw when service() exits.
throw new IllegalStateException();
}
}
catch (Throwable ex1)
@ -75,12 +76,13 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
readException.set(ex1);
try
{
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
request.getInputStream().read();
// The read() above should throw immediately.
throw new IllegalStateException();
}
catch (Throwable ex2)
{
rereadException.set(ex2);
reReadException.set(ex2);
}
finally
{
@ -92,7 +94,7 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
}
@ -122,7 +124,7 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
try
{
assertThat(response.getStatus(), is(200));
stopped.await(10, TimeUnit.SECONDS);
assertTrue(stopped.await(10, TimeUnit.SECONDS));
ok.countDown();
}
catch (Throwable t)
@ -131,10 +133,10 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
}
})
.send(null);
requestContent.offer(BufferUtil.toBuffer("1"));
requestContent.write(BufferUtil.toBuffer("1"), Callback.NOOP);
assertTrue(ok.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
assertThat(rereadException.get(), instanceOf(IOException.class));
assertThat(reReadException.get(), instanceOf(IOException.class));
}
}

View File

@ -20,7 +20,7 @@
<!-- <module>jetty-ee9-test-bad-websocket-webapp</module> -->
<!-- <module>jetty-ee9-test-cdi</module> -->
<!-- <module>jetty-ee9-test-http2-webapp</module> -->
<!-- <module>jetty-ee9-test-http-client-transport</module> -->
<module>jetty-ee9-test-http-client-transport</module>
<!-- <module>jetty-ee9-test-integration</module> -->
<!-- <module>jetty-ee9-test-jmx</module> -->
<module>jetty-ee9-test-jndi</module>