Fixes #4374 - Jetty client: Response.AsyncContentListener.onContent is not called.

Now the various content listeners inherit from each other, like
it should have been from the beginning.
This also allowed to remove code duplication due to the default
implementation of the methods in various places.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-11-29 12:05:30 +01:00
parent d7cf3729a5
commit 902603fc9e
3 changed files with 86 additions and 52 deletions

View File

@ -502,21 +502,12 @@ public class HttpRequest implements Request
@Override
public Request onResponseContent(final Response.ContentListener listener)
{
this.responseListeners.add(new Response.DemandedContentListener()
this.responseListeners.add(new Response.ContentListener()
{
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
try
public void onContent(Response response, ByteBuffer content)
{
listener.onContent(response, content);
callback.succeeded();
demand.accept(1);
}
catch (Throwable x)
{
callback.failed(x);
}
}
});
return this;
@ -525,16 +516,12 @@ public class HttpRequest implements Request
@Override
public Request onResponseContentAsync(final Response.AsyncContentListener listener)
{
this.responseListeners.add(new Response.DemandedContentListener()
this.responseListeners.add(new Response.AsyncContentListener()
{
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
public void onContent(Response response, ByteBuffer content, Callback callback)
{
listener.onContent(response, content, Callback.from(() ->
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
listener.onContent(response, content, callback);
}
});
return this;

View File

@ -138,7 +138,7 @@ public interface Response
*
* @see AsyncContentListener
*/
interface ContentListener extends ResponseListener
interface ContentListener extends AsyncContentListener
{
/**
* Callback method invoked when the response content has been received, parsed and there is demand.
@ -149,6 +149,20 @@ public interface Response
* @param content the content bytes received
*/
void onContent(Response response, ByteBuffer content);
@Override
default void onContent(Response response, ByteBuffer content, Callback callback)
{
try
{
onContent(response, content);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
}
/**
@ -156,7 +170,7 @@ public interface Response
*
* @see DemandedContentListener
*/
interface AsyncContentListener extends ResponseListener
interface AsyncContentListener extends DemandedContentListener
{
/**
* Callback method invoked when the response content has been received, parsed and there is demand.
@ -168,6 +182,16 @@ public interface Response
* @param callback the callback to call when the content is consumed and to demand more content
*/
void onContent(Response response, ByteBuffer content, Callback callback);
@Override
default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
onContent(response, content, Callback.from(() ->
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
}
}
/**
@ -257,7 +281,7 @@ public interface Response
/**
* Listener for all response events.
*/
interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, DemandedContentListener, SuccessListener, FailureListener, CompleteListener
interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
{
@Override
public default void onBegin(Response response)
@ -275,41 +299,11 @@ public interface Response
{
}
@Override
default void onBeforeContent(Response response, LongConsumer demand)
{
demand.accept(1);
}
@Override
public default void onContent(Response response, ByteBuffer content)
{
}
@Override
public default void onContent(Response response, ByteBuffer content, Callback callback)
{
try
{
onContent(response, content);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
@Override
public default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
onContent(response, content, Callback.from(() ->
{
callback.succeeded();
demand.accept(1);
}, callback::failed));
}
@Override
public default void onSuccess(Response response)
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -84,6 +85,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
@ -1788,6 +1790,57 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testContentListenerAsCompleteListener(Scenario scenario) throws Exception
{
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
start(scenario, new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
output.write(bytes);
}
});
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountDownLatch latch = new CountDownLatch(1);
class L implements Response.ContentListener, Response.CompleteListener
{
@Override
public void onContent(Response response, ByteBuffer content)
{
try
{
BufferUtil.writeTo(content, baos);
}
catch (IOException x)
{
baos.reset();
x.printStackTrace();
}
}
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
latch.countDown();
}
}
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.send(new L());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertArrayEquals(bytes, baos.toByteArray());
}
private void assertCopyRequest(Request original)
{
Request copy = client.copyRequest((HttpRequest)original, original.getURI());