461052 - Local streams created after INITIAL_WINDOW_SIZE setting have wrong send window.

Fixed by tracking both send and recv initial stream windows.
This is needed because both client and server may send an
INITIAL_WINDOW_SIZE setting, and they must be treated
separately.
This commit is contained in:
Simone Bordet 2015-02-27 16:00:36 +01:00
parent b533aa6ce5
commit 73821e7ac6
12 changed files with 292 additions and 139 deletions

View File

@ -50,7 +50,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener"; public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise"; public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
private int initialSessionWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
@Override @Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
@ -75,14 +75,14 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
return new SimpleFlowControlStrategy(); return new SimpleFlowControlStrategy();
} }
public int getInitialSessionWindow() public int getInitialSessionRecvWindow()
{ {
return initialSessionWindow; return initialSessionRecvWindow;
} }
public void setInitialSessionWindow(int initialSessionWindow) public void setInitialSessionRecvWindow(int initialSessionRecvWindow)
{ {
this.initialSessionWindow = initialSessionWindow; this.initialSessionRecvWindow = initialSessionRecvWindow;
} }
private class HTTP2ClientConnection extends HTTP2Connection implements Callback private class HTTP2ClientConnection extends HTTP2Connection implements Callback
@ -109,11 +109,17 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
PrefaceFrame prefaceFrame = new PrefaceFrame(); PrefaceFrame prefaceFrame = new PrefaceFrame();
SettingsFrame settingsFrame = new SettingsFrame(settings, false); SettingsFrame settingsFrame = new SettingsFrame(settings, false);
int windowDelta = getInitialSessionWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE; ISession session = getSession();
int windowDelta = getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
if (windowDelta > 0) if (windowDelta > 0)
getSession().control(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); {
session.updateRecvWindow(windowDelta);
session.control(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
}
else else
getSession().control(null, this, prefaceFrame, settingsFrame); {
session.control(null, this, prefaceFrame, settingsFrame);
}
} }
@Override @Override

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy; import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
@ -55,6 +56,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -132,6 +134,98 @@ public abstract class FlowControlStrategyTest
server.stop(); server.stop();
} }
@Test
public void testWindowSizeUpdates() throws Exception
{
final CountDownLatch prefaceLatch = new CountDownLatch(1);
final CountDownLatch stream1Latch = new CountDownLatch(1);
final CountDownLatch stream2Latch = new CountDownLatch(1);
final CountDownLatch settingsLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
HTTP2Session serverSession = (HTTP2Session)session;
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverSession.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverSession.getRecvWindow());
prefaceLatch.countDown();
return null;
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
for (Stream stream : session.getStreams())
{
HTTP2Stream serverStream = (HTTP2Stream)stream;
Assert.assertEquals(0, serverStream.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverStream.getRecvWindow());
}
settingsLatch.countDown();
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HTTP2Stream serverStream = (HTTP2Stream)stream;
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("GET".equalsIgnoreCase(request.getMethod()))
{
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverStream.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverStream.getRecvWindow());
stream1Latch.countDown();
}
else
{
Assert.assertEquals(0, serverStream.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, serverStream.getRecvWindow());
stream2Latch.countDown();
}
return null;
}
});
HTTP2Session clientSession = (HTTP2Session)newClient(new Session.Listener.Adapter());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientSession.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientSession.getRecvWindow());
Assert.assertTrue(prefaceLatch.await(5, TimeUnit.SECONDS));
MetaData.Request request1 = newRequest("GET", new HttpFields());
FuturePromise<Stream> promise1 = new FuturePromise<>();
clientSession.newStream(new HeadersFrame(0, request1, null, true), promise1, new Stream.Listener.Adapter());
HTTP2Stream clientStream1 = (HTTP2Stream)promise1.get(5, TimeUnit.SECONDS);
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientStream1.getSendWindow());
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientStream1.getRecvWindow());
Assert.assertTrue(stream1Latch.await(5, TimeUnit.SECONDS));
// Send a SETTINGS frame that changes the window size.
// This tells the server that its stream send window must be updated,
// so on the client it's the receive window that must be updated.
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 0);
SettingsFrame frame = new SettingsFrame(settings, false);
FutureCallback callback = new FutureCallback();
clientSession.settings(frame, callback);
callback.get(5, TimeUnit.SECONDS);
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientStream1.getSendWindow());
Assert.assertEquals(0, clientStream1.getRecvWindow());
settingsLatch.await(5, TimeUnit.SECONDS);
// Now create a new stream, it must pick up the new value.
MetaData.Request request2 = newRequest("POST", new HttpFields());
FuturePromise<Stream> promise2 = new FuturePromise<>();
clientSession.newStream(new HeadersFrame(0, request2, null, true), promise2, new Stream.Listener.Adapter());
HTTP2Stream clientStream2 = (HTTP2Stream)promise2.get(5, TimeUnit.SECONDS);
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, clientStream2.getSendWindow());
Assert.assertEquals(0, clientStream2.getRecvWindow());
Assert.assertTrue(stream2Latch.await(5, TimeUnit.SECONDS));
}
@Test @Test
public void testFlowControlWithConcurrentSettings() throws Exception public void testFlowControlWithConcurrentSettings() throws Exception
{ {
@ -401,6 +495,7 @@ public abstract class FlowControlStrategyTest
try try
{ {
exchanger.exchange(null, 1, TimeUnit.SECONDS); exchanger.exchange(null, 1, TimeUnit.SECONDS);
Assert.fail();
} }
catch (TimeoutException x) catch (TimeoutException x)
{ {
@ -565,10 +660,12 @@ public abstract class FlowControlStrategyTest
Random random = new Random(); Random random = new Random();
final byte[] chunk1 = new byte[1024]; final byte[] chunk1 = new byte[1024];
random.nextBytes(chunk1); random.nextBytes(chunk1);
final byte[] chunk2 = new byte[1024]; final byte[] chunk2 = new byte[2048];
random.nextBytes(chunk2); random.nextBytes(chunk2);
final AtomicReference<CountDownLatch> settingsLatch = new AtomicReference<>(new CountDownLatch(1)); // Two SETTINGS frames: the initial after the preface,
// and the explicit where we set the stream window size to zero.
final AtomicReference<CountDownLatch> settingsLatch = new AtomicReference<>(new CountDownLatch(2));
final CountDownLatch dataLatch = new CountDownLatch(1); final CountDownLatch dataLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter() start(new ServerSessionListener.Adapter()
{ {
@ -614,7 +711,7 @@ public abstract class FlowControlStrategyTest
// Now we have the 2 DATA frames queued in the server. // Now we have the 2 DATA frames queued in the server.
// Partially unstall the first DATA frame. // Unstall the stream window.
settingsLatch.set(new CountDownLatch(1)); settingsLatch.set(new CountDownLatch(1));
settings.clear(); settings.clear();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2);
@ -622,6 +719,12 @@ public abstract class FlowControlStrategyTest
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS)); Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
// Check that the data is sent correctly.
byte[] expected = new byte[content.length];
System.arraycopy(chunk1, 0, expected, 0, chunk1.length);
System.arraycopy(chunk2, 0, expected, chunk1.length, chunk2.length);
Assert.assertArrayEquals(expected, content);
} }
@Test @Test

View File

@ -27,36 +27,52 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
{ {
protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class); protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
private int initialStreamWindow; private int initialStreamSendWindow;
private int initialStreamRecvWindow;
public AbstractFlowControlStrategy(int initialStreamWindow) public AbstractFlowControlStrategy(int initialStreamSendWindow)
{ {
this.initialStreamWindow = initialStreamWindow; this.initialStreamSendWindow = initialStreamSendWindow;
this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
} }
protected int getInitialStreamWindow() protected int getInitialStreamSendWindow()
{ {
return initialStreamWindow; return initialStreamSendWindow;
}
protected int getInitialStreamRecvWindow()
{
return initialStreamRecvWindow;
} }
@Override @Override
public void onNewStream(IStream stream) public void onNewStream(IStream stream, boolean local)
{ {
stream.updateSendWindow(initialStreamWindow); stream.updateSendWindow(initialStreamSendWindow);
stream.updateRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); stream.updateRecvWindow(initialStreamRecvWindow);
} }
@Override @Override
public void onStreamTerminated(IStream stream) public void onStreamTerminated(IStream stream, boolean local)
{ {
} }
@Override @Override
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local) public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
{ {
int initialWindow = this.initialStreamWindow; int previousInitialStreamWindow;
this.initialStreamWindow = initialStreamWindow; if (local)
int delta = initialStreamWindow - initialWindow; {
previousInitialStreamWindow = getInitialStreamRecvWindow();
this.initialStreamRecvWindow = initialStreamWindow;
}
else
{
previousInitialStreamWindow = getInitialStreamSendWindow();
this.initialStreamSendWindow = initialStreamWindow;
}
int delta = initialStreamWindow - previousInitialStreamWindow;
// SPEC: updates of the initial window size only affect stream windows, not session's. // SPEC: updates of the initial window size only affect stream windows, not session's.
for (Stream stream : session.getStreams()) for (Stream stream : session.getStreams())
@ -65,7 +81,7 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
{ {
((IStream)stream).updateRecvWindow(delta); ((IStream)stream).updateRecvWindow(delta);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updated initial stream recv window {} -> {} for {}", initialWindow, initialStreamWindow, stream); LOG.debug("Updated initial stream recv window {} -> {} for {}", previousInitialStreamWindow, initialStreamWindow, stream);
} }
else else
{ {
@ -111,6 +127,11 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
} }
} }
@Override
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
}
@Override @Override
public void onDataSending(IStream stream, int length) public void onDataSending(IStream stream, int length)
{ {

View File

@ -51,7 +51,7 @@ import org.eclipse.jetty.util.Callback;
*/ */
public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
{ {
private final AtomicInteger initialSessionWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE); private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
private final AtomicInteger sessionLevel = new AtomicInteger(); private final AtomicInteger sessionLevel = new AtomicInteger();
private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>(); private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
private final float bufferRatio; private final float bufferRatio;
@ -61,99 +61,122 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
this(DEFAULT_WINDOW_SIZE, bufferRatio); this(DEFAULT_WINDOW_SIZE, bufferRatio);
} }
public BufferingFlowControlStrategy(int initialStreamWindow, float bufferRatio) public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
{ {
super(initialStreamWindow); super(initialStreamSendWindow);
this.bufferRatio = bufferRatio; this.bufferRatio = bufferRatio;
} }
@Override @Override
public void onNewStream(IStream stream) public void onNewStream(IStream stream, boolean local)
{ {
super.onNewStream(stream); super.onNewStream(stream, local);
streamLevels.put(stream, new AtomicInteger()); streamLevels.put(stream, new AtomicInteger());
} }
@Override @Override
public void onStreamTerminated(IStream stream) public void onStreamTerminated(IStream stream, boolean local)
{ {
streamLevels.remove(stream); streamLevels.remove(stream);
super.onStreamTerminated(stream); super.onStreamTerminated(stream, local);
}
@Override
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
// Window updates cannot be negative.
// The SettingsFrame.INITIAL_WINDOW_SIZE setting only influences
// the stream initial window size.
// Therefore the session window can only be enlarged, and here we
// keep track of its max value.
super.onWindowUpdate(session, stream, frame);
if (frame.getStreamId() == 0)
{
int sessionWindow = session.updateSendWindow(0);
Atomics.updateMax(initialSessionWindow, sessionWindow);
}
} }
@Override @Override
public void onDataConsumed(ISession session, IStream stream, int length) public void onDataConsumed(ISession session, IStream stream, int length)
{ {
if (length > 0) if (length <= 0)
return;
WindowUpdateFrame windowFrame = null;
int level = sessionLevel.addAndGet(length);
int maxLevel = (int)(maxSessionRecvWindow.get() * bufferRatio);
if (level > maxLevel)
{ {
WindowUpdateFrame windowFrame = null; level = sessionLevel.getAndSet(0);
int level = sessionLevel.addAndGet(length); session.updateRecvWindow(level);
int maxLevel = (int)(initialSessionWindow.get() * bufferRatio); if (LOG.isDebugEnabled())
if (level > maxLevel) LOG.debug("Data consumed, updated session recv window by {} for {}", level, session);
windowFrame = new WindowUpdateFrame(0, level);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
}
Frame[] windowFrames = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isClosed())
{ {
level = sessionLevel.getAndSet(0);
session.updateRecvWindow(level);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, updated session recv window by {} for {}", level, session); LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
windowFrame = new WindowUpdateFrame(0, level);
} }
else else
{ {
if (LOG.isDebugEnabled()) AtomicInteger streamLevel = streamLevels.get(stream);
LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session); level = streamLevel.addAndGet(length);
} maxLevel = (int)(getInitialStreamRecvWindow() * bufferRatio);
if (level > maxLevel)
Frame[] windowFrames = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isClosed())
{ {
level = streamLevel.getAndSet(0);
stream.updateRecvWindow(level);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream); LOG.debug("Data consumed, updated stream recv window by {} for {}", level, stream);
WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
if (windowFrame == null)
windowFrame = frame;
else
windowFrames = new Frame[]{frame};
} }
else else
{ {
AtomicInteger streamLevel = streamLevels.get(stream); if (LOG.isDebugEnabled())
level = streamLevel.addAndGet(length); LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
maxLevel = (int)(getInitialStreamWindow() * bufferRatio);
if (level > maxLevel)
{
level = streamLevel.getAndSet(0);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, updated stream recv window by {} for {}", length, stream);
WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
if (windowFrame == null)
windowFrame = frame;
else
windowFrames = new Frame[]{frame};
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
}
} }
} }
}
if (windowFrame != null) if (windowFrame != null)
session.control(stream, Callback.Adapter.INSTANCE, windowFrame, windowFrames); session.control(stream, Callback.Adapter.INSTANCE, windowFrame, windowFrames);
}
@Override
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
super.windowUpdate(session, stream, frame);
// Window updates cannot be negative.
// The SettingsFrame.INITIAL_WINDOW_SIZE setting
// only influences the *stream* window size.
// Therefore the session window can only be enlarged,
// and here we keep track of its max value.
// Updating the max session recv window is done here
// so that if a peer decides to send an unilateral
// window update to enlarge the session window,
// without the corresponding data consumption, here
// we can track it.
// Note that it is not perfect, since there is a time
// window between the session recv window being updated
// before the window update frame is sent, and the
// invocation of this method: in between data may arrive
// and reduce the session recv window size.
// But eventually the max value will be seen.
// Note that we cannot avoid the time window described
// above by updating the session recv window from here
// because there is a race between the sender and the
// receiver: the sender may receive a window update and
// send more data, while this method has not yet been
// invoked; when the data is received the session recv
// window may become negative and the connection will
// be closed (per specification).
if (frame.getStreamId() == 0)
{
int sessionWindow = session.updateRecvWindow(0);
Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
} }
} }
} }

View File

@ -24,9 +24,9 @@ public interface FlowControlStrategy
{ {
public static int DEFAULT_WINDOW_SIZE = 65535; public static int DEFAULT_WINDOW_SIZE = 65535;
public void onNewStream(IStream stream); public void onNewStream(IStream stream, boolean local);
public void onStreamTerminated(IStream stream); public void onStreamTerminated(IStream stream, boolean local);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local); public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local);
@ -36,6 +36,8 @@ public interface FlowControlStrategy
public void onDataConsumed(ISession session, IStream stream, int length); public void onDataConsumed(ISession session, IStream stream, int length);
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);
public void onDataSending(IStream stream, int length); public void onDataSending(IStream stream, int length);
public void onDataSent(IStream stream, int length); public void onDataSent(IStream stream, int length);

View File

@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -42,7 +41,7 @@ public class HTTP2Flusher extends IteratingCallback
{ {
private static final Logger LOG = Log.getLogger(HTTP2Flusher.class); private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
private final Deque<WindowEntry> windows = new ArrayDeque<>(); private final Queue<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this); private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final Map<IStream, Integer> streams = new HashMap<>(); private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> resets = new ArrayList<>(); private final List<Entry> resets = new ArrayList<>();
@ -59,15 +58,15 @@ public class HTTP2Flusher extends IteratingCallback
public void window(IStream stream, WindowUpdateFrame frame) public void window(IStream stream, WindowUpdateFrame frame)
{ {
boolean added = false;
synchronized (this) synchronized (this)
{ {
if (!isClosed()) if (!isClosed())
{ added = windows.offer(new WindowEntry(stream, frame));
windows.offer(new WindowEntry(stream, frame));
// Flush stalled data.
iterate();
}
} }
// Flush stalled data.
if (added)
iterate();
} }
public boolean prepend(Entry entry) public boolean prepend(Entry entry)
@ -189,7 +188,7 @@ public class HTTP2Flusher extends IteratingCallback
Integer streamWindow = streams.get(stream); Integer streamWindow = streams.get(stream);
if (streamWindow == null) if (streamWindow == null)
{ {
streamWindow = stream.getSendWindow(); streamWindow = stream.updateSendWindow(0);
streams.put(stream, streamWindow); streams.put(stream, streamWindow);
} }

View File

@ -589,7 +589,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (streams.putIfAbsent(streamId, stream) == null) if (streams.putIfAbsent(streamId, stream) == null)
{ {
stream.setIdleTimeout(getStreamIdleTimeout()); stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onNewStream(stream); flowControl.onNewStream(stream, true);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream); LOG.debug("Created local {}", stream);
return stream; return stream;
@ -624,7 +624,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
updateLastStreamId(streamId); updateLastStreamId(streamId);
stream.setIdleTimeout(getStreamIdleTimeout()); stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onNewStream(stream); flowControl.onNewStream(stream, false);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream); LOG.debug("Created remote {}", stream);
return stream; return stream;
@ -654,7 +654,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else else
remoteStreamCount.decrementAndGet(); remoteStreamCount.decrementAndGet();
flowControl.onStreamTerminated(stream); flowControl.onStreamTerminated(stream, local);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Removed {}", stream); LOG.debug("Removed {}", stream);
@ -1020,6 +1020,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
getEndPoint().shutdownOutput(); getEndPoint().shutdownOutput();
break; break;
} }
case WINDOW_UPDATE:
{
flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame);
break;
}
case DISCONNECT: case DISCONNECT:
{ {
terminate(); terminate();
@ -1064,7 +1069,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (sessionSendWindow < 0) if (sessionSendWindow < 0)
throw new IllegalStateException(); throw new IllegalStateException();
int streamSendWindow = stream.getSendWindow(); int streamSendWindow = stream.updateSendWindow(0);
if (streamSendWindow < 0) if (streamSendWindow < 0)
throw new IllegalStateException(); throw new IllegalStateException();

View File

@ -323,13 +323,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream
} }
} }
@Override
public int getSendWindow() public int getSendWindow()
{ {
return sendWindow.get(); return sendWindow.get();
} }
protected int getRecvWindow() public int getRecvWindow()
{ {
return recvWindow.get(); return recvWindow.get();
} }

