diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 30ccfc75de9..ca85a6e1d15 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -20,7 +20,9 @@ package org.eclipse.jetty.http2.client.http; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Locale; +import java.util.Queue; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpExchange; @@ -38,10 +40,12 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.CompletableCallback; +import org.eclipse.jetty.util.IteratingCallback; public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener { + private final ContentNotifier contentNotifier = new ContentNotifier(); + public HttpReceiverOverHTTP2(HttpChannel channel) { super(channel); @@ -111,40 +115,8 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen copy.put(original); BufferUtil.flipToFlush(copy, 0); - CompletableCallback delegate = new CompletableCallback() - { - @Override - public void succeeded() - { - byteBufferPool.release(copy); - callback.succeeded(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - byteBufferPool.release(copy); - callback.failed(x); - super.failed(x); - } - - @Override - public void resume() - { - if (frame.isEndStream()) - responseSuccess(exchange); - } - - @Override - public void abort(Throwable failure) - { - } - }; - - responseContent(exchange, copy, delegate); - if (!delegate.tryComplete()) - delegate.resume(); + contentNotifier.offer(new DataInfo(exchange, copy, callback, frame.isEndStream())); + contentNotifier.iterate(); } @Override @@ -164,4 +136,70 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen { responseFailure(failure); } + + private class ContentNotifier extends IteratingCallback + { + private final Queue queue = new ArrayDeque<>(); + private DataInfo dataInfo; + + private boolean offer(DataInfo dataInfo) + { + synchronized (this) + { + return queue.offer(dataInfo); + } + } + + @Override + protected Action process() throws Exception + { + DataInfo dataInfo; + synchronized (this) + { + dataInfo = queue.poll(); + } + if (dataInfo == null) + return Action.IDLE; + + this.dataInfo = dataInfo; + responseContent(dataInfo.exchange, dataInfo.buffer, this); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool(); + byteBufferPool.release(dataInfo.buffer); + dataInfo.callback.succeeded(); + if (dataInfo.last) + responseSuccess(dataInfo.exchange); + super.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable failure) + { + ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool(); + byteBufferPool.release(dataInfo.buffer); + dataInfo.callback.failed(failure); + responseFailure(failure); + } + } + + private static class DataInfo + { + private final HttpExchange exchange; + private final ByteBuffer buffer; + private final Callback callback; + private final boolean last; + + private DataInfo(HttpExchange exchange, ByteBuffer buffer, Callback callback, boolean last) + { + this.exchange = exchange; + this.buffer = buffer; + this.callback = callback; + this.last = last; + } + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index dcfdde77b54..5567ed81f7d 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -26,6 +26,8 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import javax.servlet.ServletException; @@ -44,6 +46,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; @@ -442,6 +445,49 @@ public class HttpClientTest extends AbstractTest Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS)); } + @Test + public void testAsyncResponseContentBackPressure() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + // Large write to generate multiple DATA frames. + response.getOutputStream().write(new byte[256 * 1024]); + } + }); + + CountDownLatch completeLatch = new CountDownLatch(1); + AtomicInteger counter = new AtomicInteger(); + AtomicReference callbackRef = new AtomicReference<>(); + AtomicReference latchRef = new AtomicReference<>(new CountDownLatch(1)); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(getScheme()) + .onResponseContentAsync((response, content, callback) -> + { + if (counter.incrementAndGet() == 1) + { + callbackRef.set(callback); + latchRef.get().countDown(); + } + else + { + callback.succeeded(); + } + }) + .send(result -> completeLatch.countDown()); + + Assert.assertTrue(latchRef.get().await(5, TimeUnit.SECONDS)); + // Wait some time to verify that back pressure is applied correctly. + Thread.sleep(1000); + Assert.assertEquals(1, counter.get()); + callbackRef.get().succeeded(); + + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + } + private void sleep(long time) throws IOException { try