Fixed handling of INITIAL_WINDOW_SIZE setting.

It must update only stream windows, and not the session window.
This commit is contained in:
Simone Bordet 2014-08-11 18:43:50 +02:00
parent 187c42fa4a
commit 107a4fff20
6 changed files with 56 additions and 40 deletions

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl; import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.ISession;
@ -59,7 +60,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY); Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
Generator generator = new Generator(byteBufferPool, 4096); Generator generator = new Generator(byteBufferPool, 4096);
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(65535)); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(FlowControl.DEFAULT_WINDOW_SIZE));
Parser parser = new Parser(byteBufferPool, session, 4096, 8192); Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener); return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
} }

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -334,17 +335,21 @@ public class FlowControlTest extends AbstractTest
public void testSessionStalledStallsNewStreams() throws Exception public void testSessionStalledStallsNewStreams() throws Exception
{ {
final int windowSize = 1024; final int windowSize = 1024;
final CountDownLatch settingsLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter() startServer(new ServerSessionListener.Adapter()
{ {
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override @Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame) public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
MetaData.Request request = (MetaData.Request)requestFrame.getMetaData();
if ("POST".equalsIgnoreCase(request.getMethod()))
{
// Send data to consume the session window.
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE - windowSize);
DataFrame dataFrame = new DataFrame(stream.getId(), data, true);
stream.data(dataFrame, Callback.Adapter.INSTANCE);
return null;
}
else
{ {
// For every stream, send down half the window size of data. // For every stream, send down half the window size of data.
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
@ -354,33 +359,30 @@ public class FlowControlTest extends AbstractTest
stream.data(dataFrame, Callback.Adapter.INSTANCE); stream.data(dataFrame, Callback.Adapter.INSTANCE);
return null; return null;
} }
}
}); });
Session session = newClient(new Session.Listener.Adapter()); Session session = newClient(new Session.Listener.Adapter());
Map<Integer, Integer> settings = new HashMap<>(); // First request is just to consume the session window.
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize); final CountDownLatch prepareLatch = new CountDownLatch(1);
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE); MetaData.Request request1 = newRequest("POST", new HttpFields());
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final AtomicReference<Callback> callbackRef1 = new AtomicReference<>();
final AtomicReference<Callback> callbackRef2 = new AtomicReference<>();
// First request will consume half the session window.
MetaData.Request request1 = newRequest("GET", new HttpFields());
session.newStream(new HeadersFrame(0, request1, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter() session.newStream(new HeadersFrame(0, request1, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{ {
@Override @Override
public void onData(Stream stream, DataFrame frame, Callback callback) public void onData(Stream stream, DataFrame frame, Callback callback)
{ {
// Do not consume it to stall flow control. // Do not consume the data to reduce the session window.
callbackRef1.set(callback); if (frame.isEndStream())
prepareLatch.countDown();
} }
}); });
Assert.assertTrue(prepareLatch.await(5, TimeUnit.SECONDS));
// Second request will consume the session window, which is now stalled. final AtomicReference<Callback> callbackRef2 = new AtomicReference<>();
// A third request will not be able to receive data. final AtomicReference<Callback> callbackRef3 = new AtomicReference<>();
// Second request will consume half the session window.
MetaData.Request request2 = newRequest("GET", new HttpFields()); MetaData.Request request2 = newRequest("GET", new HttpFields());
session.newStream(new HeadersFrame(0, request2, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter() session.newStream(new HeadersFrame(0, request2, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{ {
@ -392,10 +394,23 @@ public class FlowControlTest extends AbstractTest
} }
}); });
// Third request is now stalled. // Third request will consume the session window, which is now stalled.
final CountDownLatch latch = new CountDownLatch(1); // A fourth request will not be able to receive data.
MetaData.Request request3 = newRequest("GET", new HttpFields()); MetaData.Request request3 = newRequest("GET", new HttpFields());
session.newStream(new HeadersFrame(0, request3, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter() session.newStream(new HeadersFrame(0, request3, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Do not consume it to stall flow control.
callbackRef3.set(callback);
}
});
// Fourth request is now stalled.
final CountDownLatch latch = new CountDownLatch(1);
MetaData.Request request4 = newRequest("GET", new HttpFields());
session.newStream(new HeadersFrame(0, request4, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{ {
@Override @Override
public void onData(Stream stream, DataFrame frame, Callback callback) public void onData(Stream stream, DataFrame frame, Callback callback)
@ -409,9 +424,9 @@ public class FlowControlTest extends AbstractTest
// Verify that the data does not arrive because the server session is stalled. // Verify that the data does not arrive because the server session is stalled.
Assert.assertFalse(latch.await(1, TimeUnit.SECONDS)); Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
// Consume the data of the second response. // Consume the data of the third response.
// This will open up the session window, allowing the third stream to send data. // This will open up the session window, allowing the third stream to send data.
Callback callback2 = callbackRef2.getAndSet(null); Callback callback2 = callbackRef3.getAndSet(null);
if (callback2 != null) if (callback2 != null)
callback2.succeeded(); callback2.succeeded();

View File

@ -22,6 +22,8 @@ import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
public interface FlowControl public interface FlowControl
{ {
public static int DEFAULT_WINDOW_SIZE = 65535;
public void onNewStream(IStream stream); public void onNewStream(IStream stream);
public int getInitialWindowSize(); public int getInitialWindowSize();

View File

@ -54,10 +54,7 @@ public class HTTP2FlowControl implements FlowControl
this.initialWindowSize = initialWindowSize; this.initialWindowSize = initialWindowSize;
int delta = initialWindowSize - windowSize; int delta = initialWindowSize - windowSize;
// Update the session's window size. // SPEC: updates of the initial window size only affect stream windows, not session's.
session.onUpdateWindowSize(null, new WindowUpdateFrame(0, delta));
// Update the streams' window size.
for (Stream stream : session.getStreams()) for (Stream stream : session.getStreams())
session.onUpdateWindowSize((IStream)stream, new WindowUpdateFrame(stream.getId(), delta)); session.onUpdateWindowSize((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
} }

View File

@ -213,7 +213,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (settings.containsKey(SettingsFrame.MAX_FRAME_SIZE)) if (settings.containsKey(SettingsFrame.MAX_FRAME_SIZE))
{ {
int maxFrameSize = settings.get(SettingsFrame.MAX_FRAME_SIZE); int maxFrameSize = settings.get(SettingsFrame.MAX_FRAME_SIZE);
// Spec: check the max frame size is sane. // SPEC: check the max frame size is sane.
if (maxFrameSize < Frame.DEFAULT_MAX_LENGTH || maxFrameSize > Frame.MAX_MAX_LENGTH) if (maxFrameSize < Frame.DEFAULT_MAX_LENGTH || maxFrameSize > Frame.MAX_MAX_LENGTH)
{ {
onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_settings_max_frame_size"); onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_settings_max_frame_size");

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl; import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.ISession;
@ -36,7 +37,7 @@ import org.eclipse.jetty.server.Connector;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{ {
private int maxHeaderTableSize = 4096; private int maxHeaderTableSize = 4096;
private int initialWindowSize = 65535; private int initialWindowSize = FlowControl.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1; private int maxConcurrentStreams = -1;
public AbstractHTTP2ServerConnectionFactory() public AbstractHTTP2ServerConnectionFactory()