Fixes #3951 - Consider adding demand API to HTTP/2.

Implemented the demand mechanism for HTTP/2, both client and server.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-08-23 12:17:28 +02:00
parent a36af67abf
commit acc2f75f59
3 changed files with 358 additions and 8 deletions

View File

@ -0,0 +1,226 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class DataDemandTest extends AbstractTest
{
@Test
public void testExplicitDemand() throws Exception
{
int length = FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1;
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
Queue<DataFrame> serverQueue = new ConcurrentLinkedQueue<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
return new Stream.Listener.Adapter()
{
@Override
public void onDataRequested(Stream stream, DataFrame frame, Callback callback)
{
// Don't demand and don't complete callbacks.
serverQueue.offer(frame);
}
};
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request post = newRequest("POST", new HttpFields());
FuturePromise<Stream> promise = new FuturePromise<>();
Queue<DataFrame> clientQueue = new ConcurrentLinkedQueue<>();
client.newStream(new HeadersFrame(post, null, false), promise, new Stream.Listener.Adapter()
{
@Override
public void onDataRequested(Stream stream, DataFrame frame, Callback callback)
{
clientQueue.offer(frame);
}
});
Stream clientStream = promise.get(5, TimeUnit.SECONDS);
// Send a single frame larger than the default frame size,
// so that it will be split on the server in multiple frames.
clientStream.data(new DataFrame(clientStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP);
// The server should receive only 1 DATA frame because it does explicit demand.
// Wait a bit more to be sure it only receives 1 DATA frame.
Thread.sleep(1000);
assertEquals(1, serverQueue.size());
Stream serverStream = serverStreamRef.get();
assertNotNull(serverStream);
// Demand more DATA frames.
int count = 2;
serverStream.demand(count);
Thread.sleep(1000);
// The server should have received `count` more DATA frames.
assertEquals(1 + count, serverQueue.size());
// Demand all the rest.
serverStream.demand(Long.MAX_VALUE);
int loops = 0;
while (true)
{
if (++loops > 100)
fail();
Thread.sleep(100);
long sum = serverQueue.stream()
.mapToLong(frame -> frame.getData().remaining())
.sum();
if (sum == length)
break;
}
// Even if demanded, the flow control window should not have
// decreased because the callbacks have not been completed.
int recvWindow = ((ISession)serverStream.getSession()).updateRecvWindow(0);
assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE - length, recvWindow);
// Send a large DATA frame to the client.
serverStream.data(new DataFrame(serverStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP);
// The client should receive only 1 DATA frame because it does explicit demand.
// Wait a bit more to be sure it only receives 1 DATA frame.
Thread.sleep(1000);
assertEquals(1, clientQueue.size());
// Demand more DATA frames.
clientStream.demand(count);
Thread.sleep(1000);
// The client should have received `count` more DATA frames.
assertEquals(1 + count, clientQueue.size());
// Demand all the rest.
clientStream.demand(Long.MAX_VALUE);
loops = 0;
while (true)
{
if (++loops > 100)
fail();
Thread.sleep(100);
long sum = clientQueue.stream()
.mapToLong(frame -> frame.getData().remaining())
.sum();
if (sum == length)
break;
}
// Both the client and server streams should be gone now.
assertNull(clientStream.getSession().getStream(clientStream.getId()));
assertNull(serverStream.getSession().getStream(serverStream.getId()));
}
@Test
public void testSynchronousDemandDoesNotStackOverflow() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onDataRequested(Stream stream, DataFrame frame, Callback callback)
{
stream.demand(1);
callback.succeeded();
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
};
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request post = newRequest("POST", new HttpFields());
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch latch = new CountDownLatch(1);
client.newStream(new HeadersFrame(post, null, false), promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
latch.countDown();
}
}
});
Stream clientStream = promise.get(5, TimeUnit.SECONDS);
// Generate a lot of small DATA frames and write them in a single
// write so that the server will continuously be notified and demand,
// which will test that it won't throw StackOverflowError.
MappedByteBufferPool byteBufferPool = new MappedByteBufferPool();
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
for (int i = 512; i >= 0; --i)
generator.data(lease, new DataFrame(clientStream.getId(), ByteBuffer.allocate(1), i == 0), 1);
((HTTP2Session)clientStream.getSession()).getEndPoint().write(Callback.NOOP, lease.getByteBuffers().toArray(new ByteBuffer[0]));
assertTrue(latch.await(15, TimeUnit.SECONDS));
}
}

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -52,6 +54,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final Queue<DataEntry> dataQueue = new ArrayDeque<>();
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
@ -67,6 +70,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Listener listener;
private boolean remoteReset;
private long dataLength;
private long dataDemand;
private boolean dataProcess;
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
{
@ -76,6 +81,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
this.request = request;
this.local = local;
this.dataLength = Long.MIN_VALUE;
// Deliver the first DATA frame.
this.dataDemand = 1;
}
@Override
@ -343,10 +350,80 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
}
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
boolean proceed = false;
DataEntry entry = new DataEntry(frame, callback);
synchronized (this)
{
dataQueue.offer(entry);
if (!dataProcess)
dataProcess = proceed = dataDemand > 0;
if (LOG.isDebugEnabled())
LOG.debug("{} data processing of {} for {}", proceed ? "Proceeding" : "Stalling", frame, this);
}
if (proceed)
processData();
}
notifyData(this, frame, callback);
@Override
public void demand(long n)
{
if (n <= 0)
throw new IllegalArgumentException("Invalid demand " + n);
boolean proceed = false;
synchronized (this)
{
dataDemand = addDemand(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("Demand {}/{}, {} data processing for {}", n, dataDemand, proceed ? "proceeding" : "stalling", this);
}
if (proceed)
processData();
}
private void processData()
{
while (true)
{
DataEntry dataEntry;
synchronized (this)
{
if (dataQueue.isEmpty() || dataDemand == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling data processing for {}", this);
dataProcess = false;
return;
}
--dataDemand;
dataEntry = dataQueue.poll();
}
DataFrame frame = dataEntry.frame;
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
notifyDataRequested(this, frame, dataEntry.callback);
}
}
private long demand()
{
synchronized (this)
{
return dataDemand;
}
}
private static long addDemand(long x, long y)
{
try
{
return Math.addExact(x, y);
}
catch (ArithmeticException e)
{
return Long.MAX_VALUE;
}
}
private void onReset(ResetFrame frame, Callback callback)
@ -573,14 +650,14 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
}
private void notifyData(Stream stream, DataFrame frame, Callback callback)
private void notifyDataRequested(Stream stream, DataFrame frame, Callback callback)
{
Listener listener = this.listener;
if (listener != null)
{
try
{
listener.onData(stream, frame, callback);
listener.onDataRequested(stream, frame, callback);
}
catch (Throwable x)
{
@ -682,16 +759,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public String toString()
{
return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
sendWindow,
recvWindow,
demand(),
localReset,
remoteReset,
closeState,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timeStamp),
attachment);
}
private static class DataEntry
{
private final DataFrame frame;
private final Callback callback;
private DataEntry(DataFrame frame, Callback callback)
{
this.frame = frame;
this.callback = callback;
}
}
}

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.util.Promise;
* <p>A {@link Stream} represents a bidirectional exchange of data on top of a {@link Session}.</p>
* <p>Differently from socket streams, where the input and output streams are permanently associated
* with the socket (and hence with the connection that the socket represents), there can be multiple
* HTTP/2 streams present concurrent for a HTTP/2 session.</p>
* HTTP/2 streams present concurrently for a HTTP/2 session.</p>
* <p>A {@link Stream} maps to a HTTP request/response cycle, and after the request/response cycle is
* completed, the stream is closed and removed from the session.</p>
* <p>Like {@link Session}, {@link Stream} is the active part and by calling its API applications
@ -129,9 +129,25 @@ public interface Stream
*/
public void setIdleTimeout(long idleTimeout);
/**
* <p>Demands {@code n} more {@code DATA} frames for this stream.</p>
*
* @param n the increment of the demand, must be greater than zero
* @see Listener#onDataRequested(Stream, DataFrame, Callback)
*/
public void demand(long n);
/**
* <p>A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives
* events happening on a HTTP/2 stream.</p>
* <p>HTTP/2 data is flow controlled - this means that only a finite number of data events
* are delivered, until the flow control window is exhausted.</p>
* <p>Applications control the delivery of data events by requesting them via
* {@link Stream#demand(long)}; the first event is always delivered, while subsequent
* events must be explicitly demanded.</p>
* <p>Applications control the HTTP/2 flow control by completing the callback associated
* with data events - this allows the implementation to recycle the data buffer and
* eventually enlarges the flow control window so that the sender can send more data.</p>
*
* @see Stream
*/
@ -166,12 +182,30 @@ public interface Stream
/**
* <p>Callback method invoked when a DATA frame has been received.</p>
* <p>When this method is called, the {@link #demand(long) demand} has
* already been incremented by 1.</p>
*
* @param stream the stream
* @param frame the DATA frame received
* @param callback the callback to complete when the bytes of the DATA frame have been consumed
* @see #onDataRequested(Stream, DataFrame, Callback)
*/
public void onData(Stream stream, DataFrame frame, Callback callback);
/**
* <p>Callback method invoked when a DATA frame has been demanded.</p>
* <p>Implementations of this method must arrange to call (within the
* method or otherwise asynchronously) {@link #demand(long)}.</p>
*
* @param stream the stream
* @param frame the DATA frame received
* @param callback the callback to complete when the bytes of the DATA frame have been consumed
*/
public void onData(Stream stream, DataFrame frame, Callback callback);
public default void onDataRequested(Stream stream, DataFrame frame, Callback callback)
{
stream.demand(1);
onData(stream, frame, callback);
}
/**
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>