Accounting for the session flow control window in case of reset streams.

This commit is contained in:
Simone Bordet 2014-08-19 13:38:43 +02:00
parent 728a7c3442
commit 09d54eacab
3 changed files with 15 additions and 10 deletions

View File

@ -30,7 +30,7 @@ public interface FlowControl
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame); public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);
public void onDataReceived(IStream stream, int length); public void onDataReceived(ISession session, IStream stream, int length);
public void onDataConsumed(IStream stream, int length); public void onDataConsumed(IStream stream, int length);

View File

@ -78,17 +78,19 @@ public class HTTP2FlowControl implements FlowControl
} }
@Override @Override
public void onDataReceived(IStream stream, int length) public void onDataReceived(ISession session, IStream stream, int length)
{ {
ISession session = stream.getSession();
int oldSize = session.updateRecvWindow(-length); int oldSize = session.updateRecvWindow(-length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updated session recv window {} -> {} for {}", oldSize, oldSize - length, session); LOG.debug("Updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
if (stream != null)
{
oldSize = stream.updateRecvWindow(-length); oldSize = stream.updateRecvWindow(-length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream); LOG.debug("Updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
} }
}
@Override @Override
public void onDataConsumed(IStream stream, int length) public void onDataConsumed(IStream stream, int length)

View File

@ -129,16 +129,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame); LOG.debug("Received {}", frame);
int streamId = frame.getStreamId(); int streamId = frame.getStreamId();
final IStream stream = getStream(streamId); final IStream stream = getStream(streamId);
// SPEC: the session window must be updated even if the stream is null.
// The flow control length includes the padding bytes.
final int flowControlLength = frame.remaining() + frame.padding();
flowControl.onDataReceived(this, stream, flowControlLength);
if (stream != null) if (stream != null)
{ {
stream.updateClose(frame.isEndStream(), false); stream.updateClose(frame.isEndStream(), false);
// The flow control length includes the padding bytes.
final int flowControlLength = frame.remaining() + frame.padding();
flowControl.onDataReceived(stream, flowControlLength);
if (getRecvWindow() < 0) if (getRecvWindow() < 0)
{ {
close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", disconnectOnFailure); close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", disconnectOnFailure);