Introduced CompletableResponseListener (#10502)

Introduced CompletableResponseListener, a replacement for FutureResponseListener that uses better APIs based on CompletableFuture.

Deprecated FutureResponseListener and replaced its usages.

Updated documentation.

Added tests for zipped request content.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-09-14 15:33:51 +02:00 committed by GitHub
parent 85c72720fc
commit 8684b0a67a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 763 additions and 245 deletions

View File

@ -192,11 +192,11 @@ Jetty's `HttpClient` allows applications to handle response content in different
You can buffer the response content in memory; this is done when using the xref:pg-client-http-blocking[blocking APIs] and the content is buffered within a `ContentResponse` up to 2 MiB.
If you want to control the length of the response content (for example limiting to values smaller than the default of 2 MiB), then you can use a `org.eclipse.jetty.client.FutureResponseListener` in this way:
If you want to control the length of the response content (for example limiting to values smaller than the default of 2 MiB), then you can use a `org.eclipse.jetty.client.CompletableResponseListener` in this way:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tags=futureResponseListener]
include::../../{doc_code}/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tags=completableResponseListener]
----
If the response content length is exceeded, the response will be aborted, and an exception will be thrown by method `get(\...)`.

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.AsyncRequestContent;
@ -30,11 +31,11 @@ import org.eclipse.jetty.client.AuthenticationStore;
import org.eclipse.jetty.client.BasicAuthentication;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.DigestAuthentication;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpProxy;
@ -384,17 +385,19 @@ public class HTTPClientDocs
HttpClient httpClient = new HttpClient();
httpClient.start();
// tag::futureResponseListener[]
// tag::completableResponseListener[]
Request request = httpClient.newRequest("http://domain.com/path");
// Limit response content buffer to 512 KiB.
FutureResponseListener listener = new FutureResponseListener(request, 512 * 1024);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, 512 * 1024)
.send();
request.send(listener);
// You can attach actions to the CompletableFuture,
// to be performed when the request+response completes.
// Wait at most 5 seconds for request+response to complete.
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
// end::futureResponseListener[]
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
// end::completableResponseListener[]
}
public void bufferingResponseListener() throws Exception

View File

@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.client.internal.HttpContentResponse;
/**
* <p>A {@link BufferingResponseListener} that sends a {@link Request}
* and returns a {@link CompletableFuture} that is completed when
* {@link #onComplete(Result)} is called.</p>
* <p>Typical usage:</p>
* <pre>{@code
* var request = client.newRequest(...)...;
* CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
*
* // Attach actions that run when the request/response is complete.
* completable.thenApply(response -> ...)
* .whenComplete((response, failure) -> ...);
*
* // Possibly even block waiting for the response.
* ContentResponse response = completable.get(5, TimeUnit.SECONDS);
* }</pre>
*/
public class CompletableResponseListener extends BufferingResponseListener
{
private final CompletableFuture<ContentResponse> completable = new CompletableFuture<>();
private final Request request;
public CompletableResponseListener(Request request)
{
this(request, 2 * 1024 * 1024);
}
public CompletableResponseListener(Request request, int maxLength)
{
super(maxLength);
this.request = request;
this.completable.whenComplete(this::handleExternalFailure);
}
private void handleExternalFailure(ContentResponse response, Throwable failure)
{
// External failures applied to the CompletableFuture,
// such as timeouts or cancel(), must abort the request.
if (failure != null)
request.abort(failure);
}
/**
* <p>Sends the request asynchronously and returns a {@link CompletableFuture}
* that is completed when the request/response completes.</p>
*
* @return a {@link CompletableFuture} that is completed when the request/response completes
* @see Request#send(Response.CompleteListener)
*/
public CompletableFuture<ContentResponse> send()
{
request.send(this);
return completable;
}
/**
* <p>Sends the request asynchronously via the given {@link Destination} and returns
* a {@link CompletableFuture} that is completed when the request/response completes.</p>
*
* @param destination the destination to send the request to
* @return a {@link CompletableFuture} that is completed when the request/response completes
* @see Destination#send(Request, Response.CompleteListener)
*/
public CompletableFuture<ContentResponse> send(Destination destination)
{
destination.send(request, this);
return completable;
}
/**
* <p>Sends the request asynchronously via the given {@link Connection} and returns
* a {@link CompletableFuture} that is completed when the request/response completes.</p>
*
* @param connection the connection to send the request to
* @return a {@link CompletableFuture} that is completed when the request/response completes
* @see Connection#send(Request, Response.CompleteListener)
*/
public CompletableFuture<ContentResponse> send(Connection connection)
{
connection.send(request, this);
return completable;
}
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completable.completeExceptionally(result.getFailure());
else
completable.complete(new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()));
}
}

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
@ -68,6 +70,19 @@ public interface Destination
*/
void newConnection(Promise<Connection> promise);
/**
* <p>Creates asynchronously a new, unpooled, {@link Connection} that
* will be returned at a later time through the given {@link Promise}.</p>
*
* @return a {@link CompletableFuture} for a new, unpooled, {@link Connection}
*/
default CompletableFuture<Connection> newConnection()
{
Promise.Completable<Connection> promise = new Promise.Completable<>();
newConnection(promise);
return promise;
}
/**
* <p>Sends the given request to this destination.</p>
* <p>You can use this method to send the request to a specific

View File

@ -35,7 +35,10 @@ import org.eclipse.jetty.client.internal.HttpContentResponse;
* request.send(listener); // Asynchronous send
* ContentResponse response = listener.get(5, TimeUnit.SECONDS); // Timed block
* </pre>
*
* @deprecated Use {@link CompletableResponseListener} instead
*/
@Deprecated
public class FutureResponseListener extends BufferingResponseListener implements Future<ContentResponse>
{
private final AtomicBoolean cancelled = new AtomicBoolean();

View File

@ -39,9 +39,9 @@ import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PathRequestContent;
@ -674,27 +674,26 @@ public class HttpRequest implements Request
@Override
public ContentResponse send() throws InterruptedException, TimeoutException, ExecutionException
{
FutureResponseListener listener = new FutureResponseListener(this);
send(listener);
try
{
return listener.get();
CompletableResponseListener listener = new CompletableResponseListener(this);
return listener.send().get();
}
catch (ExecutionException x)
{
// Previously this method used a timed get on the future, which was in a race
// with the timeouts implemented in HttpDestination and HttpConnection. The change to
// make those timeouts relative to the timestamp taken in sent() has made that race
// less certain, so a timeout could be either a TimeoutException from the get() or
// a ExecutionException(TimeoutException) from the HttpDestination/HttpConnection.
// Previously this method used a timed get on the future, which was in a
// race with the timeouts implemented in HttpDestination and HttpConnection.
// The change to make those timeouts relative to the timestamp taken in sent()
// has made that race less certain, so a timeout could be either a TimeoutException
// from the get() or a ExecutionException(TimeoutException) from the HttpDestination
// or HttpConnection.
// We now do not do a timed get and just rely on the HttpDestination/HttpConnection
// timeouts. This has the affect of changing this method from mostly throwing a
// TimeoutException to always throwing a ExecutionException(TimeoutException).
// Thus for backwards compatibility we unwrap the timeout exception here
if (x.getCause() instanceof TimeoutException)
// timeouts.
// This has the affect of changing this method from mostly throwing a TimeoutException
// to always throwing an ExecutionException(TimeoutException).
// Thus, for backwards compatibility we unwrap the TimeoutException here.
if (x.getCause() instanceof TimeoutException t)
{
TimeoutException t = (TimeoutException)(x.getCause());
abort(t);
throw t;
}
@ -704,8 +703,8 @@ public class HttpRequest implements Request
}
catch (Throwable x)
{
// Differently from the Future, the semantic of this method is that if
// the send() is interrupted or times out, we abort the request.
// Differently from the Future, the semantic of this method is that
// if the send() is interrupted or times out, we abort the request.
abort(x);
throw x;
}

View File

@ -0,0 +1,212 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Response;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CompletableResponseListenerTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testSend(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testSendDestination(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
Destination destination = client.resolveDestination(request);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(destination);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testSendConnection(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
Destination destination = client.resolveDestination(request);
Connection connection = destination.newConnection().get();
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAbort(Scenario scenario) throws Exception
{
long delay = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(org.eclipse.jetty.server.Request request, Response response) throws Throwable
{
// Delay the response.
Thread.sleep(delay);
}
});
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Wait and then abort().
Thread.sleep(delay / 2);
Throwable failure = new Throwable();
CompletableFuture<Boolean> abortCompletable = request.abort(failure);
CompletableFuture<Void> combinedCompletable = completable.thenCombine(abortCompletable, (response, aborted) -> null);
// There should be no response.
ExecutionException executionFailure = assertThrows(ExecutionException.class, () -> combinedCompletable.get(5, TimeUnit.SECONDS));
assertThat(executionFailure.getCause(), sameInstance(failure));
// Trying to abort again should return false.
assertFalse(request.abort(new Throwable()).get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testCompletableFutureTimeout(Scenario scenario) throws Exception
{
long delay = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(org.eclipse.jetty.server.Request request, Response response) throws Throwable
{
// Delay the response.
Thread.sleep(delay);
}
});
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
// Add a timeout to fail the request.
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send()
.orTimeout(delay / 2, TimeUnit.MILLISECONDS);
// There should be no response.
ExecutionException failure = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(failure.getCause(), instanceOf(TimeoutException.class));
// Trying to abort again should return false.
assertFalse(request.abort(new Throwable()).get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testCompletableFutureCancel(Scenario scenario) throws Exception
{
long delay = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(org.eclipse.jetty.server.Request request, Response response) throws Throwable
{
// Delay the response.
Thread.sleep(delay);
}
});
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Wait and then cancel().
Thread.sleep(delay / 2);
assertTrue(completable.cancel(false));
// There should be no response.
assertThrows(CancellationException.class, () -> completable.get(5, TimeUnit.SECONDS));
// Trying to abort again should return false.
assertFalse(request.abort(new Throwable()).get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testCompletableFutureCompletedExceptionally(Scenario scenario) throws Exception
{
long delay = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(org.eclipse.jetty.server.Request request, Response response) throws Throwable
{
// Delay the response.
Thread.sleep(delay);
}
});
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme());
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Wait and then completeExceptionally().
Thread.sleep(delay / 2);
Throwable failure = new Throwable();
assertTrue(completable.completeExceptionally(failure));
// There should be no response.
ExecutionException executionFailure = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(executionFailure.getCause(), sameInstance(failure));
// Trying to abort again should return false.
assertFalse(request.abort(new Throwable()).get(5, TimeUnit.SECONDS));
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@ -263,12 +264,10 @@ public class ConnectionPoolTest
default -> throw new IllegalStateException();
}
FutureResponseListener listener = new FutureResponseListener(request, contentLength);
request.send(listener);
try
{
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, contentLength).send();
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
catch (Throwable x)

View File

@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -159,14 +160,13 @@ public class HttpClientChunkedContentTest
// Issue another request to be sure the connection is sane.
Request request = client.newRequest("localhost", server.getLocalPort())
.timeout(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
consumeRequestHeaders(socket);
output.write(response.getBytes(StandardCharsets.UTF_8));
output.flush();
assertEquals(200, listener.get(5, TimeUnit.SECONDS).getStatus());
assertEquals(200, completable.get(5, TimeUnit.SECONDS).getStatus());
}
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -42,9 +43,8 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
{
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertNotNull(response);
assertEquals(200, response.getStatus());
@ -67,9 +67,8 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
@ -105,9 +104,8 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
destination.newConnection(futureConnection);
Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));

View File

@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
@ -1210,9 +1211,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
destination.newConnection(promise);
try (Connection connection = promise.get(5, TimeUnit.SECONDS))
{
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(2 * timeout, TimeUnit.MILLISECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
ContentResponse response = completable.get(2 * timeout, TimeUnit.MILLISECONDS);
assertEquals(200, response.getStatus());
// The parser notifies end-of-content and therefore the CompleteListener
@ -1340,13 +1340,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
.scheme(scenario.getScheme())
.version(version)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Wait some time to simulate a slow request.
Thread.sleep(1000);
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertArrayEquals(data, response.getContent());
@ -1497,8 +1496,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
FuturePromise<Connection> promise = new FuturePromise<>();
client.resolveDestination(request).newConnection(promise);
Connection connection = promise.get(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send(connection);
try (Socket socket = server.accept())
{
@ -1513,7 +1511,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
output.write(httpResponse.getBytes(UTF_8));
output.flush();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertThat(connection, Matchers.instanceOf(HttpConnectionOverHTTP.class));
HttpConnectionOverHTTP httpConnection = (HttpConnectionOverHTTP)connection;
@ -1528,8 +1526,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
httpConnection.fillInterested();
request = client.newRequest(host, port);
listener = new FutureResponseListener(request);
connection.send(request, listener);
completable = new CompletableResponseListener(request).send(connection);
consume(input, false);
@ -1540,7 +1537,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
output.write(httpResponse.getBytes(UTF_8));
output.flush();
listener.get(5, TimeUnit.SECONDS);
completable.get(5, TimeUnit.SECONDS);
}
}
}
@ -1667,8 +1664,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Request request = client.newRequest("localhost", server.getLocalPort())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (Socket socket = server.accept())
{
@ -1686,7 +1682,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
output.write(httpResponse.getBytes(UTF_8));
output.flush();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(204, response.getStatus());
byte[] responseContent = response.getContent();
@ -1697,8 +1693,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
request = client.newRequest("localhost", server.getLocalPort())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS);
listener = new FutureResponseListener(request);
request.send(listener);
completable = new CompletableResponseListener(request).send();
consume(input, false);
@ -1709,7 +1704,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
output.write(httpResponse.getBytes(UTF_8));
output.flush();
response = listener.get(5, TimeUnit.SECONDS);
response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
}
}

