diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index a48ca2aaee1..526a0c487e9 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -285,6 +285,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
return committed;
}
+ @Override
+ public int dataSize()
+ {
+ try (AutoLock l = lock.lock())
+ {
+ return dataQueue == null ? 0 : dataQueue.size();
+ }
+ }
+
public boolean isOpen()
{
return !isClosed();
@@ -921,13 +930,14 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
@Override
public String toString()
{
- return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
+ return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
session.hashCode(),
sendWindow,
recvWindow,
+ dataSize(),
demand(),
localReset,
remoteReset,
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
index 96327c53a99..50dfded7fdc 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
@@ -142,6 +142,11 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isCommitted();
+ /**
+ * @return the size of the DATA frame queue
+ */
+ int dataSize();
+
/**
*
An ordered list of frames belonging to the same stream.
*/
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java
index 38ca30b601d..6ab789bc045 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java
@@ -186,7 +186,14 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
@Override
- public void onData(Stream stream, DataFrame frame, Callback callback)
+ public void onBeforeData(Stream stream)
+ {
+ // Don't demand here, as the initial demand is controlled by
+ // the application via DemandedContentListener.onBeforeContent().
+ }
+
+ @Override
+ public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
channel.onData(frame, callback);
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java
index 24818c28d79..cde5c0e8ba9 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java
@@ -14,10 +14,7 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
import java.util.List;
-import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
@@ -42,9 +39,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +63,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
@Override
protected void receive()
{
- contentNotifier.process(true);
+ HttpExchange exchange = getHttpExchange();
+ if (exchange == null)
+ return;
+
+ contentNotifier.receive(getHttpChannel().getStream(), exchange);
}
@Override
@@ -117,20 +116,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
upgrade(upgrader, httpResponse, endPoint);
}
+ contentNotifier.notifySuccess = frame.isEndStream();
if (responseHeaders(exchange))
{
int status = response.getStatus();
if (frame.isEndStream() || HttpStatus.isInterim(status))
responseSuccess(exchange);
- }
- else
- {
- if (frame.isEndStream())
- {
- // There is no demand to trigger response success, so add
- // a poison pill to trigger it when there will be demand.
- notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
- }
+ else
+ stream.demand(1);
}
}
}
@@ -138,10 +131,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
- // Previous DataFrames had endStream=false, so
- // add a poison pill to trigger response success
- // after all normal DataFrames have been consumed.
- notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
+
+ if (((IStream)stream).dataSize() == 0)
+ responseSuccess(exchange);
+ else
+ contentNotifier.notifySuccess = true;
}
}
@@ -194,13 +188,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
- {
callback.failed(new IOException("terminated"));
- }
else
- {
notifyContent(exchange, frame, callback);
- }
}
void onReset(Stream stream, ResetFrame frame)
@@ -230,172 +220,62 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
- contentNotifier.offer(exchange, frame, callback);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received content {}", frame);
+ contentNotifier.process(getHttpChannel().getStream(), exchange, frame, callback);
}
- private class ContentNotifier
+ private static class ContentNotifier
{
- private final AutoLock lock = new AutoLock();
- private final Queue queue = new ArrayDeque<>();
private final HttpReceiverOverHTTP2 receiver;
- private DataInfo dataInfo;
- private boolean active;
- private boolean resume;
- private boolean stalled;
+ private volatile boolean notifySuccess;
private ContentNotifier(HttpReceiverOverHTTP2 receiver)
{
this.receiver = receiver;
}
- private void offer(HttpExchange exchange, DataFrame frame, Callback callback)
+ public void receive(Stream stream, HttpExchange exchange)
{
- DataInfo dataInfo = new DataInfo(exchange, frame, callback);
- if (LOG.isDebugEnabled())
- LOG.debug("Queueing content {}", dataInfo);
- enqueue(dataInfo);
- process(false);
+ if (notifySuccess && ((IStream)stream).dataSize() == 0)
+ receiver.responseSuccess(exchange);
+ else
+ stream.demand(1);
}
- private void enqueue(DataInfo dataInfo)
+ private void process(Stream stream, HttpExchange exchange, DataFrame dataFrame, Callback callback)
{
- try (AutoLock l = lock.lock())
+ if (dataFrame.getData().hasRemaining())
{
- queue.offer(dataInfo);
- }
- }
-
- private void process(boolean resume)
- {
- // Allow only one thread at a time.
- boolean busy = active(resume);
- if (LOG.isDebugEnabled())
- LOG.debug("Resuming({}) processing({}) of content", resume, !busy);
- if (busy)
- return;
-
- // Process only if there is demand.
- try (AutoLock l = lock.lock())
- {
- if (!resume && demand() <= 0)
+ if (dataFrame.isEndStream())
+ notifySuccess = true;
+ boolean proceed = receiver.responseContent(exchange, dataFrame.getData(), Callback.from(callback::succeeded, x -> fail(callback, x)));
+ if (proceed)
{
- if (LOG.isDebugEnabled())
- LOG.debug("Stalling processing, content available but no demand");
- active = false;
- stalled = true;
- return;
- }
- }
-
- while (true)
- {
- if (dataInfo != null)
- {
- if (dataInfo.frame.isEndStream())
- {
- receiver.responseSuccess(dataInfo.exchange);
- // Return even if active, as reset() will be called later.
- return;
- }
- }
-
- try (AutoLock l = lock.lock())
- {
- dataInfo = queue.poll();
- if (LOG.isDebugEnabled())
- LOG.debug("Processing content {}", dataInfo);
- if (dataInfo == null)
- {
- active = false;
- return;
- }
- }
-
- ByteBuffer buffer = dataInfo.frame.getData();
- Callback callback = dataInfo.callback;
- if (buffer.hasRemaining())
- {
- boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
- if (!proceed)
- {
- // The call to responseContent() said we should
- // stall, but another thread may have just resumed.
- boolean stall = stall();
- if (LOG.isDebugEnabled())
- LOG.debug("Stalling({}) processing", stall);
- if (stall)
- return;
- }
+ if (dataFrame.isEndStream())
+ receiver.responseSuccess(exchange);
+ else
+ stream.demand(1);
}
else
{
- callback.succeeded();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stalling processing, no demand after {} on {}", dataFrame, this);
}
}
- }
-
- private boolean active(boolean resume)
- {
- try (AutoLock l = lock.lock())
+ else
{
- if (active)
- {
- // There is a thread in process(),
- // but it may be about to exit, so
- // remember "resume" to signal the
- // processing thread to continue.
- if (resume)
- this.resume = true;
- return true;
- }
-
- // If there is no demand (i.e. stalled
- // and not resuming) then don't process.
- if (stalled && !resume)
- return true;
-
- // Start processing.
- active = true;
- stalled = false;
- return false;
- }
- }
-
- /**
- * Called when there is no demand, this method checks whether
- * the processing should really stop or it should continue.
- *
- * @return true to stop processing, false to continue processing
- */
- private boolean stall()
- {
- try (AutoLock l = lock.lock())
- {
- if (resume)
- {
- // There was no demand, but another thread
- // just demanded, continue processing.
- resume = false;
- return false;
- }
-
- // There is no demand, stop processing.
- active = false;
- stalled = true;
- return true;
+ callback.succeeded();
+ if (dataFrame.isEndStream())
+ receiver.responseSuccess(exchange);
+ else
+ stream.demand(1);
}
}
private void reset()
{
- dataInfo = null;
- try (AutoLock l = lock.lock())
- {
- queue.clear();
- active = false;
- resume = false;
- stalled = false;
- }
+ notifySuccess = false;
}
private void fail(Callback callback, Throwable failure)
@@ -403,25 +283,5 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
callback.failed(failure);
receiver.responseFailure(failure);
}
-
- private class DataInfo
- {
- private final HttpExchange exchange;
- private final DataFrame frame;
- private final Callback callback;
-
- private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
- {
- this.exchange = exchange;
- this.frame = frame;
- this.callback = callback;
- }
-
- @Override
- public String toString()
- {
- return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame);
- }
- }
}
}
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java
index 5b9b0cb07e5..e8b6d01c9fe 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
+import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -40,6 +41,8 @@ import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@@ -610,6 +613,43 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testInputStreamResponseListener() throws Exception
+ {
+ var bytes = 100_000;
+ start(new ServerSessionListener.Adapter()
+ {
+ @Override
+ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
+ {
+ int streamId = stream.getId();
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
+ HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
+ Callback.Completable callback = new Callback.Completable();
+ stream.headers(responseFrame, callback);
+ callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(new byte[bytes]), true), Callback.NOOP));
+ return null;
+ }
+ });
+
+ var requestCount = 10_000;
+ IntStream.range(0, requestCount).forEach(i ->
+ {
+ try
+ {
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ client.newRequest("localhost", connector.getLocalPort()).headers(httpFields -> httpFields.put("X-Request-Id", Integer.toString(i))).send(listener);
+ Response response = listener.get(15, TimeUnit.SECONDS);
+ assertEquals(HttpStatus.OK_200, response.getStatus());
+ assertEquals(bytes, listener.getInputStream().readAllBytes().length);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
@Disabled
@Test
@Tag("external")
diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java
index d0957ad2149..e5cc0ac8b4a 100644
--- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java
+++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class);
- private boolean notifySuccess;
+ private volatile boolean notifySuccess;
protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
{
@@ -86,6 +86,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
// TODO: add support for HttpMethod.CONNECT.
+ notifySuccess = frame.isLast();
if (responseHeaders(exchange))
{
int status = response.getStatus();
@@ -98,7 +99,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after headers on {}", this);
- notifySuccess = frame.isLast();
}
}
}
@@ -118,12 +118,16 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
+ if (data.isLast())
+ notifySuccess = true;
+
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
+
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
@@ -136,7 +140,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
- notifySuccess = data.isLast();
}
}
else