Fixes #11563 - HttpClient InputStream.read() hangs intermittently.

Clarified documentation about HttpClient request and response listeners: they cannot block.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Simone Bordet 2024-03-25 10:55:29 +01:00
parent bb30fdfeca
commit 29bed1435f
4 changed files with 25 additions and 108 deletions


@ -277,6 +277,9 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=idi
Note how the reads happen in a loop, consuming the `Content.Source` as soon as it has content available to be read, and therefore no backpressure is applied to the reads.
IMPORTANT: Calling `Content.Chunk.release()` must be done only after the bytes in the `ByteBuffer` returned by `Content.Chunk.getByteBuffer()` have been consumed.
When the `Content.Chunk` is released, the implementation may reuse the `ByteBuffer` and overwrite the bytes with different bytes; if the application looks at the `ByteBuffer` _after_ having released the `Content.Chunk` it may see other, unrelated, bytes.
An alternative way to read from a `Content.Source`, to use when the chunk is consumed asynchronously, and you don't want to read again until the `Content.Chunk` is consumed, is the following:
[source,java,indent=0]


@ -91,7 +91,8 @@ These listeners are implemented by applications and may perform any kind of logi
The implementation invokes these listeners in the same thread that is used to process the request or response.
Therefore, if the application code in these listeners takes a long time to execute, the request or response processing is delayed until the listener returns.
If you need to execute application code that takes a long time inside a listener, you must spawn your own thread.
If you need to execute application code that takes a long time inside a listener, it is typically better to spawn your own thread to execute the code that takes a long time.
In this way you return from the listener as soon as possible and allow the implementation to resume the processing of the request or response (or of other requests/responses).
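The offloading described above can be sketched with plain JDK classes. This is a minimal illustration, not Jetty code: `OffloadingListener`, `onEvent` and `slowLogic` are hypothetical names standing in for a real listener callback and the application logic it would run. It only assumes that the slow logic does not need to finish before the listener returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical helper, not part of Jetty: wraps slow application logic so that
// the listener callback only schedules the work and returns immediately.
class OffloadingListener<T>
{
    private final ExecutorService executor;
    private final Consumer<T> slowLogic;

    public OffloadingListener(ExecutorService executor, Consumer<T> slowLogic)
    {
        this.executor = executor;
        this.slowLogic = slowLogic;
    }

    // Stand-in for a listener callback: the slow logic runs on the executor,
    // so the calling (implementation) thread is free as soon as this returns.
    public CompletableFuture<Void> onEvent(T event)
    {
        return CompletableFuture.runAsync(() -> slowLogic.accept(event), executor);
    }

    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            OffloadingListener<String> listener = new OffloadingListener<>(executor, event ->
                System.out.println("slow work for " + event + " on " + Thread.currentThread().getName()));
            // onEvent() returns at once; join() is here only so the demo waits for the output.
            listener.onEvent("response headers").join();
        }
        finally
        {
            executor.shutdown();
        }
    }
}
```

In a real listener you would typically ignore or merely log the returned future rather than join on it, since waiting for it inside the listener would reintroduce the blocking that the offloading is meant to avoid.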
Request and response processing are executed by two different threads and therefore may happen concurrently.
A typical example of this concurrent processing is an echo server, where a large upload may be concurrent with the large download echoed back.
@ -136,6 +137,19 @@ include::../../{doc_code}/org/eclipse/jetty/docs/programming/client/http/HTTPCli
This makes Jetty HTTP client suitable for HTTP load testing because, for example, you can accurately time every step of the request/response conversation (thus knowing where the request/response time is really spent).
[IMPORTANT]
====
The code in request and response listeners __should not__ block.
It is allowed to call other blocking APIs, such as the Java file-system APIs.
You should not call blocking APIs that:
* Wait for other request or response events, such as receiving other request or response content chunks.
* Use wait/notify primitives such as those available in `java.lang.Object` or `java.util.concurrent.locks.Condition`.
If the listener code blocks, the implementation will also be blocked and will not be able to advance the processing of the request or response that the listener code is likely waiting for, causing a deadlock.
====
Have a look at the link:{javadoc-url}/org/eclipse/jetty/client/api/Request.Listener.html[`Request.Listener`] class to know about request events, and at the link:{javadoc-url}/org/eclipse/jetty/client/api/Response.Listener.html[`Response.Listener`] class to know about response events.
[[pg-client-http-content-request]]
@ -224,28 +238,7 @@ The listener that follows this model is `Response.ContentSourceListener`.
After the response headers have been processed by the `HttpClient` implementation, `Response.ContentSourceListener.onContentSource(response, contentSource)` is invoked once and only once.
This allows the application to control precisely the read/demand loop: when to read a chunk, how to process it and when to demand the next one.
// TODO: move this section to the IO arch docs.
You must provide a `ContentSourceListener` whose implementation reads a `Content.Chunk` from the provided `Content.Source`, as follows:
Then the following cases may happen:
* The read chunk ; you should process it by consuming its bytes and then releasing the chunk, then either trying to read another chunk, or demanding for another chunk (unless the current chunk is the last).
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/client/http/HTTPClientDocs.java[tags=contentSourceListener]
----
<1> Using a `Runnable` anonymous class instead of a lambda in this example just for simplicity, and to be able to call `contentSource.demand(this)` passing `this` as parameter.
<2> Read in a loop
<3> Read a `Content.Chunk` from the `Content.Source`.
<4> The read chunk is `null`, call `Content.Source.demand(Runnable)` to be called back when more chunks are available.
<5> The read reported a failure, which may be transient or terminal; the application must handle it appropriately.
<6> The read chunk is a normal chunk of content; the application should consume its `ByteBuffer` bytes.
<7> The chunk must be released.
IMPORTANT: Calling `Content.Chunk.release()` must be done only after the bytes in the `ByteBuffer` returned by `Content.Chunk.getByteBuffer()` have been consumed.
When the `Content.Chunk` is released, the `HttpClient` implementation may reuse the `ByteBuffer` and overwrite the bytes with different bytes; if the application looks at the `ByteBuffer` _after_ having released the `Content.Chunk` it may see other, unrelated, bytes.
You must provide a `ContentSourceListener` whose implementation reads a `Content.Chunk` from the provided `Content.Source`, as explained in xref:pg-arch-io-content-source[this section].
The invocation of `onContentSource(Request, Content.Source)` and of the demand callback passed to `contentSource.demand(Runnable)` are serialized with respect to asynchronous events such as timeouts or an asynchronous call to `Request.abort(Throwable)`.
This means that these asynchronous events are not processed until the invocation of `onContentSource(Request, Content.Source)` returns, or until the invocation of the demand callback returns.
@ -256,7 +249,7 @@ Demanding for content and consuming the content are orthogonal activities.
An application can read, store aside the `Content.Chunk` objects without releasing them (to consume them later), and demand for more chunks, but it must call `Chunk.retain()` on the stored chunks, and arrange to release them after they have been consumed later.
If not done carefully, this may lead to excessive memory consumption, since the `ByteBuffer` bytes are not consumed.
Releasing the `Content.Chunk`s will result in the ``ByteBuffer``s being disposed/recycled and may be performed at any time.
Releasing the ``Content.Chunk``s will result in the ``ByteBuffer``s being disposed/recycled and may be performed at any time.
An application can also read one chunk of content, consume it, release it, and then _not_ demand for more content until a later time.


@ -60,12 +60,17 @@ public class ContentDocs
    boolean fatal = chunk.isLast();
    if (fatal)
    {
        // A fatal failure, such as a network failure.
        handleFatalFailure(chunk.getFailure());
        // No recovery is possible, stop reading
        // by returning without demanding.
        return;
    }
    else
    {
        // A transient failure such as a read timeout.
        handleTransientFailure(chunk.getFailure());
        // Recovery is possible, try to read again.
        continue;
    }
}


@ -491,90 +491,6 @@ public class HTTPClientDocs
        // end::inputStreamResponseListener[]
    }

    public void contentSourceListener() throws Exception
    {
        HttpClient httpClient = new HttpClient();
        httpClient.start();

        // tag::contentSourceListener[]
        httpClient.newRequest("http://domain.com/path")
            .onResponseContentSource(((response, contentSource) ->
            {
                // The function (as a Runnable) that reads the response content.
                Runnable demander = new Runnable() // <1>
                {
                    @Override
                    public void run()
                    {
                        while (true) // <2>
                        {
                            Content.Chunk chunk = contentSource.read(); // <3>

                            // No chunk of content, demand again and return.
                            if (chunk == null)
                            {
                                contentSource.demand(this); // <4>
                                return;
                            }

                            // A failure happened.
                            if (Content.Chunk.isFailure(chunk)) // <5>
                            {
                                Throwable failure = chunk.getFailure();
                                if (chunk.isLast())
                                {
                                    // A terminal failure, such as a network failure.
                                    // Your logic to handle terminal failures here.
                                    System.getLogger("failure").log(ERROR, "Unexpected terminal failure", failure);
                                    return;
                                }
                                else
                                {
                                    // A transient failure such as a read timeout.
                                    // Your logic to handle transient failures here.
                                    if (ignoreTransientFailure(response, failure))
                                    {
                                        // Try to read again.
                                        continue;
                                    }
                                    else
                                    {
                                        // The transient failure is treated as a terminal failure.
                                        System.getLogger("failure").log(ERROR, "Unexpected transient failure", failure);
                                        return;
                                    }
                                }
                            }

                            // A normal chunk of content.
                            consumeResponseContentChunk(response, chunk); // <6>

                            // Release the chunk.
                            chunk.release(); // <7>

                            // Loop around to read another response chunk.
                        }
                    }
                };

                // Initiate the reads.
                demander.run();
            }))
            .send(result ->
            {
            });
        // end::contentSourceListener[]
    }

    private boolean ignoreTransientFailure(Response response, Throwable failure)
    {
        return false;
    }

    private void consumeResponseContentChunk(Response response, Content.Chunk chunk)
    {
    }

    public void forwardContent() throws Exception
    {
        HttpClient httpClient = new HttpClient();