View File

@ -19,6 +19,7 @@ import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
@ -95,8 +96,7 @@ public class ServerConnectionCloseTest
startClient();
Request request = client.newRequest("localhost", port).path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (Socket socket = server.accept())
{
@ -134,7 +134,7 @@ public class ServerConnectionCloseTest
if (shutdownOutput)
socket.shutdownOutput();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -288,14 +289,13 @@ public class Socks4ProxyTest
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort)
.timeout(timeout, TimeUnit.MILLISECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel ignored = proxy.accept())
{
// Accept the connection, but do not reply and don't close.
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * timeout, TimeUnit.MILLISECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(2 * timeout, TimeUnit.MILLISECONDS));
assertThat(x.getCause(), instanceOf(TimeoutException.class));
}
}
@ -312,14 +312,13 @@ public class Socks4ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel ignored = proxy.accept())
{
// Accept the connection, but do not reply and don't close.
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * idleTimeout, TimeUnit.MILLISECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertThat(x.getCause(), instanceOf(TimeoutException.class));
}
}
@ -334,15 +333,14 @@ public class Socks4ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
// Immediately close the connection.
channel.close();
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(IOException.class));
}
}

View File

@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -361,8 +362,7 @@ public class Socks5ProxyTest
.path(path)
.timeout(timeout, TimeUnit.MILLISECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
@ -375,7 +375,7 @@ public class Socks5ProxyTest
byte notAcceptable = -1;
channel.write(ByteBuffer.wrap(new byte[]{Socks5.VERSION, notAcceptable}));
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * timeout, TimeUnit.MILLISECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(2 * timeout, TimeUnit.MILLISECONDS));
assertThat(x.getCause(), instanceOf(IOException.class));
}
}
@ -400,8 +400,7 @@ public class Socks5ProxyTest
.path(path)
.timeout(timeout, TimeUnit.MILLISECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
@ -446,7 +445,7 @@ public class Socks5ProxyTest
byte authenticationFailed = 1; // Any non-zero.
channel.write(ByteBuffer.wrap(new byte[]{1, authenticationFailed}));
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * timeout, TimeUnit.MILLISECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(2 * timeout, TimeUnit.MILLISECONDS));
assertThat(x.getCause(), instanceOf(IOException.class));
}
}
@ -777,14 +776,13 @@ public class Socks5ProxyTest
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort)
.timeout(timeout, TimeUnit.MILLISECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel ignored = proxy.accept())
{
// Accept the connection, but do not reply and don't close.
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * timeout, TimeUnit.MILLISECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(2 * timeout, TimeUnit.MILLISECONDS));
assertThat(x.getCause(), instanceOf(TimeoutException.class));
}
}
@ -799,15 +797,14 @@ public class Socks5ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
// Immediately close the connection.
channel.close();
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(ClosedChannelException.class));
}
}
@ -826,8 +823,7 @@ public class Socks5ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
@ -870,7 +866,7 @@ public class Socks5ProxyTest
channel.close();
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(ClosedChannelException.class));
}
}
@ -885,8 +881,7 @@ public class Socks5ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
@ -912,7 +907,7 @@ public class Socks5ProxyTest
channel.close();
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(ClosedChannelException.class));
}
}
@ -927,14 +922,13 @@ public class Socks5ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
channel.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}));
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(IOException.class));
}
}
@ -948,8 +942,7 @@ public class Socks5ProxyTest
String serverHost = "127.0.0.13";
int serverPort = proxyPort + 1; // Any port will do
Request request = client.newRequest(serverHost, serverPort);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = proxy.accept())
{
@ -974,7 +967,7 @@ public class Socks5ProxyTest
Socks5.VERSION, 1, Socks5.RESERVED, Socks5.ADDRESS_TYPE_IPV4, 127, 0, 0, 8, 29, 29
}));
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException x = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(IOException.class));
}
}

