From 73853f7af76bd2dc4fa96caade5a4932088f22cd Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 2 Oct 2019 11:08:16 +0200 Subject: [PATCH] Issue #3951 - Consider adding demand API to HTTP/2. Made sure that Stream.Listener.onBeforeData() returns before calling Stream.Listener.onData(). Added test cases also for calling demand() outside data events. Signed-off-by: Simone Bordet --- .../jetty/http2/client/DataDemandTest.java | 96 ++++++++++++++++++- .../org/eclipse/jetty/http2/HTTP2Stream.java | 22 ++++- 2 files changed, 113 insertions(+), 5 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java index f8cac7021aa..b6b8f7a8b38 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java @@ -42,6 +42,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -169,7 +170,7 @@ public class DataDemandTest extends AbstractTest } @Test - public void testBeforeData() throws Exception + public void testOnBeforeData() throws Exception { start(new ServerSessionListener.Adapter() { @@ -228,6 +229,99 @@ public class DataDemandTest extends AbstractTest assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testDemandFromOnHeaders() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {})); + return null; + } + + private void sendData(Stream stream) + { + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("GET", new HttpFields()); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + stream.demand(1); + } + + @Override + public void onBeforeData(Stream stream) + { + // Do not demand from here, we have already demanded in onHeaders(). + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callback.succeeded(); + if (frame.isEndStream()) + latch.countDown(); + } + }); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOnBeforeDataDoesNotReenter() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {})); + return null; + } + + private void sendData(Stream stream) + { + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("GET", new HttpFields()); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + private boolean inBeforeData; + + @Override + public void onBeforeData(Stream stream) + { + inBeforeData = true; + stream.demand(1); + inBeforeData = false; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + assertFalse(inBeforeData); + callback.succeeded(); + if (frame.isEndStream()) + latch.countDown(); + } + }); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + @Test public void testSynchronousDemandDoesNotStackOverflow() throws Exception { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index dc970c81202..9209243c75d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -74,6 +74,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private boolean remoteReset; private long dataLength; private long dataDemand; + private boolean dataInitial; private boolean dataProcess; public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local) @@ -84,7 +85,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa this.request = request; this.local = local; this.dataLength = Long.MIN_VALUE; - this.dataDemand = -1; + this.dataInitial = true; } @Override @@ -358,17 +359,30 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa try (AutoLock l = lock.lock()) { dataQueue.offer(entry); - initial = dataDemand < 0; + initial = dataInitial; if (initial) - dataDemand = 0; + { + dataInitial = false; + // Fake that we are processing data so we return + // from onBeforeData() before calling onData(). + dataProcess = true; + } else if (!dataProcess) + { dataProcess = proceed = dataDemand > 0; + } } if (LOG.isDebugEnabled()) LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this); if (initial) + { notifyBeforeData(this); - else if (proceed) + try (AutoLock l = lock.lock()) + { + dataProcess = proceed = dataDemand > 0; + } + } + if (proceed) processData(); }