Calling notIdle() to avoid the idle timeout triggers when sending frames.

This commit is contained in:
Simone Bordet 2014-09-25 18:18:28 +02:00
parent 9a1acd59fc
commit 8af9ea4030
2 changed files with 100 additions and 26 deletions

View File

@ -467,7 +467,7 @@ public class IdleTimeoutTest extends AbstractTest
final Stream stream = promise.get(5, TimeUnit.SECONDS); final Stream stream = promise.get(5, TimeUnit.SECONDS);
sleep(idleTimeout / 2); sleep(idleTimeout / 2);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback.Adapter() stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback.Adapter()
{ {
private int sends; private int sends;
@ -482,16 +482,62 @@ public class IdleTimeoutTest extends AbstractTest
@Override @Override
public void succeeded() public void succeeded()
{ {
latch.countDown(); dataLatch.countDown();
} }
}); });
} }
}); });
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); Assert.assertTrue(dataLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertFalse(timeoutLatch.await(0, TimeUnit.SECONDS)); Assert.assertFalse(timeoutLatch.await(0, TimeUnit.SECONDS));
} }
@Test
public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
return null;
}
@Override
public void onReset(Session session, ResetFrame frame)
{
resetLatch.countDown();
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> promise = new FuturePromise<Stream>()
{
@Override
public void succeeded(Stream stream)
{
stream.setIdleTimeout(idleTimeout);
super.succeeded(stream);
}
};
session.newStream(requestFrame, promise, new Stream.Listener.Adapter());
final Stream stream = promise.get(5, TimeUnit.SECONDS);
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE);
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
}
private void sleep(long value) private void sleep(long value)
{ {
try try

View File

@ -72,24 +72,28 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override @Override
public void headers(HeadersFrame frame, Callback callback) public void headers(HeadersFrame frame, Callback callback)
{ {
notIdle();
session.control(this, callback, frame, Frame.EMPTY_ARRAY); session.control(this, callback, frame, Frame.EMPTY_ARRAY);
} }
@Override @Override
public void push(PushPromiseFrame frame, Promise<Stream> promise) public void push(PushPromiseFrame frame, Promise<Stream> promise)
{ {
notIdle();
session.push(this, promise, frame); session.push(this, promise, frame);
} }
@Override @Override
public void data(DataFrame frame, Callback callback) public void data(DataFrame frame, Callback callback)
{ {
notIdle();
session.data(this, callback, frame); session.data(this, callback, frame);
} }
@Override @Override
public void reset(ResetFrame frame, Callback callback) public void reset(ResetFrame frame, Callback callback)
{ {
notIdle();
session.control(this, callback, frame, Frame.EMPTY_ARRAY); session.control(this, callback, frame, Frame.EMPTY_ARRAY);
} }
@ -176,39 +180,23 @@ public class HTTP2Stream extends IdleTimeout implements IStream
public boolean process(Frame frame, Callback callback) public boolean process(Frame frame, Callback callback)
{ {
notIdle(); notIdle();
switch (frame.getType()) switch (frame.getType())
{ {
case DATA:
{
// TODO: handle cases where:
// TODO: A) stream already remotely close.
// TODO: B) DATA before HEADERS.
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback);
return false;
}
notifyData(this, (DataFrame)frame, callback);
return false;
}
case HEADERS: case HEADERS:
{ {
// TODO: handle case where HEADERS after DATA. return onHeaders((HeadersFrame)frame, callback);
return false; }
case DATA:
{
return onData((DataFrame)frame, callback);
} }
case RST_STREAM: case RST_STREAM:
{ {
reset = true; return onReset((ResetFrame)frame, callback);
return false;
} }
case PUSH_PROMISE: case PUSH_PROMISE:
{ {
return false; return onPush((PushPromiseFrame)frame, callback);
} }
default: default:
{ {
@ -217,6 +205,46 @@ public class HTTP2Stream extends IdleTimeout implements IStream
} }
} }
private boolean onHeaders(HeadersFrame frame, Callback callback)
{
// TODO: handle case where HEADERS after DATA.
callback.succeeded();
return false;
}
private boolean onData(DataFrame frame, Callback callback)
{
// TODO: handle cases where:
// TODO: A) stream already remotely close.
// TODO: B) DATA before HEADERS.
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback);
return true;
}
else
{
notifyData(this, frame, callback);
return false;
}
}
private boolean onReset(ResetFrame frame, Callback callback)
{
reset = true;
callback.succeeded();
return false;
}
private boolean onPush(PushPromiseFrame frame, Callback callback)
{
callback.succeeded();
return false;
}
@Override @Override
public void updateClose(boolean update, boolean local) public void updateClose(boolean update, boolean local)
{ {