View File

@ -19,6 +19,7 @@ import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@ -95,8 +96,7 @@ public class TLSServerConnectionCloseTest
startClient();
Request request = client.newRequest("localhost", port).scheme("https").path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (Socket socket = server.accept())
{
@ -159,7 +159,7 @@ public class TLSServerConnectionCloseTest
}
}
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -149,23 +150,21 @@ public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
r.abort(x);
}
});
FutureResponseListener listener1 = new FutureResponseListener(request1);
request1.send(listener1);
CompletableFuture<ContentResponse> completable1 = new CompletableResponseListener(request1).send();
Request request2 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/two");
FutureResponseListener listener2 = new FutureResponseListener(request2);
request2.send(listener2);
CompletableFuture<ContentResponse> completable2 = new CompletableResponseListener(request2).send();
// Now we have one request about to be sent, and one queued.
latch.countDown();
ContentResponse response1 = listener1.get(5, TimeUnit.SECONDS);
ContentResponse response1 = completable1.get(5, TimeUnit.SECONDS);
assertEquals(200, response1.getStatus());
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
ContentResponse response2 = completable2.get(5, TimeUnit.SECONDS);
assertEquals(200, response2.getStatus());
}
}

View File

@ -14,17 +14,20 @@
package org.eclipse.jetty.client.http;
import java.io.EOFException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.HttpRequest;
@ -89,17 +92,28 @@ public class HttpReceiverOverHTTPTest
client.stop();
}
protected FutureResponseListener startExchange()
protected CompletableFuture<ContentResponse> startExchange()
{
HttpRequest request = (HttpRequest)client.newRequest("http://localhost");
FutureResponseListener listener = new FutureResponseListener(request);
CompletableFuture<ContentResponse> completable = new CompletableFuture<>();
BufferingResponseListener listener = new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completable.completeExceptionally(result.getFailure());
else
completable.complete(new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()));
}
};
request.getResponseListeners().addListener(listener);
HttpExchange exchange = new HttpExchange(destination, request);
boolean associated = connection.getHttpChannel().associate(exchange);
assertTrue(associated);
exchange.requestComplete(null);
exchange.terminateRequest();
return listener;
return completable;
}
@ParameterizedTest
@ -107,14 +121,15 @@ public class HttpReceiverOverHTTPTest
public void testReceiveNoResponseContent(HttpCompliance compliance) throws Exception
{
init(compliance);
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
FutureResponseListener listener = startExchange();
endPoint.addInput("""
HTTP/1.1 200 OK
Content-length: 0
""");
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Response response = completable.get(5, TimeUnit.SECONDS);
assertNotNull(response);
assertEquals(200, response.getStatus());
assertEquals("OK", response.getReason());
@ -133,13 +148,13 @@ public class HttpReceiverOverHTTPTest
String content = "0123456789ABCDEF";
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
FutureResponseListener listener = startExchange();
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertNotNull(response);
assertEquals(200, response.getStatus());
assertEquals("OK", response.getReason());
@ -148,7 +163,7 @@ public class HttpReceiverOverHTTPTest
assertNotNull(headers);
assertEquals(1, headers.size());
assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
String received = listener.getContentAsString(StandardCharsets.UTF_8);
String received = response.getContentAsString();
assertEquals(content, received);
}
@ -161,15 +176,15 @@ public class HttpReceiverOverHTTPTest
String content2 = "ABCDEF";
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
FutureResponseListener listener = startExchange();
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
endPoint.addInputEOF();
connection.getHttpChannel().receive();
ExecutionException e = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException e = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(EOFException.class));
}
@ -178,11 +193,12 @@ public class HttpReceiverOverHTTPTest
public void testReceiveResponseContentIdleTimeout(HttpCompliance compliance) throws Exception
{
init(compliance);
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" +
"\r\n");
FutureResponseListener listener = startExchange();
endPoint.addInput("""
HTTP/1.1 200 OK
Content-length: 1
""");
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
// ByteArrayEndPoint has an idle timeout of 0 by default,
// so to simulate an idle timeout is enough to wait a bit.
@ -190,7 +206,7 @@ public class HttpReceiverOverHTTPTest
TimeoutException timeoutException = new TimeoutException();
connection.onIdleExpired(timeoutException);
ExecutionException e = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException e = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(TimeoutException.class));
assertThat(e.getCause(), sameInstance(timeoutException));
}
@ -200,14 +216,15 @@ public class HttpReceiverOverHTTPTest
public void testReceiveBadResponse(HttpCompliance compliance) throws Exception
{
init(compliance);
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" +
"\r\n");
FutureResponseListener listener = startExchange();
endPoint.addInput("""
HTTP/1.1 200 OK
Content-length: A
""");
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
ExecutionException e = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
ExecutionException e = assertThrows(ExecutionException.class, () -> completable.get(5, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(HttpResponseException.class));
assertThat(e.getCause().getCause(), instanceOf(BadMessageException.class));
assertThat(e.getCause().getCause().getCause(), instanceOf(NumberFormatException.class));
@ -248,15 +265,16 @@ public class HttpReceiverOverHTTPTest
endPoint.setConnection(connection);
// Partial response to trigger the call to fillInterested().
endPoint.addInput(
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 1\r\n" +
"\r\n");
endPoint.addInput("""
HTTP/1.1 200 OK
Content-Length: 1
""");
FutureResponseListener listener = startExchange();
CompletableFuture<ContentResponse> completable = startExchange();
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Response response = completable.get(5, TimeUnit.SECONDS);
assertNotNull(response);
assertEquals(200, response.getStatus());
}

