Fixes #790 - AsyncContentListener semantic broken with HTTP/2 transport.

Now using an IteratingCallback to buffer DATA frames and delivering
their content respecting AsyncContentListener semantic.
This commit is contained in:
Simone Bordet 2016-07-29 12:08:22 +02:00
parent e0a1a1988f
commit 2cdea3601b
2 changed files with 119 additions and 35 deletions

View File

@ -20,7 +20,9 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
@ -38,10 +40,12 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CompletableCallback;
import org.eclipse.jetty.util.IteratingCallback;
public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener
{
private final ContentNotifier contentNotifier = new ContentNotifier();
public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
@ -111,40 +115,8 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
copy.put(original);
BufferUtil.flipToFlush(copy, 0);
CompletableCallback delegate = new CompletableCallback()
{
@Override
public void succeeded()
{
byteBufferPool.release(copy);
callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
byteBufferPool.release(copy);
callback.failed(x);
super.failed(x);
}
@Override
public void resume()
{
if (frame.isEndStream())
responseSuccess(exchange);
}
@Override
public void abort(Throwable failure)
{
}
};
responseContent(exchange, copy, delegate);
if (!delegate.tryComplete())
delegate.resume();
contentNotifier.offer(new DataInfo(exchange, copy, callback, frame.isEndStream()));
contentNotifier.iterate();
}
@Override
@ -164,4 +136,70 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
{
responseFailure(failure);
}
private class ContentNotifier extends IteratingCallback
{
private final Queue<DataInfo> queue = new ArrayDeque<>();
private DataInfo dataInfo;
private boolean offer(DataInfo dataInfo)
{
synchronized (this)
{
return queue.offer(dataInfo);
}
}
@Override
protected Action process() throws Exception
{
DataInfo dataInfo;
synchronized (this)
{
dataInfo = queue.poll();
}
if (dataInfo == null)
return Action.IDLE;
this.dataInfo = dataInfo;
responseContent(dataInfo.exchange, dataInfo.buffer, this);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
byteBufferPool.release(dataInfo.buffer);
dataInfo.callback.succeeded();
if (dataInfo.last)
responseSuccess(dataInfo.exchange);
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
{
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
byteBufferPool.release(dataInfo.buffer);
dataInfo.callback.failed(failure);
responseFailure(failure);
}
}
private static class DataInfo
{
private final HttpExchange exchange;
private final ByteBuffer buffer;
private final Callback callback;
private final boolean last;
private DataInfo(HttpExchange exchange, ByteBuffer buffer, Callback callback, boolean last)
{
this.exchange = exchange;
this.buffer = buffer;
this.callback = callback;
this.last = last;
}
}
}

View File

@ -26,6 +26,8 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.servlet.ServletException;
@ -44,6 +46,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -437,6 +440,49 @@ public class HttpClientTest extends AbstractTest
Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testAsyncResponseContentBackPressure() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Large write to generate multiple DATA frames.
response.getOutputStream().write(new byte[256 * 1024]);
}
});
CountDownLatch completeLatch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
AtomicReference<Callback> callbackRef = new AtomicReference<>();
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(1));
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.onResponseContentAsync((response, content, callback) ->
{
if (counter.incrementAndGet() == 1)
{
callbackRef.set(callback);
latchRef.get().countDown();
}
else
{
callback.succeeded();
}
})
.send(result -> completeLatch.countDown());
Assert.assertTrue(latchRef.get().await(5, TimeUnit.SECONDS));
// Wait some time to verify that back pressure is applied correctly.
Thread.sleep(1000);
Assert.assertEquals(1, counter.get());
callbackRef.get().succeeded();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
private void sleep(long time) throws IOException
{
try