Fixes #8558 - Idle timeout occurs on HTTP/2 with InputStreamResponseL… (#8585)

* Fixes #8558 - Idle timeout occurs on HTTP/2 with InputStreamResponseListener.

The issue was that HttpReceiverOverHTTP2.ContentNotifier.offer() was racy,
as a network thread could have offered a DATA frame, but not yet called
process() -- yet an application thread could have stolen the DATA frame
completed the response and started another response, causing the network
thread to interact with the wrong response.

The implementation has been changed so that HttpReceiverOverHTTP2.ContentNotifier
does not have a queue anymore and it demands DATA frames to the Stream
only when the application demands more -- a simpler model that just forwards
the demand.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-09-15 10:18:36 +02:00 committed by GitHub
parent 270f491ea8
commit dab4fe60d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 187 deletions

View File

@ -285,6 +285,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
return committed;
}
@Override
public int dataSize()
{
try (AutoLock l = lock.lock())
{
return dataQueue == null ? 0 : dataQueue.size();
}
}
public boolean isOpen()
{
return !isClosed();
@ -921,13 +930,14 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
@Override
public String toString()
{
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
session.hashCode(),
sendWindow,
recvWindow,
dataSize(),
demand(),
localReset,
remoteReset,

View File

@ -142,6 +142,11 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isCommitted();
/**
* @return the size of the DATA frame queue
*/
int dataSize();
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/

View File

@ -186,7 +186,14 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onBeforeData(Stream stream)
{
// Don't demand here, as the initial demand is controlled by
// the application via DemandedContentListener.onBeforeContent().
}
@Override
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
channel.onData(frame, callback);

View File

@ -14,10 +14,7 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
@ -42,9 +39,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,7 +63,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
@Override
protected void receive()
{
contentNotifier.process(true);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
contentNotifier.receive(getHttpChannel().getStream(), exchange);
}
@Override
@ -117,20 +116,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
upgrade(upgrader, httpResponse, endPoint);
}
contentNotifier.notifySuccess = frame.isEndStream();
if (responseHeaders(exchange))
{
int status = response.getStatus();
if (frame.isEndStream() || HttpStatus.isInterim(status))
responseSuccess(exchange);
}
else
{
if (frame.isEndStream())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
stream.demand(1);
}
}
}
@ -138,10 +131,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
if (((IStream)stream).dataSize() == 0)
responseSuccess(exchange);
else
contentNotifier.notifySuccess = true;
}
}
@ -194,14 +188,10 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
{
callback.failed(new IOException("terminated"));
}
else
{
notifyContent(exchange, frame, callback);
}
}
void onReset(Stream stream, ResetFrame frame)
{
@ -230,172 +220,62 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Received content {}", frame);
contentNotifier.process(getHttpChannel().getStream(), exchange, frame, callback);
}
private class ContentNotifier
private static class ContentNotifier
{
private final AutoLock lock = new AutoLock();
private final Queue<DataInfo> queue = new ArrayDeque<>();
private final HttpReceiverOverHTTP2 receiver;
private DataInfo dataInfo;
private boolean active;
private boolean resume;
private boolean stalled;
private volatile boolean notifySuccess;
private ContentNotifier(HttpReceiverOverHTTP2 receiver)
{
this.receiver = receiver;
}
private void offer(HttpExchange exchange, DataFrame frame, Callback callback)
public void receive(Stream stream, HttpExchange exchange)
{
DataInfo dataInfo = new DataInfo(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", dataInfo);
enqueue(dataInfo);
process(false);
if (notifySuccess && ((IStream)stream).dataSize() == 0)
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
private void enqueue(DataInfo dataInfo)
private void process(Stream stream, HttpExchange exchange, DataFrame dataFrame, Callback callback)
{
try (AutoLock l = lock.lock())
if (dataFrame.getData().hasRemaining())
{
queue.offer(dataInfo);
if (dataFrame.isEndStream())
notifySuccess = true;
boolean proceed = receiver.responseContent(exchange, dataFrame.getData(), Callback.from(callback::succeeded, x -> fail(callback, x)));
if (proceed)
{
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
}
private void process(boolean resume)
{
// Allow only one thread at a time.
boolean busy = active(resume);
if (LOG.isDebugEnabled())
LOG.debug("Resuming({}) processing({}) of content", resume, !busy);
if (busy)
return;
// Process only if there is demand.
try (AutoLock l = lock.lock())
{
if (!resume && demand() <= 0)
else
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, content available but no demand");
active = false;
stalled = true;
return;
}
}
while (true)
{
if (dataInfo != null)
{
if (dataInfo.frame.isEndStream())
{
receiver.responseSuccess(dataInfo.exchange);
// Return even if active, as reset() will be called later.
return;
}
}
try (AutoLock l = lock.lock())
{
dataInfo = queue.poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing content {}", dataInfo);
if (dataInfo == null)
{
active = false;
return;
}
}
ByteBuffer buffer = dataInfo.frame.getData();
Callback callback = dataInfo.callback;
if (buffer.hasRemaining())
{
boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
if (!proceed)
{
// The call to responseContent() said we should
// stall, but another thread may have just resumed.
boolean stall = stall();
if (LOG.isDebugEnabled())
LOG.debug("Stalling({}) processing", stall);
if (stall)
return;
LOG.debug("Stalling processing, no demand after {} on {}", dataFrame, this);
}
}
else
{
callback.succeeded();
}
}
}
private boolean active(boolean resume)
{
try (AutoLock l = lock.lock())
{
if (active)
{
// There is a thread in process(),
// but it may be about to exit, so
// remember "resume" to signal the
// processing thread to continue.
if (resume)
this.resume = true;
return true;
}
// If there is no demand (i.e. stalled
// and not resuming) then don't process.
if (stalled && !resume)
return true;
// Start processing.
active = true;
stalled = false;
return false;
}
}
/**
* Called when there is no demand, this method checks whether
* the processing should really stop or it should continue.
*
* @return true to stop processing, false to continue processing
*/
private boolean stall()
{
try (AutoLock l = lock.lock())
{
if (resume)
{
// There was no demand, but another thread
// just demanded, continue processing.
resume = false;
return false;
}
// There is no demand, stop processing.
active = false;
stalled = true;
return true;
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
}
private void reset()
{
dataInfo = null;
try (AutoLock l = lock.lock())
{
queue.clear();
active = false;
resume = false;
stalled = false;
}
notifySuccess = false;
}
private void fail(Callback callback, Throwable failure)
@ -403,25 +283,5 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
callback.failed(failure);
receiver.responseFailure(failure);
}
private class DataInfo
{
private final HttpExchange exchange;
private final DataFrame frame;
private final Callback callback;
private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
{
this.exchange = exchange;
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame);
}
}
}
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -40,6 +41,8 @@ import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@ -610,6 +613,43 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListener() throws Exception
{
var bytes = 100_000;
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
int streamId = stream.getId();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
Callback.Completable callback = new Callback.Completable();
stream.headers(responseFrame, callback);
callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(new byte[bytes]), true), Callback.NOOP));
return null;
}
});
var requestCount = 10_000;
IntStream.range(0, requestCount).forEach(i ->
{
try
{
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort()).headers(httpFields -> httpFields.put("X-Request-Id", Integer.toString(i))).send(listener);
Response response = listener.get(15, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(bytes, listener.getInputStream().readAllBytes().length);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
@Disabled
@Test
@Tag("external")

View File

@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class);
private boolean notifySuccess;
private volatile boolean notifySuccess;
protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
{
@ -86,6 +86,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
// TODO: add support for HttpMethod.CONNECT.
notifySuccess = frame.isLast();
if (responseHeaders(exchange))
{
int status = response.getStatus();
@ -98,7 +99,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after headers on {}", this);
notifySuccess = frame.isLast();
}
}
}
@ -118,12 +118,16 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
if (data.isLast())
notifySuccess = true;
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
@ -136,7 +140,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
notifySuccess = data.isLast();
}
}
else