View File

@ -17,10 +17,11 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.server.ServerFCGIConnectionFactory;
@ -175,10 +176,9 @@ public class FastCGIProxyHandlerTest
}
})
.path(proxyContext.getContextPath() + path);
FutureResponseListener listener = new FutureResponseListener(request, length);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, length).send();
ContentResponse response = listener.get(30, TimeUnit.SECONDS);
ContentResponse response = completable.get(30, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertArrayEquals(data, response.getContent());

View File

@ -23,6 +23,7 @@ import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -32,10 +33,10 @@ import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
@ -157,8 +158,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.start();
Request request = client.newRequest("localhost", serverChannel.socket().getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
try (SocketChannel channel = serverChannel.accept())
{
@ -186,7 +186,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
.flip();
channel.write(responseByteBuffer);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
String content = response.getContentAsString();
assertEquals("hello world\n", content);
@ -214,9 +214,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
});
Request request = client.newRequest(scheme + "://localhost:" + connector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request, data.length);
request.send(listener);
ContentResponse response = listener.get(15, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, data.length).send();
ContentResponse response = completable.get(15, TimeUnit.SECONDS);
assertNotNull(response);
assertEquals(200, response.getStatus());
byte[] content = response.getContent();
@ -748,13 +747,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Wait some time to simulate a slow request.
Thread.sleep(1000);
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertArrayEquals(data, response.getContent());

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -31,9 +32,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.InputStreamResponseListener;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.Response;
@ -129,10 +130,10 @@ public class HttpClientTest extends AbstractTest
}
});
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
var request = client.newRequest(newURI(transport))
.timeout(10, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, length).send();
ContentResponse response = completable.get();
assertEquals(200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
@ -173,10 +174,10 @@ public class HttpClientTest extends AbstractTest
}
});
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, 2 * length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
var request = client.newRequest(newURI(transport))
.timeout(10, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, 2 * length).send();
ContentResponse response = completable.get();
assertEquals(200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
@ -306,19 +307,17 @@ public class HttpClientTest extends AbstractTest
});
// Make a request with a large enough response buffer.
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport));
FutureResponseListener listener = new FutureResponseListener(request, length);
request.send(listener);
ContentResponse response = listener.get(15, TimeUnit.SECONDS);
var request = client.newRequest(newURI(transport));
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, length).send();
ContentResponse response = completable.get(15, TimeUnit.SECONDS);
assertEquals(response.getStatus(), 200);
// Make a request with a small response buffer, should fail.
try
{
request = client.newRequest(newURI(transport));
listener = new FutureResponseListener(request, length / 10);
request.send(listener);
listener.get(15, TimeUnit.SECONDS);
completable = new CompletableResponseListener(request, length / 10).send();
completable.get(15, TimeUnit.SECONDS);
fail("Expected ExecutionException");
}
catch (ExecutionException x)
@ -328,9 +327,8 @@ public class HttpClientTest extends AbstractTest
// Verify that we can make another request.
request = client.newRequest(newURI(transport));
listener = new FutureResponseListener(request, length);
request.send(listener);
response = listener.get(15, TimeUnit.SECONDS);
completable = new CompletableResponseListener(request, length).send();
response = completable.get(15, TimeUnit.SECONDS);
assertEquals(response.getStatus(), 200);
}
@ -637,11 +635,10 @@ public class HttpClientTest extends AbstractTest
}
});
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
var request = client.newRequest(newURI(transport))
.method(HttpMethod.HEAD);
FutureResponseListener listener = new FutureResponseListener(request, length / 2);
request.send(listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request, length / 2).send();
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(0, response.getContent().length);
@ -1091,7 +1088,7 @@ public class HttpClientTest extends AbstractTest
}
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
response.getHeaders().put(HttpHeader.CONTENT_TYPE, "text/plain");