View File

@ -79,11 +79,6 @@ public interface IStream extends Stream, Closeable
@Override @Override
public void close(); public void close();
/**
* @return the current value of the stream send window
*/
public int getSendWindow();
/** /**
* <p>Updates the stream send window by the given {@code delta}.</p> * <p>Updates the stream send window by the given {@code delta}.</p>
* *

View File

@ -29,36 +29,36 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
this(DEFAULT_WINDOW_SIZE); this(DEFAULT_WINDOW_SIZE);
} }
public SimpleFlowControlStrategy(int initialStreamWindow) public SimpleFlowControlStrategy(int initialStreamSendWindow)
{ {
super(initialStreamWindow); super(initialStreamSendWindow);
} }
@Override @Override
public void onDataConsumed(ISession session, IStream stream, int length) public void onDataConsumed(ISession session, IStream stream, int length)
{ {
if (length <= 0)
return;
// This is the simple algorithm for flow control. // This is the simple algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed. // This method is called when a whole flow controlled frame has been consumed.
// We send a WindowUpdate every time, even if the frame was very small. // We send a WindowUpdate every time, even if the frame was very small.
if (length > 0) WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
session.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
Frame[] streamFrame = Frame.EMPTY_ARRAY;
if (stream != null)
{ {
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length); streamFrame = new Frame[1];
session.updateRecvWindow(length); streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session); LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
Frame[] streamFrame = null;
if (stream != null)
{
streamFrame = new Frame[1];
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
}
session.control(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame == null ? Frame.EMPTY_ARRAY : streamFrame);
} }
session.control(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame);
} }
} }

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2.server; package org.eclipse.jetty.http2.server;
import java.util.Objects;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy; import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
@ -35,7 +37,7 @@ import org.eclipse.jetty.util.annotation.Name;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{ {
private int maxDynamicTableSize = 4096; private int maxDynamicTableSize = 4096;
private int initialStreamWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private int initialStreamSendWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1; private int maxConcurrentStreams = -1;
private final HttpConfiguration httpConfiguration; private final HttpConfiguration httpConfiguration;
@ -44,12 +46,10 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this(httpConfiguration,"h2-17","h2-16","h2-15","h2-14","h2"); this(httpConfiguration,"h2-17","h2-16","h2-15","h2-14","h2");
} }
protected AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration,String... protocols) protected AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration, String... protocols)
{ {
super(protocols); super(protocols);
if (httpConfiguration==null) this.httpConfiguration = Objects.requireNonNull(httpConfiguration);
throw new IllegalArgumentException("Null HttpConfiguration");
this.httpConfiguration = httpConfiguration;
} }
public int getMaxDynamicTableSize() public int getMaxDynamicTableSize()
@ -57,19 +57,19 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
return maxDynamicTableSize; return maxDynamicTableSize;
} }
public void setMaxHeaderTableSize(int maxHeaderTableSize) public void setMaxDynamicTableSize(int maxDynamicTableSize)
{ {
this.maxDynamicTableSize = maxHeaderTableSize; this.maxDynamicTableSize = maxDynamicTableSize;
} }
public int getInitialStreamWindow() public int getInitialStreamSendWindow()
{ {
return initialStreamWindow; return initialStreamSendWindow;
} }
public void setInitialStreamWindow(int initialStreamWindow) public void setInitialStreamSendWindow(int initialStreamSendWindow)
{ {
this.initialStreamWindow = initialStreamWindow; this.initialStreamSendWindow = initialStreamSendWindow;
} }
public int getMaxConcurrentStreams() public int getMaxConcurrentStreams()
@ -112,7 +112,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
protected FlowControlStrategy newFlowControlStrategy() protected FlowControlStrategy newFlowControlStrategy()
{ {
return new SimpleFlowControlStrategy(getInitialStreamWindow()); return new SimpleFlowControlStrategy(getInitialStreamSendWindow());
} }
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint); protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);

View File

@ -87,7 +87,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
{ {
Map<Integer, Integer> settings = new HashMap<>(); Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getMaxDynamicTableSize()); settings.put(SettingsFrame.HEADER_TABLE_SIZE, getMaxDynamicTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialStreamWindow()); settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialStreamSendWindow());
int maxConcurrentStreams = getMaxConcurrentStreams(); int maxConcurrentStreams = getMaxConcurrentStreams();
if (maxConcurrentStreams >= 0) if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams); settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);