Issue #3951 - Consider adding demand API to HTTP/2.
Made sure that Stream.Listener.onBeforeData() returns before calling Stream.Listener.onData(). Added test cases also for calling demand() outside data events. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
7061acfdad
commit
73853f7af7
|
@ -42,6 +42,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.FuturePromise;
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
|
import org.eclipse.jetty.util.Promise;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
@ -169,7 +170,7 @@ public class DataDemandTest extends AbstractTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBeforeData() throws Exception
|
public void testOnBeforeData() throws Exception
|
||||||
{
|
{
|
||||||
start(new ServerSessionListener.Adapter()
|
start(new ServerSessionListener.Adapter()
|
||||||
{
|
{
|
||||||
|
@ -228,6 +229,99 @@ public class DataDemandTest extends AbstractTest
|
||||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDemandFromOnHeaders() throws Exception
|
||||||
|
{
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {}));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendData(Stream stream)
|
||||||
|
{
|
||||||
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session client = newClient(new Session.Listener.Adapter());
|
||||||
|
MetaData.Request post = newRequest("GET", new HttpFields());
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
stream.demand(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBeforeData(Stream stream)
|
||||||
|
{
|
||||||
|
// Do not demand from here, we have already demanded in onHeaders().
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
if (frame.isEndStream())
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOnBeforeDataDoesNotReenter() throws Exception
|
||||||
|
{
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(() -> sendData(stream), x -> {}));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendData(Stream stream)
|
||||||
|
{
|
||||||
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024 * 1024), true), Callback.NOOP);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session client = newClient(new Session.Listener.Adapter());
|
||||||
|
MetaData.Request post = newRequest("GET", new HttpFields());
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
client.newStream(new HeadersFrame(post, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
private boolean inBeforeData;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onBeforeData(Stream stream)
|
||||||
|
{
|
||||||
|
inBeforeData = true;
|
||||||
|
stream.demand(1);
|
||||||
|
inBeforeData = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
|
{
|
||||||
|
assertFalse(inBeforeData);
|
||||||
|
callback.succeeded();
|
||||||
|
if (frame.isEndStream())
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSynchronousDemandDoesNotStackOverflow() throws Exception
|
public void testSynchronousDemandDoesNotStackOverflow() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
private boolean remoteReset;
|
private boolean remoteReset;
|
||||||
private long dataLength;
|
private long dataLength;
|
||||||
private long dataDemand;
|
private long dataDemand;
|
||||||
|
private boolean dataInitial;
|
||||||
private boolean dataProcess;
|
private boolean dataProcess;
|
||||||
|
|
||||||
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
|
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
|
||||||
|
@ -84,7 +85,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.local = local;
|
this.local = local;
|
||||||
this.dataLength = Long.MIN_VALUE;
|
this.dataLength = Long.MIN_VALUE;
|
||||||
this.dataDemand = -1;
|
this.dataInitial = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -358,17 +359,30 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
try (AutoLock l = lock.lock())
|
try (AutoLock l = lock.lock())
|
||||||
{
|
{
|
||||||
dataQueue.offer(entry);
|
dataQueue.offer(entry);
|
||||||
initial = dataDemand < 0;
|
initial = dataInitial;
|
||||||
if (initial)
|
if (initial)
|
||||||
dataDemand = 0;
|
{
|
||||||
|
dataInitial = false;
|
||||||
|
// Fake that we are processing data so we return
|
||||||
|
// from onBeforeData() before calling onData().
|
||||||
|
dataProcess = true;
|
||||||
|
}
|
||||||
else if (!dataProcess)
|
else if (!dataProcess)
|
||||||
|
{
|
||||||
dataProcess = proceed = dataDemand > 0;
|
dataProcess = proceed = dataDemand > 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this);
|
LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this);
|
||||||
if (initial)
|
if (initial)
|
||||||
|
{
|
||||||
notifyBeforeData(this);
|
notifyBeforeData(this);
|
||||||
else if (proceed)
|
try (AutoLock l = lock.lock())
|
||||||
|
{
|
||||||
|
dataProcess = proceed = dataDemand > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (proceed)
|
||||||
processData();
|
processData();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue