diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java new file mode 100644 index 00000000000..22cb5a55e03 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/DataDemandTest.java @@ -0,0 +1,386 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.generator.Generator; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DataDemandTest extends AbstractTest +{ + @Test + public void testExplicitDemand() throws Exception + { + int length = FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1; + AtomicReference serverStreamRef = new AtomicReference<>(); + Queue serverQueue = new ConcurrentLinkedQueue<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + return new Stream.Listener.Adapter() + { + @Override + public void onDataDemanded(Stream stream, DataFrame frame, Callback callback) + { + // Don't demand and don't complete callbacks. + serverQueue.offer(frame); + } + }; + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("POST", new HttpFields()); + FuturePromise promise = new FuturePromise<>(); + Queue clientQueue = new ConcurrentLinkedQueue<>(); + client.newStream(new HeadersFrame(post, null, false), promise, new Stream.Listener.Adapter() + { + @Override + public void onDataDemanded(Stream stream, DataFrame frame, Callback callback) + { + clientQueue.offer(frame); + } + }); + Stream clientStream = promise.get(5, TimeUnit.SECONDS); + // Send a single frame larger than the default frame size, + // so that it will be split on the server in multiple frames. + clientStream.data(new DataFrame(clientStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP); + + // The server should receive only 1 DATA frame because it does explicit demand. + // Wait a bit more to be sure it only receives 1 DATA frame. + Thread.sleep(1000); + assertEquals(1, serverQueue.size()); + + Stream serverStream = serverStreamRef.get(); + assertNotNull(serverStream); + + // Demand more DATA frames. + int count = 2; + serverStream.demand(count); + Thread.sleep(1000); + // The server should have received `count` more DATA frames. + assertEquals(1 + count, serverQueue.size()); + + // Demand all the rest. + serverStream.demand(Long.MAX_VALUE); + int loops = 0; + while (true) + { + if (++loops > 100) + fail(); + + Thread.sleep(100); + + long sum = serverQueue.stream() + .mapToLong(frame -> frame.getData().remaining()) + .sum(); + if (sum == length) + break; + } + + // Even if demanded, the flow control window should not have + // decreased because the callbacks have not been completed. + int recvWindow = ((ISession)serverStream.getSession()).updateRecvWindow(0); + assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE - length, recvWindow); + + // Send a large DATA frame to the client. + serverStream.data(new DataFrame(serverStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP); + + + // The client should receive only 1 DATA frame because it does explicit demand. + // Wait a bit more to be sure it only receives 1 DATA frame. + Thread.sleep(1000); + assertEquals(1, clientQueue.size()); + + // Demand more DATA frames. + clientStream.demand(count); + Thread.sleep(1000); + // The client should have received `count` more DATA frames. + assertEquals(1 + count, clientQueue.size()); + + // Demand all the rest. + clientStream.demand(Long.MAX_VALUE); + loops = 0; + while (true) + { + if (++loops > 100) + fail(); + + Thread.sleep(100); + + long sum = clientQueue.stream() + .mapToLong(frame -> frame.getData().remaining()) + .sum(); + if (sum == length) + break; + } + + // Both the client and server streams should be gone now. + assertNull(clientStream.getSession().getStream(clientStream.getId())); + assertNull(serverStream.getSession().getStream(serverStream.getId())); + } + + @Test + public void testOnBeforeData() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {})); + return null; + } + + private void sendData(Stream stream) + { + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("GET", new HttpFields()); + FuturePromise promise = new FuturePromise<>(); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch beforeDataLatch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, true), promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + responseLatch.countDown(); + } + + @Override + public void onBeforeData(Stream stream) + { + beforeDataLatch.countDown(); + // Don't demand. + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callback.succeeded(); + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream clientStream = promise.get(5, TimeUnit.SECONDS); + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(beforeDataLatch.await(5, TimeUnit.SECONDS)); + // Should not receive DATA frames until demanded. + assertFalse(latch.await(1, TimeUnit.SECONDS)); + // Now demand the first DATA frame. + clientStream.demand(1); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testDemandFromOnHeaders() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {})); + return null; + } + + private void sendData(Stream stream) + { + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("GET", new HttpFields()); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + stream.demand(1); + } + + @Override + public void onBeforeData(Stream stream) + { + // Do not demand from here, we have already demanded in onHeaders(). + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callback.succeeded(); + if (frame.isEndStream()) + latch.countDown(); + } + }); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOnBeforeDataDoesNotReenter() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {})); + return null; + } + + private void sendData(Stream stream) + { + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("GET", new HttpFields()); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + private boolean inBeforeData; + + @Override + public void onBeforeData(Stream stream) + { + inBeforeData = true; + stream.demand(1); + inBeforeData = false; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + assertFalse(inBeforeData); + callback.succeeded(); + if (frame.isEndStream()) + latch.countDown(); + } + }); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testSynchronousDemandDoesNotStackOverflow() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onDataDemanded(Stream stream, DataFrame frame, Callback callback) + { + callback.succeeded(); + stream.demand(1); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + }; + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request post = newRequest("POST", new HttpFields()); + FuturePromise promise = new FuturePromise<>(); + CountDownLatch latch = new CountDownLatch(1); + client.newStream(new HeadersFrame(post, null, false), promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + latch.countDown(); + } + } + }); + Stream clientStream = promise.get(5, TimeUnit.SECONDS); + + // Generate a lot of small DATA frames and write them in a single + // write so that the server will continuously be notified and demand, + // which will test that it won't throw StackOverflowError. + MappedByteBufferPool byteBufferPool = new MappedByteBufferPool(); + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + for (int i = 512; i >= 0; --i) + generator.data(lease, new DataFrame(clientStream.getId(), ByteBuffer.allocate(1), i == 0), 1); + + // Since this is a naked write, we need to wait that the + // client finishes writing the SETTINGS reply to the server + // during connection initialization, or we risk a WritePendingException. + Thread.sleep(1000); + ((HTTP2Session)clientStream.getSession()).getEndPoint().write(Callback.NOOP, lease.getByteBuffers().toArray(new ByteBuffer[0])); + + assertTrue(latch.await(15, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index c7030d731fe..9209243c75d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.http2; import java.io.EOFException; import java.io.IOException; import java.nio.channels.WritePendingException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -42,16 +44,20 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.MathUtils; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Scheduler; public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpable { private static final Logger LOG = Log.getLogger(HTTP2Stream.class); + private final AutoLock lock = new AutoLock(); + private final Queue dataQueue = new ArrayDeque<>(); private final AtomicReference attachment = new AtomicReference<>(); private final AtomicReference> attributes = new AtomicReference<>(); private final AtomicReference closeState = new AtomicReference<>(CloseState.NOT_CLOSED); @@ -67,6 +73,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private Listener listener; private boolean remoteReset; private long dataLength; + private long dataDemand; + private boolean dataInitial; + private boolean dataProcess; public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local) { @@ -76,6 +85,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa this.request = request; this.local = local; this.dataLength = Long.MIN_VALUE; + this.dataInitial = true; } @Override @@ -343,10 +353,88 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa } } - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); + boolean initial; + boolean proceed = false; + DataEntry entry = new DataEntry(frame, callback); + try (AutoLock l = lock.lock()) + { + dataQueue.offer(entry); + initial = dataInitial; + if (initial) + { + dataInitial = false; + // Fake that we are processing data so we return + // from onBeforeData() before calling onData(). + dataProcess = true; + } + else if (!dataProcess) + { + dataProcess = proceed = dataDemand > 0; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this); + if (initial) + { + notifyBeforeData(this); + try (AutoLock l = lock.lock()) + { + dataProcess = proceed = dataDemand > 0; + } + } + if (proceed) + processData(); + } - notifyData(this, frame, callback); + @Override + public void demand(long n) + { + if (n <= 0) + throw new IllegalArgumentException("Invalid demand " + n); + long demand; + boolean proceed = false; + try (AutoLock l = lock.lock()) + { + demand = dataDemand = MathUtils.cappedAdd(dataDemand, n); + if (!dataProcess) + dataProcess = proceed = !dataQueue.isEmpty(); + } + if (LOG.isDebugEnabled()) + LOG.debug("Demand {}/{}, {} data processing for {}", n, demand, proceed ? "proceeding" : "stalling", this); + if (proceed) + processData(); + } + + private void processData() + { + while (true) + { + DataEntry dataEntry; + try (AutoLock l = lock.lock()) + { + if (dataQueue.isEmpty() || dataDemand == 0) + { + if (LOG.isDebugEnabled()) + LOG.debug("Stalling data processing for {}", this); + dataProcess = false; + return; + } + --dataDemand; + dataEntry = dataQueue.poll(); + } + DataFrame frame = dataEntry.frame; + if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) + session.removeStream(this); + notifyDataDemanded(this, frame, dataEntry.callback); + } + } + + private long demand() + { + try (AutoLock l = lock.lock()) + { + return dataDemand; + } } private void onReset(ResetFrame frame, Callback callback) @@ -573,14 +661,34 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa } } - private void notifyData(Stream stream, DataFrame frame, Callback callback) + private void notifyBeforeData(Stream stream) { Listener listener = this.listener; if (listener != null) { try { - listener.onData(stream, frame, callback); + listener.onBeforeData(stream); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + else + { + stream.demand(1); + } + } + + private void notifyDataDemanded(Stream stream, DataFrame frame, Callback callback) + { + Listener listener = this.listener; + if (listener != null) + { + try + { + listener.onDataDemanded(stream, frame, callback); } catch (Throwable x) { @@ -591,6 +699,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa else { callback.succeeded(); + stream.demand(1); } } @@ -682,16 +791,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa @Override public String toString() { - return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}", + return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}", getClass().getSimpleName(), hashCode(), getId(), sendWindow, recvWindow, + demand(), localReset, remoteReset, closeState, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timeStamp), attachment); } + + private static class DataEntry + { + private final DataFrame frame; + private final Callback callback; + + private DataEntry(DataFrame frame, Callback callback) + { + this.frame = frame; + this.callback = callback; + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index d2b5e9ed5f4..1f831073f68 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -29,7 +29,7 @@ import org.eclipse.jetty.util.Promise; *

A {@link Stream} represents a bidirectional exchange of data on top of a {@link Session}.

*

Differently from socket streams, where the input and output streams are permanently associated * with the socket (and hence with the connection that the socket represents), there can be multiple - * HTTP/2 streams present concurrent for an HTTP/2 session.

+ * HTTP/2 streams present concurrently for an HTTP/2 session.

*

A {@link Stream} maps to an HTTP request/response cycle, and after the request/response cycle is * completed, the stream is closed and removed from the session.

*

Like {@link Session}, {@link Stream} is the active part and by calling its API applications @@ -129,9 +129,25 @@ public interface Stream */ public void setIdleTimeout(long idleTimeout); + /** + *

Demands {@code n} more {@code DATA} frames for this stream.

+ * + * @param n the increment of the demand, must be greater than zero + * @see Listener#onDataDemanded(Stream, DataFrame, Callback) + */ + public void demand(long n); + /** *

A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives * events happening on an HTTP/2 stream.

+ *

HTTP/2 data is flow controlled - this means that only a finite number of data events + * are delivered, until the flow control window is exhausted.

+ *

Applications control the delivery of data events by requesting them via + * {@link Stream#demand(long)}; the first event is always delivered, while subsequent + * events must be explicitly demanded.

+ *

Applications control the HTTP/2 flow control by completing the callback associated + * with data events - this allows the implementation to recycle the data buffer and + * eventually to enlarge the flow control window so that the sender can send more data.

* * @see Stream */ @@ -164,15 +180,42 @@ public interface Stream */ public Listener onPush(Stream stream, PushPromiseFrame frame); + /** + *

Callback method invoked before notifying the first DATA frame.

+ *

The default implementation initializes the demand for DATA frames.

+ * + * @param stream the stream + */ + public default void onBeforeData(Stream stream) + { + stream.demand(1); + } + /** *

Callback method invoked when a DATA frame has been received.

* * @param stream the stream * @param frame the DATA frame received * @param callback the callback to complete when the bytes of the DATA frame have been consumed + * @see #onDataDemanded(Stream, DataFrame, Callback) */ public void onData(Stream stream, DataFrame frame, Callback callback); + /** + *

Callback method invoked when a DATA frame has been demanded.

+ *

Implementations of this method must arrange to call (within the + * method or otherwise asynchronously) {@link #demand(long)}.

+ * + * @param stream the stream + * @param frame the DATA frame received + * @param callback the callback to complete when the bytes of the DATA frame have been consumed + */ + public default void onDataDemanded(Stream stream, DataFrame frame, Callback callback) + { + onData(stream, frame, callback); + stream.demand(1); + } + /** *

Callback method invoked when a RST_STREAM frame has been received for this stream.

*