From a83c297a11814bc7b4a5623278e422b33143f05d Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 7 Nov 2019 22:32:11 +0100 Subject: [PATCH] Fixes #4277 - Reading streamed gzipped body never terminates. Fixed handling of demand in case of gzipped response content. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpReceiver.java | 136 ++++++++++-------- .../http/client/HttpClientDemandTest.java | 54 +++++++ 2 files changed, 133 insertions(+), 57 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 9aaf2a89ae0..0920daaa76c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -381,40 +381,67 @@ public abstract class HttpReceiver } } + boolean proceed = true; if (demand() <= 0) { callback.failed(new IllegalStateException("No demand for response content")); - return false; + proceed = false; } HttpResponse response = exchange.getResponse(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); + if (proceed) + { + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - if (contentListeners.isEmpty()) - { - callback.succeeded(); - } - else - { - Decoder decoder = this.decoder; - if (decoder == null) + ContentListeners listeners = this.contentListeners; + if (listeners != null) { - contentListeners.notifyContent(response, buffer, callback); + if (listeners.isEmpty()) + { + callback.succeeded(); + } + else + { + Decoder decoder = this.decoder; + if (decoder == null) + { + listeners.notifyContent(response, buffer, callback); + } + else + { + try + { + proceed = decoder.decode(buffer, callback); + } + catch (Throwable x) + { + callback.failed(x); + proceed = false; + } + } + } } else { - if (!decoder.decode(buffer, callback)) - return false; + // May happen in case of concurrent abort. + proceed = false; } } if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) { - boolean hasDemand = hasDemandOrStall(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}, hasDemand={}", response, hasDemand); - return hasDemand; + if (proceed) + { + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}, hasDemand={}", response, hasDemand); + return hasDemand; + } + else + { + return false; + } } terminateResponse(exchange); @@ -580,6 +607,9 @@ public abstract class HttpReceiver ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyFailure(listeners, response, failure); + // We want to deliver the "complete" event as last, + // so we emit it here only if no event handlers are + // executing, otherwise they will emit it. if (terminate) { // Mark atomically the response as terminated, with @@ -758,56 +788,48 @@ public abstract class HttpReceiver private boolean decode(ByteBuffer encoded, Callback callback) { - try + this.encoded = encoded; + this.callback = callback; + return decode(); + } + + private boolean decode() + { + while (true) { + ByteBuffer buffer; while (true) { - ByteBuffer buffer; - while (true) + buffer = decoder.decode(encoded); + if (buffer.hasRemaining()) + break; + if (!encoded.hasRemaining()) { - buffer = decoder.decode(encoded); - if (buffer.hasRemaining()) - break; - if (!encoded.hasRemaining()) - { - callback.succeeded(); - return true; - } - } - ByteBuffer decoded = buffer; - if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - - contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); - - synchronized (this) - { - if (demand() <= 0) - { - this.encoded = encoded; - this.callback = callback; - return false; - } + callback.succeeded(); + encoded = null; + callback = null; + return true; } } - } - catch (Throwable x) - { - callback.failed(x); - return true; + ByteBuffer decoded = buffer; + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + + contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand); + if (!hasDemand) + return false; } } private void resume() { - ByteBuffer encoded; - Callback callback; - synchronized (this) - { - encoded = this.encoded; - callback = this.callback; - } - if (decode(encoded, callback)) + if (LOG.isDebugEnabled()) + LOG.debug("Response content resuming decoding {}", response); + if (decode()) receive(); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java index faf58f25cdc..879016d03a6 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientDemandTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; +import java.util.zip.GZIPOutputStream; +import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -38,6 +40,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.server.Request; @@ -360,4 +363,55 @@ public class HttpClientDemandTest extends AbstractTest demand.accept(Long.MAX_VALUE); assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testGZippedResponseContentWithAsyncDemand(Transport transport) throws Exception + { + init(transport); + + int chunks = 64; + byte[] content = new byte[chunks * 1024]; + new Random().nextBytes(content); + + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream())) + { + response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); + for (int i = 0; i < chunks; ++i) + { + Thread.sleep(10); + gzip.write(content, i * 1024, 1024); + } + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + + byte[] bytes = new byte[content.length]; + ByteBuffer received = ByteBuffer.wrap(bytes); + CountDownLatch resultLatch = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .onResponseContentDemanded((response, demand, buffer, callback) -> + { + received.put(buffer); + callback.succeeded(); + new Thread(() -> demand.accept(1)).start(); + }) + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + resultLatch.countDown(); + }); + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + assertArrayEquals(content, bytes); + } }