diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 0920daaa76c..623589d07df 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -119,7 +119,7 @@ public abstract class HttpReceiver } } - private long demand() + protected long demand() { return demand(LongUnaryOperator.identity()); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 27eecb7af55..346252343e9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -552,6 +552,12 @@ public class HttpRequest implements Request { this.responseListeners.add(new Response.DemandedContentListener() { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + listener.onBeforeContent(response, demand); + } + @Override public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) { diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 145edd67ed4..bb887cda691 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -416,13 +416,13 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { HttpChannelOverFCGI channel = activeChannels.get(request); if (channel != null) - channel.responseHeaders(); - else - noChannel(request); + return !channel.responseHeaders(); + noChannel(request); + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java index 94dc7e43e19..33754cda695 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java @@ -80,9 +80,9 @@ public class ClientParser extends Parser } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { - listener.onHeaders(request); + return listener.onHeaders(request); } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java index 8eee11f53b1..23ed83171e3 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java @@ -135,7 +135,11 @@ public abstract class Parser { public void onHeader(int request, HttpField field); - public void onHeaders(int request); + /** + * @param request the request id + * @return true to signal to the parser to stop parsing, false to continue parsing + */ + public boolean onHeaders(int request); /** * @param request the request id @@ -158,8 +162,9 @@ public abstract class Parser } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index 036947cec7f..7dd1211fc4d 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -82,7 +82,7 @@ public class ResponseContentParser extends StreamContentParser parsers.remove(request); } - private class ResponseParser implements HttpParser.ResponseHandler + private static class ResponseParser implements HttpParser.ResponseHandler { private final HttpFields fields = new HttpFields(); private ClientParser.Listener listener; @@ -90,6 +90,7 @@ public class ResponseContentParser extends StreamContentParser private final FCGIHttpParser httpParser; private State state = State.HEADERS; private boolean seenResponseCode; + private boolean stalled; private ResponseParser(ClientParser.Listener listener, int request) { @@ -111,7 +112,11 @@ public class ResponseContentParser extends StreamContentParser case HEADERS: { if (httpParser.parseNext(buffer)) + { state = State.CONTENT_MODE; + if (stalled) + return true; + } remaining = buffer.remaining(); break; } @@ -233,16 +238,17 @@ public class ResponseContentParser extends StreamContentParser } } - private void notifyHeaders() + private boolean notifyHeaders() { try { - listener.onHeaders(request); + return listener.onHeaders(request); } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("Exception while invoking listener " + listener, x); + return false; } } @@ -255,8 +261,10 @@ public class ResponseContentParser extends StreamContentParser notifyBegin(200, "OK"); notifyHeaders(fields); } - notifyHeaders(); - // Return from HTTP parsing so that we can parse the content. + // Remember whether we have demand. + stalled = notifyHeaders(); + // Always return from HTTP parsing so that we + // can parse the content with the FCGI parser. return true; } diff --git a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java index 67e6fe9761b..10050c37f7d 100644 --- a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java +++ b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java @@ -109,10 +109,11 @@ public class ClientGeneratorTest } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { assertEquals(id, request); params.set(params.get() * primes[4]); + return false; } }); diff --git a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java index 7e9e9dbb2b6..f61a46fee1f 100644 --- a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java +++ b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java @@ -91,10 +91,11 @@ public class ClientParserTest } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { assertEquals(id, request); params.set(params.get() * primes[2]); + return false; } }); diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index dc9547d7119..73b45bda8c8 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -144,7 +144,7 @@ public class ServerFCGIConnection extends AbstractConnection } @Override - public void onHeaders(int request) + public boolean onHeaders(int request) { HttpChannelOverFCGI channel = channels.get(request); if (LOG.isDebugEnabled()) @@ -154,6 +154,7 @@ public class ServerFCGIConnection extends AbstractConnection channel.onRequest(); channel.dispatch(); } + return false; } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index aadd4aa67a9..23e5a75ccdc 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -120,12 +120,24 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. if (frame.isEndStream() || informational) responseSuccess(exchange); } + else + { + if (frame.isEndStream()) + { + // There is no demand to trigger response success, so add + // a poison pill to trigger it when there will be demand. + notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + } + } } } else // Response trailers. { HttpFields trailers = metaData.getFields(); trailers.forEach(httpResponse::trailer); + // Previous DataFrames had endStream=false, so + // add a poison pill to trigger response success + // after all normal DataFrames have been consumed. notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); } } @@ -212,7 +224,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. contentNotifier.offer(exchange, frame, callback); } - private static class ContentNotifier + private class ContentNotifier { private final Queue queue = new ArrayDeque<>(); private final HttpReceiverOverHTTP2 receiver; @@ -246,9 +258,25 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. private void process(boolean resume) { // Allow only one thread at a time. - if (active(resume)) + boolean busy = active(resume); + if (LOG.isDebugEnabled()) + LOG.debug("Resuming({}) processing({}) of content", resume, !busy); + if (busy) return; + // Process only if there is demand. + synchronized (this) + { + if (!resume && demand() <= 0) + { + if (LOG.isDebugEnabled()) + LOG.debug("Stalling processing, content available but no demand"); + active = false; + stalled = true; + return; + } + } + while (true) { if (dataInfo != null) @@ -265,7 +293,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { dataInfo = queue.poll(); if (LOG.isDebugEnabled()) - LOG.debug("Dequeued content {}", dataInfo); + LOG.debug("Processing content {}", dataInfo); if (dataInfo == null) { active = false; @@ -281,8 +309,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x))); if (!proceed) { - // Should stall, unless just resumed. - if (stall()) + // The call to responseContent() said we should + // stall, but another thread may have just resumed. + boolean stall = stall(); + if (LOG.isDebugEnabled()) + LOG.debug("Stalling({}) processing", stall); + if (stall) return; } } @@ -299,27 +331,46 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { if (active) { + // There is a thread in process(), + // but it may be about to exit, so + // remember "resume" to signal the + // processing thread to continue. if (resume) this.resume = true; return true; } + + // If there is no demand (i.e. stalled + // and not resuming) then don't process. if (stalled && !resume) return true; + + // Start processing. active = true; stalled = false; return false; } } + /** + * Called when there is no demand, this method checks whether + * the processing should really stop or it should continue. + * + * @return true to stop processing, false to continue processing + */ private boolean stall() { synchronized (this) { if (resume) { + // There was no demand, but another thread + // just demanded, continue processing. resume = false; return false; } + + // There is no demand, stop processing. active = false; stalled = true; return true; @@ -344,7 +395,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. receiver.responseFailure(failure); } - private static class DataInfo + private class DataInfo { private final HttpExchange exchange; private final DataFrame frame; diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java index 879016d03a6..884853e21d9 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java @@ -414,4 +414,119 @@ public class HttpClientDemandTest extends AbstractTest assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); assertArrayEquals(content, bytes); } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testDelayedBeforeContentDemand(Transport transport) throws Exception + { + init(transport); + + byte[] content = new byte[1024]; + new Random().nextBytes(content); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + response.setContentLength(content.length); + response.getOutputStream().write(content); + } + }); + + byte[] bytes = new byte[content.length]; + ByteBuffer received = ByteBuffer.wrap(bytes); + AtomicReference beforeContentDemandRef = new AtomicReference<>(); + CountDownLatch beforeContentLatch = new CountDownLatch(1); + CountDownLatch contentLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .onResponseContentDemanded(new Response.DemandedContentListener() + { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + // Do not demand now. + beforeContentDemandRef.set(demand); + beforeContentLatch.countDown(); + } + + @Override + public void onContent(Response response, LongConsumer demand, ByteBuffer buffer, Callback callback) + { + contentLatch.countDown(); + received.put(buffer); + callback.succeeded(); + demand.accept(1); + } + }) + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + resultLatch.countDown(); + }); + + assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS)); + LongConsumer demand = beforeContentDemandRef.get(); + + // Content must not be notified until we demand. + assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + + demand.accept(1); + + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + assertArrayEquals(content, bytes); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testDelayedBeforeContentDemandWithNoResponseContent(Transport transport) throws Exception + { + init(transport); + + scenario.start(new EmptyServerHandler()); + + AtomicReference beforeContentDemandRef = new AtomicReference<>(); + CountDownLatch beforeContentLatch = new CountDownLatch(1); + CountDownLatch contentLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .onResponseContentDemanded(new Response.DemandedContentListener() + { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + // Do not demand now. + beforeContentDemandRef.set(demand); + beforeContentLatch.countDown(); + } + + @Override + public void onContent(Response response, LongConsumer demand, ByteBuffer buffer, Callback callback) + { + contentLatch.countDown(); + callback.succeeded(); + demand.accept(1); + } + }) + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + resultLatch.countDown(); + }); + + assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS)); + LongConsumer demand = beforeContentDemandRef.get(); + + // Content must not be notified until we demand. + assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + + demand.accept(1); + + // Content must not be notified as there is no content. + assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } }