Merge pull request #4380 from eclipse/jetty-9.4.x-4374-httpclient_content_listener
Fixes #4374 - Jetty client: Response.AsyncContentListener.onContent i…
This commit is contained in:
commit
fa973a2df8
|
@ -502,21 +502,12 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request onResponseContent(final Response.ContentListener listener)
|
||||
{
|
||||
this.responseListeners.add(new Response.DemandedContentListener()
|
||||
this.responseListeners.add(new Response.ContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
public void onContent(Response response, ByteBuffer content)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onContent(response, content);
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
listener.onContent(response, content);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
|
@ -525,16 +516,12 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request onResponseContentAsync(final Response.AsyncContentListener listener)
|
||||
{
|
||||
this.responseListeners.add(new Response.DemandedContentListener()
|
||||
this.responseListeners.add(new Response.AsyncContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
public void onContent(Response response, ByteBuffer content, Callback callback)
|
||||
{
|
||||
listener.onContent(response, content, Callback.from(() ->
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}, callback::failed));
|
||||
listener.onContent(response, content, callback);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
|
|
|
@ -138,7 +138,7 @@ public interface Response
|
|||
*
|
||||
* @see AsyncContentListener
|
||||
*/
|
||||
interface ContentListener extends ResponseListener
|
||||
interface ContentListener extends AsyncContentListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked when the response content has been received, parsed and there is demand.
|
||||
|
@ -149,6 +149,20 @@ public interface Response
|
|||
* @param content the content bytes received
|
||||
*/
|
||||
void onContent(Response response, ByteBuffer content);
|
||||
|
||||
@Override
|
||||
default void onContent(Response response, ByteBuffer content, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
onContent(response, content);
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -156,7 +170,7 @@ public interface Response
|
|||
*
|
||||
* @see DemandedContentListener
|
||||
*/
|
||||
interface AsyncContentListener extends ResponseListener
|
||||
interface AsyncContentListener extends DemandedContentListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked when the response content has been received, parsed and there is demand.
|
||||
|
@ -168,6 +182,16 @@ public interface Response
|
|||
* @param callback the callback to call when the content is consumed and to demand more content
|
||||
*/
|
||||
void onContent(Response response, ByteBuffer content, Callback callback);
|
||||
|
||||
@Override
|
||||
default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
onContent(response, content, Callback.from(() ->
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}, callback::failed));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,7 +281,7 @@ public interface Response
|
|||
/**
|
||||
* Listener for all response events.
|
||||
*/
|
||||
interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, DemandedContentListener, SuccessListener, FailureListener, CompleteListener
|
||||
interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
|
||||
{
|
||||
@Override
|
||||
public default void onBegin(Response response)
|
||||
|
@ -275,41 +299,11 @@ public interface Response
|
|||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
default void onBeforeContent(Response response, LongConsumer demand)
|
||||
{
|
||||
demand.accept(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public default void onContent(Response response, ByteBuffer content)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public default void onContent(Response response, ByteBuffer content, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
onContent(response, content);
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public default void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
onContent(response, content, Callback.from(() ->
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}, callback::failed));
|
||||
}
|
||||
|
||||
@Override
|
||||
public default void onSuccess(Response response)
|
||||
{
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -84,6 +85,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
|||
import org.eclipse.jetty.toolchain.test.Net;
|
||||
import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
|
||||
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
|
@ -1788,6 +1790,57 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(ScenarioProvider.class)
|
||||
public void testContentListenerAsCompleteListener(Scenario scenario) throws Exception
|
||||
{
|
||||
byte[] bytes = new byte[1024];
|
||||
new Random().nextBytes(bytes);
|
||||
start(scenario, new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
ServletOutputStream output = response.getOutputStream();
|
||||
output.write(bytes);
|
||||
}
|
||||
});
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
class L implements Response.ContentListener, Response.CompleteListener
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, ByteBuffer content)
|
||||
{
|
||||
try
|
||||
{
|
||||
BufferUtil.writeTo(content, baos);
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
baos.reset();
|
||||
x.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
if (result.isSucceeded())
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scenario.getScheme())
|
||||
.send(new L());
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
assertArrayEquals(bytes, baos.toByteArray());
|
||||
}
|
||||
|
||||
private void assertCopyRequest(Request original)
|
||||
{
|
||||
Request copy = client.copyRequest((HttpRequest)original, original.getURI());
|
||||
|
|
Loading…
Reference in New Issue