Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2019-12-16 18:04:11 +01:00
commit 6f2ac512dc
3 changed files with 105 additions and 71 deletions

View File

@ -510,21 +510,12 @@ public class HttpRequest implements Request
@Override @Override
public Request onResponseContent(final Response.ContentListener listener) public Request onResponseContent(final Response.ContentListener listener)
{ {
this.responseListeners.add(new Response.DemandedContentListener() this.responseListeners.add(new Response.ContentListener()
{ {
@Override @Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) public void onContent(Response response, ByteBuffer content)
{
try
{ {
listener.onContent(response, content); listener.onContent(response, content);
callback.succeeded();
demand.accept(1);
}
catch (Throwable x)
{
callback.failed(x);
}
} }
}); });
return this; return this;
@ -533,16 +524,12 @@ public class HttpRequest implements Request
@Override @Override
public Request onResponseContentAsync(final Response.AsyncContentListener listener) public Request onResponseContentAsync(final Response.AsyncContentListener listener)
{ {
this.responseListeners.add(new Response.DemandedContentListener() this.responseListeners.add(new Response.AsyncContentListener()
{ {
@Override @Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) public void onContent(Response response, ByteBuffer content, Callback callback)
{ {
listener.onContent(response, content, Callback.from(() -> listener.onContent(response, content, callback);
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
} }
}); });
return this; return this;

View File

@ -85,14 +85,14 @@ public interface Response
/** /**
* Common, empty, super-interface for response listeners * Common, empty, super-interface for response listeners
*/ */
public interface ResponseListener extends EventListener interface ResponseListener extends EventListener
{ {
} }
/** /**
* Listener for the response begin event. * Listener for the response begin event.
*/ */
public interface BeginListener extends ResponseListener interface BeginListener extends ResponseListener
{ {
/** /**
* Callback method invoked when the response line containing HTTP version, * Callback method invoked when the response line containing HTTP version,
@ -102,13 +102,13 @@ public interface Response
* *
* @param response the response containing the response line data * @param response the response containing the response line data
*/ */
public void onBegin(Response response); void onBegin(Response response);
} }
/** /**
* Listener for a response header event. * Listener for a response header event.
*/ */
public interface HeaderListener extends ResponseListener interface HeaderListener extends ResponseListener
{ {
/** /**
* Callback method invoked when a response header has been received and parsed, * Callback method invoked when a response header has been received and parsed,
@ -118,20 +118,20 @@ public interface Response
* @param field the header received * @param field the header received
* @return true to process the header, false to skip processing of the header * @return true to process the header, false to skip processing of the header
*/ */
public boolean onHeader(Response response, HttpField field); boolean onHeader(Response response, HttpField field);
} }
/** /**
* Listener for the response headers event. * Listener for the response headers event.
*/ */
public interface HeadersListener extends ResponseListener interface HeadersListener extends ResponseListener
{ {
/** /**
* Callback method invoked when all the response headers have been received and parsed. * Callback method invoked when all the response headers have been received and parsed.
* *
* @param response the response containing the response line data and the headers * @param response the response containing the response line data and the headers
*/ */
public void onHeaders(Response response); void onHeaders(Response response);
} }
/** /**
@ -139,7 +139,7 @@ public interface Response
* *
* @see AsyncContentListener * @see AsyncContentListener
*/ */
public interface ContentListener extends ResponseListener interface ContentListener extends AsyncContentListener
{ {
/** /**
* Callback method invoked when the response content has been received, parsed and there is demand. * Callback method invoked when the response content has been received, parsed and there is demand.
@ -149,7 +149,21 @@ public interface Response
* @param response the response containing the response line data and the headers * @param response the response containing the response line data and the headers
* @param content the content bytes received * @param content the content bytes received
*/ */
public void onContent(Response response, ByteBuffer content); void onContent(Response response, ByteBuffer content);
@Override
default void onContent(Response response, ByteBuffer content, Callback callback)
{
try
{
onContent(response, content);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
} }
/** /**
@ -157,7 +171,7 @@ public interface Response
* *
* @see DemandedContentListener * @see DemandedContentListener
*/ */
public interface AsyncContentListener extends ResponseListener interface AsyncContentListener extends DemandedContentListener
{ {
/** /**
* Callback method invoked when the response content has been received, parsed and there is demand. * Callback method invoked when the response content has been received, parsed and there is demand.
@ -168,13 +182,23 @@ public interface Response
* @param content the content bytes received * @param content the content bytes received
* @param callback the callback to call when the content is consumed and to demand more content * @param callback the callback to call when the content is consumed and to demand more content
*/ */
public void onContent(Response response, ByteBuffer content, Callback callback); void onContent(Response response, ByteBuffer content, Callback callback);
@Override
default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
onContent(response, content, Callback.from(() ->
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
}
} }
/** /**
* Asynchronous listener for the response content events. * Asynchronous listener for the response content events.
*/ */
public interface DemandedContentListener extends ResponseListener interface DemandedContentListener extends ResponseListener
{ {
/** /**
* Callback method invoked before response content events. * Callback method invoked before response content events.
@ -186,7 +210,7 @@ public interface Response
* @param response the response containing the response line data and the headers * @param response the response containing the response line data and the headers
* @param demand the object that allows to demand content buffers * @param demand the object that allows to demand content buffers
*/ */
public default void onBeforeContent(Response response, LongConsumer demand) default void onBeforeContent(Response response, LongConsumer demand)
{ {
demand.accept(1); demand.accept(1);
} }
@ -203,26 +227,26 @@ public interface Response
* @param content the content bytes received * @param content the content bytes received
* @param callback the callback to call when the content is consumed * @param callback the callback to call when the content is consumed
*/ */
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback); void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback);
} }
/** /**
* Listener for the response succeeded event. * Listener for the response succeeded event.
*/ */
public interface SuccessListener extends ResponseListener interface SuccessListener extends ResponseListener
{ {
/** /**
* Callback method invoked when the whole response has been successfully received. * Callback method invoked when the whole response has been successfully received.
* *
* @param response the response containing the response line data and the headers * @param response the response containing the response line data and the headers
*/ */
public void onSuccess(Response response); void onSuccess(Response response);
} }
/** /**
* Listener for the response failure event. * Listener for the response failure event.
*/ */
public interface FailureListener extends ResponseListener interface FailureListener extends ResponseListener
{ {
/** /**
* Callback method invoked when the response has failed in the process of being received * Callback method invoked when the response has failed in the process of being received
@ -230,13 +254,13 @@ public interface Response
* @param response the response containing data up to the point the failure happened * @param response the response containing data up to the point the failure happened
* @param failure the failure happened * @param failure the failure happened
*/ */
public void onFailure(Response response, Throwable failure); void onFailure(Response response, Throwable failure);
} }
/** /**
* Listener for the request and response completed event. * Listener for the request and response completed event.
*/ */
public interface CompleteListener extends ResponseListener interface CompleteListener extends ResponseListener
{ {
/** /**
* Callback method invoked when the request <em><b>and</b></em> the response have been processed, * Callback method invoked when the request <em><b>and</b></em> the response have been processed,
@ -252,13 +276,13 @@ public interface Response
* *
* @param result the result of the request / response exchange * @param result the result of the request / response exchange
*/ */
public void onComplete(Result result); void onComplete(Result result);
} }
/** /**
* Listener for all response events. * Listener for all response events.
*/ */
public interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, DemandedContentListener, SuccessListener, FailureListener, CompleteListener interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
{ {
@Override @Override
public default void onBegin(Response response) public default void onBegin(Response response)
@ -276,41 +300,11 @@ public interface Response
{ {
} }
@Override
public default void onBeforeContent(Response response, LongConsumer demand)
{
demand.accept(1);
}
@Override @Override
public default void onContent(Response response, ByteBuffer content) public default void onContent(Response response, ByteBuffer content)
{ {
} }
@Override
public default void onContent(Response response, ByteBuffer content, Callback callback)
{
try
{
onContent(response, content);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
@Override
public default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
onContent(response, content, Callback.from(() ->
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
}
@Override @Override
public default void onSuccess(Response response) public default void onSuccess(Response response)
{ {
@ -329,7 +323,7 @@ public interface Response
/** /**
* An empty implementation of {@link Listener} * An empty implementation of {@link Listener}
*/ */
public static class Adapter implements Listener class Adapter implements Listener
{ {
} }
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -84,6 +85,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.Net; import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir; import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension; import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
@ -1784,6 +1786,57 @@ public class HttpClientTest extends AbstractHttpClientServerTest
} }
} }
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testContentListenerAsCompleteListener(Scenario scenario) throws Exception
{
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
start(scenario, new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
output.write(bytes);
}
});
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountDownLatch latch = new CountDownLatch(1);
class L implements Response.ContentListener, Response.CompleteListener
{
@Override
public void onContent(Response response, ByteBuffer content)
{
try
{
BufferUtil.writeTo(content, baos);
}
catch (IOException x)
{
baos.reset();
x.printStackTrace();
}
}
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
latch.countDown();
}
}
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.send(new L());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertArrayEquals(bytes, baos.toByteArray());
}
private void assertCopyRequest(Request original) private void assertCopyRequest(Request original)
{ {
Request copy = client.copyRequest((HttpRequest)original, original.getURI()); Request copy = client.copyRequest((HttpRequest)original, original.getURI());