Issue #3951 - Consider adding demand API to HTTP/2.

Introduced Stream.Listener.onBeforeData() to initialize the demand.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-10-01 22:23:00 +02:00
parent 16b21ecb6e
commit df8ca37983
3 changed files with 117 additions and 16 deletions

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.util.FuturePromise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -167,6 +168,66 @@ public class DataDemandTest extends AbstractTest
assertNull(serverStream.getSession().getStream(serverStream.getId()));
}
@Test
public void testBeforeData() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {}));
return null;
}
private void sendData(Stream stream)
{
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP);
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request post = newRequest("GET", new HttpFields());
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch beforeDataLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
client.newStream(new HeadersFrame(post, null, true), promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
responseLatch.countDown();
}
@Override
public void onBeforeData(Stream stream)
{
beforeDataLatch.countDown();
// Don't demand.
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
latch.countDown();
}
});
Stream clientStream = promise.get(5, TimeUnit.SECONDS);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
assertTrue(beforeDataLatch.await(5, TimeUnit.SECONDS));
// Should not receive DATA frames until demanded.
assertFalse(latch.await(1, TimeUnit.SECONDS));
// Now demand the first DATA frame.
clientStream.demand(1);
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testSynchronousDemandDoesNotStackOverflow() throws Exception
{
@ -180,8 +241,8 @@ public class DataDemandTest extends AbstractTest
@Override
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
stream.demand(1);
callback.succeeded();
stream.demand(1);
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());

View File

@ -49,12 +49,14 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpable
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final AutoLock lock = new AutoLock();
private final Queue<DataEntry> dataQueue = new ArrayDeque<>();
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
@ -82,8 +84,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
this.request = request;
this.local = local;
this.dataLength = Long.MIN_VALUE;
// Deliver the first DATA frame.
this.dataDemand = 1;
this.dataDemand = -1;
}
@Override
@ -351,17 +352,23 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
}
boolean initial;
boolean proceed = false;
DataEntry entry = new DataEntry(frame, callback);
synchronized (this)
try (AutoLock l = lock.lock())
{
dataQueue.offer(entry);
if (!dataProcess)
initial = dataDemand < 0;
if (initial)
dataDemand = 0;
else if (!dataProcess)
dataProcess = proceed = dataDemand > 0;
if (LOG.isDebugEnabled())
LOG.debug("{} data processing of {} for {}", proceed ? "Proceeding" : "Stalling", frame, this);
}
if (proceed)
if (LOG.isDebugEnabled())
LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this);
if (initial)
notifyBeforeData(this);
else if (proceed)
processData();
}
@ -370,15 +377,16 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
if (n <= 0)
throw new IllegalArgumentException("Invalid demand " + n);
long demand;
boolean proceed = false;
synchronized (this)
try (AutoLock l = lock.lock())
{
dataDemand = MathUtils.cappedAdd(dataDemand, n);
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("Demand {}/{}, {} data processing for {}", n, dataDemand, proceed ? "proceeding" : "stalling", this);
}
if (LOG.isDebugEnabled())
LOG.debug("Demand {}/{}, {} data processing for {}", n, demand, proceed ? "proceeding" : "stalling", this);
if (proceed)
processData();
}
@ -388,7 +396,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
while (true)
{
DataEntry dataEntry;
synchronized (this)
try (AutoLock l = lock.lock())
{
if (dataQueue.isEmpty() || dataDemand == 0)
{
@ -403,13 +411,13 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
DataFrame frame = dataEntry.frame;
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
notifyDataRequested(this, frame, dataEntry.callback);
notifyDataDemanded(this, frame, dataEntry.callback);
}
}
private long demand()
{
synchronized (this)
try (AutoLock l = lock.lock())
{
return dataDemand;
}
@ -639,7 +647,27 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
}
private void notifyDataRequested(Stream stream, DataFrame frame, Callback callback)
private void notifyBeforeData(Stream stream)
{
Listener listener = this.listener;
if (listener != null)
{
try
{
listener.onBeforeData(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
else
{
stream.demand(1);
}
}
private void notifyDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
Listener listener = this.listener;
if (listener != null)
@ -657,6 +685,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
else
{
callback.succeeded();
stream.demand(1);
}
}

View File

@ -180,6 +180,17 @@ public interface Stream
*/
public Listener onPush(Stream stream, PushPromiseFrame frame);
/**
* <p>Callback method invoked before notifying the first DATA frame.</p>
* <p>The default implementation initializes the demand for DATA frames.</p>
*
* @param stream the stream
*/
public default void onBeforeData(Stream stream)
{
stream.demand(1);
}
/**
* <p>Callback method invoked when a DATA frame has been received.</p>
*