Merge pull request #4286 from eclipse/jetty-9.4.x-4277-httpclient_async_gzip
Fixes #4277 - Reading streamed gzipped body never terminates.
This commit is contained in:
commit
4825832a53
|
@ -381,17 +381,23 @@ public abstract class HttpReceiver
|
|||
}
|
||||
}
|
||||
|
||||
boolean proceed = true;
|
||||
if (demand() <= 0)
|
||||
{
|
||||
callback.failed(new IllegalStateException("No demand for response content"));
|
||||
return false;
|
||||
proceed = false;
|
||||
}
|
||||
|
||||
HttpResponse response = exchange.getResponse();
|
||||
if (proceed)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
|
||||
|
||||
if (contentListeners.isEmpty())
|
||||
ContentListeners listeners = this.contentListeners;
|
||||
if (listeners != null)
|
||||
{
|
||||
if (listeners.isEmpty())
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
|
@ -400,22 +406,43 @@ public abstract class HttpReceiver
|
|||
Decoder decoder = this.decoder;
|
||||
if (decoder == null)
|
||||
{
|
||||
contentListeners.notifyContent(response, buffer, callback);
|
||||
listeners.notifyContent(response, buffer, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!decoder.decode(buffer, callback))
|
||||
return false;
|
||||
try
|
||||
{
|
||||
proceed = decoder.decode(buffer, callback);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
proceed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// May happen in case of concurrent abort.
|
||||
proceed = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
|
||||
{
|
||||
if (proceed)
|
||||
{
|
||||
boolean hasDemand = hasDemandOrStall();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
|
||||
return hasDemand;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
terminateResponse(exchange);
|
||||
return false;
|
||||
|
@ -580,6 +607,9 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyFailure(listeners, response, failure);
|
||||
|
||||
// We want to deliver the "complete" event as last,
|
||||
// so we emit it here only if no event handlers are
|
||||
// executing, otherwise they will emit it.
|
||||
if (terminate)
|
||||
{
|
||||
// Mark atomically the response as terminated, with
|
||||
|
@ -758,7 +788,12 @@ public abstract class HttpReceiver
|
|||
|
||||
private boolean decode(ByteBuffer encoded, Callback callback)
|
||||
{
|
||||
try
|
||||
this.encoded = encoded;
|
||||
this.callback = callback;
|
||||
return decode();
|
||||
}
|
||||
|
||||
private boolean decode()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
@ -771,6 +806,8 @@ public abstract class HttpReceiver
|
|||
if (!encoded.hasRemaining())
|
||||
{
|
||||
callback.succeeded();
|
||||
encoded = null;
|
||||
callback = null;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -780,34 +817,19 @@ public abstract class HttpReceiver
|
|||
|
||||
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
if (demand() <= 0)
|
||||
{
|
||||
this.encoded = encoded;
|
||||
this.callback = callback;
|
||||
boolean hasDemand = hasDemandOrStall();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand);
|
||||
if (!hasDemand)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private void resume()
|
||||
{
|
||||
ByteBuffer encoded;
|
||||
Callback callback;
|
||||
synchronized (this)
|
||||
{
|
||||
encoded = this.encoded;
|
||||
callback = this.callback;
|
||||
}
|
||||
if (decode(encoded, callback))
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content resuming decoding {}", response);
|
||||
if (decode())
|
||||
receive();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -38,6 +40,7 @@ import javax.servlet.http.HttpServletResponse;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
|
@ -360,4 +363,55 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
|
|||
demand.accept(Long.MAX_VALUE);
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testGZippedResponseContentWithAsyncDemand(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
int chunks = 64;
|
||||
byte[] content = new byte[chunks * 1024];
|
||||
new Random().nextBytes(content);
|
||||
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()))
|
||||
{
|
||||
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
|
||||
for (int i = 0; i < chunks; ++i)
|
||||
{
|
||||
Thread.sleep(10);
|
||||
gzip.write(content, i * 1024, 1024);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
byte[] bytes = new byte[content.length];
|
||||
ByteBuffer received = ByteBuffer.wrap(bytes);
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.onResponseContentDemanded((response, demand, buffer, callback) ->
|
||||
{
|
||||
received.put(buffer);
|
||||
callback.succeeded();
|
||||
new Thread(() -> demand.accept(1)).start();
|
||||
})
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
resultLatch.countDown();
|
||||
});
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
assertArrayEquals(content, bytes);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue