Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-07-29 12:10:35 +02:00
commit a2f4e98ec4
2 changed files with 119 additions and 35 deletions

View File

@ -20,7 +20,9 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Locale; import java.util.Locale;
import java.util.Queue;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
@ -38,10 +40,12 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CompletableCallback; import org.eclipse.jetty.util.IteratingCallback;
public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener
{ {
private final ContentNotifier contentNotifier = new ContentNotifier();
public HttpReceiverOverHTTP2(HttpChannel channel) public HttpReceiverOverHTTP2(HttpChannel channel)
{ {
super(channel); super(channel);
@ -111,40 +115,8 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
copy.put(original); copy.put(original);
BufferUtil.flipToFlush(copy, 0); BufferUtil.flipToFlush(copy, 0);
CompletableCallback delegate = new CompletableCallback() contentNotifier.offer(new DataInfo(exchange, copy, callback, frame.isEndStream()));
{ contentNotifier.iterate();
@Override
public void succeeded()
{
byteBufferPool.release(copy);
callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
byteBufferPool.release(copy);
callback.failed(x);
super.failed(x);
}
@Override
public void resume()
{
if (frame.isEndStream())
responseSuccess(exchange);
}
@Override
public void abort(Throwable failure)
{
}
};
responseContent(exchange, copy, delegate);
if (!delegate.tryComplete())
delegate.resume();
} }
@Override @Override
@ -164,4 +136,70 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
{ {
responseFailure(failure); responseFailure(failure);
} }
private class ContentNotifier extends IteratingCallback
{
private final Queue<DataInfo> queue = new ArrayDeque<>();
private DataInfo dataInfo;
private boolean offer(DataInfo dataInfo)
{
synchronized (this)
{
return queue.offer(dataInfo);
}
}
@Override
protected Action process() throws Exception
{
DataInfo dataInfo;
synchronized (this)
{
dataInfo = queue.poll();
}
if (dataInfo == null)
return Action.IDLE;
this.dataInfo = dataInfo;
responseContent(dataInfo.exchange, dataInfo.buffer, this);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
byteBufferPool.release(dataInfo.buffer);
dataInfo.callback.succeeded();
if (dataInfo.last)
responseSuccess(dataInfo.exchange);
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
{
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
byteBufferPool.release(dataInfo.buffer);
dataInfo.callback.failed(failure);
responseFailure(failure);
}
}
private static class DataInfo
{
private final HttpExchange exchange;
private final ByteBuffer buffer;
private final Callback callback;
private final boolean last;
private DataInfo(HttpExchange exchange, ByteBuffer buffer, Callback callback, boolean last)
{
this.exchange = exchange;
this.buffer = buffer;
this.callback = callback;
this.last = last;
}
}
} }

View File

@ -26,6 +26,8 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import javax.servlet.ServletException; import javax.servlet.ServletException;
@ -44,6 +46,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -442,6 +445,49 @@ public class HttpClientTest extends AbstractTest
Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS)); Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
} }
@Test
public void testAsyncResponseContentBackPressure() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Large write to generate multiple DATA frames.
response.getOutputStream().write(new byte[256 * 1024]);
}
});
CountDownLatch completeLatch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
AtomicReference<Callback> callbackRef = new AtomicReference<>();
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(new CountDownLatch(1));
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.onResponseContentAsync((response, content, callback) ->
{
if (counter.incrementAndGet() == 1)
{
callbackRef.set(callback);
latchRef.get().countDown();
}
else
{
callback.succeeded();
}
})
.send(result -> completeLatch.countDown());
Assert.assertTrue(latchRef.get().await(5, TimeUnit.SECONDS));
// Wait some time to verify that back pressure is applied correctly.
Thread.sleep(1000);
Assert.assertEquals(1, counter.get());
callbackRef.get().succeeded();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
private void sleep(long time) throws IOException private void sleep(long time) throws IOException
{ {
try try