Fixes #4277 - Reading streamed gzipped body never terminates.

Fixed handling of demand in case of gzipped response content.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-11-07 22:32:11 +01:00
parent 18e7ee5940
commit a83c297a11
2 changed files with 133 additions and 57 deletions

View File

@ -381,40 +381,67 @@ public abstract class HttpReceiver
}
}
boolean proceed = true;
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
return false;
proceed = false;
}
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (proceed)
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
Decoder decoder = this.decoder;
if (decoder == null)
ContentListeners listeners = this.contentListeners;
if (listeners != null)
{
contentListeners.notifyContent(response, buffer, callback);
if (listeners.isEmpty())
{
callback.succeeded();
}
else
{
Decoder decoder = this.decoder;
if (decoder == null)
{
listeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
}
else
{
if (!decoder.decode(buffer, callback))
return false;
// May happen in case of concurrent abort.
proceed = false;
}
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
}
terminateResponse(exchange);
@ -580,6 +607,9 @@ public abstract class HttpReceiver
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
// We want to deliver the "complete" event as last,
// so we emit it here only if no event handlers are
// executing, otherwise they will emit it.
if (terminate)
{
// Mark atomically the response as terminated, with
@ -758,56 +788,48 @@ public abstract class HttpReceiver
private boolean decode(ByteBuffer encoded, Callback callback)
{
try
this.encoded = encoded;
this.callback = callback;
return decode();
}
private boolean decode()
{
while (true)
{
ByteBuffer buffer;
while (true)
{
ByteBuffer buffer;
while (true)
buffer = decoder.decode(encoded);
if (buffer.hasRemaining())
break;
if (!encoded.hasRemaining())
{
buffer = decoder.decode(encoded);
if (buffer.hasRemaining())
break;
if (!encoded.hasRemaining())
{
callback.succeeded();
return true;
}
}
ByteBuffer decoded = buffer;
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
synchronized (this)
{
if (demand() <= 0)
{
this.encoded = encoded;
this.callback = callback;
return false;
}
callback.succeeded();
encoded = null;
callback = null;
return true;
}
}
}
catch (Throwable x)
{
callback.failed(x);
return true;
ByteBuffer decoded = buffer;
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand);
if (!hasDemand)
return false;
}
}
private void resume()
{
ByteBuffer encoded;
Callback callback;
synchronized (this)
{
encoded = this.encoded;
callback = this.callback;
}
if (decode(encoded, callback))
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", response);
if (decode())
receive();
}

View File

@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -38,6 +40,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Request;
@ -360,4 +363,55 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
demand.accept(Long.MAX_VALUE);
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testGZippedResponseContentWithAsyncDemand(Transport transport) throws Exception
{
init(transport);
int chunks = 64;
byte[] content = new byte[chunks * 1024];
new Random().nextBytes(content);
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()))
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
for (int i = 0; i < chunks; ++i)
{
Thread.sleep(10);
gzip.write(content, i * 1024, 1024);
}
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
byte[] bytes = new byte[content.length];
ByteBuffer received = ByteBuffer.wrap(bytes);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.onResponseContentDemanded((response, demand, buffer, callback) ->
{
received.put(buffer);
callback.succeeded();
new Thread(() -> demand.accept(1)).start();
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
assertArrayEquals(content, bytes);
}
}