View File

@ -23,8 +23,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
@ -123,8 +123,7 @@ public class ServerTimeoutsTest extends AbstractTest
.onResponseSuccess(s ->
content.close())
.body(content);
FutureResponseListener futureResponse = new FutureResponseListener(request);
request.send(futureResponse);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Demand is invoked by the idle timeout
assertTrue(demanded.await(2 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
@ -144,7 +143,7 @@ public class ServerTimeoutsTest extends AbstractTest
// Complete the callback as the error listener promised.
callbackRef.get().failed(cause);
ContentResponse response = futureResponse.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
ContentResponse response = completable.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
@ -175,15 +174,14 @@ public class ServerTimeoutsTest extends AbstractTest
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS);
FutureResponseListener futureResponse = new FutureResponseListener(request);
request.send(futureResponse);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Get the callback as promised by the error listener.
Callback callback = callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
assertNotNull(callback);
Content.Sink.write(responseRef.get(), true, "OK", callback);
ContentResponse response = futureResponse.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
ContentResponse response = completable.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(response.getContentAsString(), is("OK"));
}

View File

@ -16,10 +16,11 @@ package org.eclipse.jetty.test.client.transport;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.InputStreamResponseListener;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.client.StringRequestContent;
@ -146,10 +147,9 @@ public class TrailersTest extends AbstractTest
.headers(headers -> headers.put(HttpHeader.TRAILER, trailerName))
.body(new StringRequestContent(content))
.trailersSupplier(() -> HttpFields.build().put(trailerName, trailerValue));
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(content, response.getContentAsString());
assertEquals(trailerValue, response.getTrailers().get(trailerName));

View File

@ -0,0 +1,96 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test.client.transport;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ZippedRequestContentTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transportsNoFCGI")
public void testZippedRequestContent(Transport transport) throws Exception
{
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
InputStream input = Content.Source.asInputStream(request);
try (ZipInputStream zipInput = new ZipInputStream(input))
{
ZipEntry zipEntry1 = zipInput.getNextEntry();
assertNotNull(zipEntry1);
assertEquals("first.txt", zipEntry1.getName());
IO.copy(zipInput, OutputStream.nullOutputStream());
ZipEntry zipEntry2 = zipInput.getNextEntry();
assertNotNull(zipEntry2);
assertEquals("second.txt", zipEntry2.getName());
IO.copy(zipInput, OutputStream.nullOutputStream());
assertNull(zipInput.getNextEntry());
IO.copy(input, OutputStream.nullOutputStream());
}
response.setStatus(HttpStatus.OK_200);
callback.succeeded();
return true;
}
});
OutputStreamRequestContent content = new OutputStreamRequestContent();
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(
client.newRequest(newURI(transport))
.method(HttpMethod.POST)
.body(content)
).send();
OutputStream output = content.getOutputStream();
try (ZipOutputStream zipOutput = new ZipOutputStream(output))
{
zipOutput.putNextEntry(new ZipEntry("first.txt"));
zipOutput.write("Hello!".repeat(128).getBytes(StandardCharsets.UTF_8));
zipOutput.closeEntry();
zipOutput.putNextEntry(new ZipEntry("second.txt"));
zipOutput.write("Jetty!".repeat(128).getBytes(StandardCharsets.UTF_8));
zipOutput.closeEntry();
}
ContentResponse response = completable.get(15, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}

View File

@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -54,8 +55,8 @@ import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Request;
@ -396,17 +397,16 @@ public class AsyncMiddleManServletTest
startClient();
AsyncRequestContent content = new AsyncRequestContent();
Request request = client.newRequest("localhost", serverConnector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request);
request.headers(headers -> headers.put(HttpHeader.CONTENT_ENCODING, HttpHeaderValue.GZIP))
.body(content)
.send(listener);
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.headers(headers -> headers.put(HttpHeader.CONTENT_ENCODING, HttpHeaderValue.GZIP))
.body(content);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
byte[] bytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8);
content.write(ByteBuffer.wrap(gzip(bytes)), Callback.NOOP);
sleep(1000);
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
}
@ -1367,8 +1367,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1384,7 +1383,7 @@ public class AsyncMiddleManServletTest
content.close();
assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(0, response.getContent().length);
}
@ -1415,8 +1414,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1432,7 +1430,7 @@ public class AsyncMiddleManServletTest
content.close();
assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}
@ -1486,8 +1484,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1502,7 +1499,7 @@ public class AsyncMiddleManServletTest
// Finish the content.
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.security.Principal;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -33,10 +34,10 @@ import javax.net.ssl.X509ExtendedKeyManager;
import jakarta.servlet.ServletException;
import org.eclipse.jetty.client.BasicAuthentication;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
@ -325,9 +326,8 @@ public class ForwardProxyTLSServerTest
.body(new StringRequestContent(body2));
// Make sure the second connection can send the exchange via the tunnel
FutureResponseListener listener2 = new FutureResponseListener(request2);
connection.get().send(request2, listener2);
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request2).send(connection.get());
ContentResponse response2 = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response2.getStatus());
String content2 = response2.getContentAsString();

View File

@ -44,9 +44,9 @@ import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.InputStreamRequestContent;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.client.Response;
@ -1523,8 +1523,7 @@ public class AsyncIOServletTest extends AbstractTest
.method(HttpMethod.POST)
.body(body)
.timeout(15, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
assertTrue(bodyLatch.await(5, TimeUnit.SECONDS));
@ -1536,7 +1535,7 @@ public class AsyncIOServletTest extends AbstractTest
// Complete the body.
body.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}

View File

@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -37,9 +38,9 @@ import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.ContinueProtocolHandler;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
@ -749,8 +750,7 @@ public class HttpClientContinueTest extends AbstractTest
Request clientRequest = client.newRequest("localhost", server.getLocalPort())
.body(new BytesRequestContent(bytes))
.timeout(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(clientRequest);
clientRequest.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(clientRequest).send();
try (Socket socket = server.accept())
{
@ -773,7 +773,7 @@ public class HttpClientContinueTest extends AbstractTest
output.flush();
}
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
}

View File

@ -0,0 +1,93 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.ee10.test.client.transport;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ZippedRequestContentTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transportsNoFCGI")
public void testZippedRequestContent(Transport transport) throws Exception
{
start(transport, new HttpServlet()
{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
InputStream input = req.getInputStream();
try (ZipInputStream zipInput = new ZipInputStream(input))
{
ZipEntry zipEntry1 = zipInput.getNextEntry();
assertNotNull(zipEntry1);
assertEquals("first.txt", zipEntry1.getName());
IO.copy(zipInput, OutputStream.nullOutputStream());
ZipEntry zipEntry2 = zipInput.getNextEntry();
assertNotNull(zipEntry2);
assertEquals("second.txt", zipEntry2.getName());
IO.copy(zipInput, OutputStream.nullOutputStream());
assertNull(zipInput.getNextEntry());
IO.copy(input, OutputStream.nullOutputStream());
}
resp.setStatus(HttpStatus.OK_200);
}
});
OutputStreamRequestContent content = new OutputStreamRequestContent();
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(
client.newRequest(newURI(transport))
.method(HttpMethod.POST)
.body(content)
).send();
OutputStream output = content.getOutputStream();
try (ZipOutputStream zipOutput = new ZipOutputStream(output))
{
zipOutput.putNextEntry(new ZipEntry("first.txt"));
zipOutput.write("Hello!".repeat(128).getBytes(StandardCharsets.UTF_8));
zipOutput.closeEntry();
zipOutput.putNextEntry(new ZipEntry("second.txt"));
zipOutput.write("Jetty!".repeat(128).getBytes(StandardCharsets.UTF_8));
zipOutput.closeEntry();
}
ContentResponse response = completable.get(15, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}

View File

@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -54,8 +55,8 @@ import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Request;
@ -402,17 +403,16 @@ public class AsyncMiddleManServletTest
startClient();
AsyncRequestContent content = new AsyncRequestContent();
Request request = client.newRequest("localhost", serverConnector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request);
request.headers(headers -> headers.put(HttpHeader.CONTENT_ENCODING, HttpHeaderValue.GZIP))
.body(content)
.send(listener);
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.headers(headers -> headers.put(HttpHeader.CONTENT_ENCODING, HttpHeaderValue.GZIP))
.body(content);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
byte[] bytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8);
content.write(ByteBuffer.wrap(gzip(bytes)), Callback.NOOP);
sleep(1000);
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
}
@ -1373,8 +1373,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1390,7 +1389,7 @@ public class AsyncMiddleManServletTest
content.close();
assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(0, response.getContent().length);
}
@ -1421,8 +1420,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1438,7 +1436,7 @@ public class AsyncMiddleManServletTest
content.close();
assertTrue(proxyRequestLatch.await(1, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}
@ -1492,8 +1490,7 @@ public class AsyncMiddleManServletTest
Request request = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.body(content);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Send one chunk of content, the proxy request must not be sent.
ByteBuffer chunk1 = ByteBuffer.allocate(1024);
@ -1508,7 +1505,7 @@ public class AsyncMiddleManServletTest
// Finish the content.
content.close();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(chunk1.capacity() + chunk2.capacity(), response.getContent().length);
}

View File

@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.security.Principal;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -32,10 +33,10 @@ import javax.net.ssl.X509ExtendedKeyManager;
import jakarta.servlet.ServletException;
import org.eclipse.jetty.client.BasicAuthentication;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
@ -322,9 +323,8 @@ public class ForwardProxyTLSServerTest
.body(new StringRequestContent(body2));
// Make sure the second connection can send the exchange via the tunnel
FutureResponseListener listener2 = new FutureResponseListener(request2);
connection.get().send(request2, listener2);
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request2).send(connection.get());
ContentResponse response2 = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response2.getStatus());
String content2 = response2.getContentAsString();

View File

@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -37,9 +38,9 @@ import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.ContinueProtocolHandler;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
@ -749,8 +750,7 @@ public class HttpClientContinueTest extends AbstractTest
Request clientRequest = client.newRequest("localhost", server.getLocalPort())
.body(new BytesRequestContent(bytes))
.timeout(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(clientRequest);
clientRequest.send(listener);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(clientRequest).send();
try (Socket socket = server.accept())
{
@ -773,7 +773,7 @@ public class HttpClientContinueTest extends AbstractTest
output.flush();
}
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
ContentResponse response = completable.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertArrayEquals(bytes, response.getContent());
}