423974 - Optimize flow control.

Implemented a buffering FlowControlStrategy that reduces the amount
of WindowUpdate exchanges.
This commit is contained in:
Simone Bordet 2015-02-06 15:18:58 +01:00
parent fa72356d1d
commit 7066f65e8c
16 changed files with 600 additions and 135 deletions

View File

@ -23,10 +23,10 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
@ -50,7 +50,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
private int initialSessionWindow = FlowControl.DEFAULT_WINDOW_SIZE;
private int initialSessionWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
@ -64,11 +64,17 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(FlowControl.DEFAULT_WINDOW_SIZE));
FlowControlStrategy flowControl = newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
}
protected FlowControlStrategy newFlowControlStrategy()
{
return new SimpleFlowControlStrategy();
}
public int getInitialSessionWindow()
{
return initialSessionWindow;
@ -103,7 +109,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
PrefaceFrame prefaceFrame = new PrefaceFrame();
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
int windowDelta = getInitialSessionWindow() - FlowControl.DEFAULT_WINDOW_SIZE;
int windowDelta = getInitialSessionWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
if (windowDelta > 0)
getSession().control(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
else

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.http2.client;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
@ -35,7 +35,7 @@ public class HTTP2ClientSession extends HTTP2Session
{
private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class);
public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControlStrategy flowControl)
{
super(scheduler, endPoint, generator, listener, flowControl, 1);
}

View File

@ -0,0 +1,116 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class BufferingFlowControlStrategyTest extends FlowControlStrategyTest
{
@Override
protected FlowControlStrategy newFlowControlStrategy()
{
return new BufferingFlowControlStrategy(0.5F);
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// We sent 65535 data bytes to the server (the max allowed by the send window).
// The ratio being 50%, means that the regulator has a limit at 32767 bytes.
// This will be TCP written as:
// (9 + HEADERS) + 3 * (9 + DATA (16384)) + (9 + DATA (16383))
// since 16384 is the max frame length.
// One window update will be sent back of 32768 bytes.
// It does not matter how the TCP reads read the frames, since eventually
// the server will read 2 * 16384 data bytes (perhaps in multiple "fake"
// data frames).
// When 32768 bytes will be read and consumed, a window update will be
// sent to the client; the remaining 32767 will be read and consumed,
// but the server won't send the window update (it's exactly equal to
// the regulator limit), and the client won't send more bytes because
// the send has been reset.
// So the client send window must be 32768.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(Frame.DEFAULT_MAX_LENGTH * 2, http2Session.getSendWindow());
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
@ -30,11 +31,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
@ -43,23 +46,75 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class FlowControlTest extends AbstractTest
public abstract class FlowControlStrategyTest
{
@Override
protected ServerConnector connector;
protected HTTP2Client client;
protected Server server;
protected abstract FlowControlStrategy newFlowControlStrategy();
protected void start(ServerSessionListener listener) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener)
{
@Override
protected FlowControlStrategy newFlowControlStrategy()
{
return FlowControlStrategyTest.this.newFlowControlStrategy();
}
});
server.addConnector(connector);
server.start();
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client(clientExecutor);
client.start();
}
protected Session newClient(Session.Listener listener) throws Exception
{
String host = "localhost";
int port = connector.getLocalPort();
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, listener, promise);
return promise.get(5, TimeUnit.SECONDS);
}
protected MetaData.Request newRequest(String method, HttpFields fields)
{
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
return new MetaData.Request(method, HttpScheme.HTTP, new HostPortHttpField(authority), "/", HttpVersion.HTTP_2, fields);
}
@After
public void dispose() throws Exception
{
// Allow WINDOW_UPDATE frames to be sent/received to avoid exception stack traces.
Thread.sleep(1000);
super.dispose();
client.stop();
server.stop();
}
@Test
@ -74,7 +129,7 @@ public class FlowControlTest extends AbstractTest
// after the flow control window has been reduced.
final CountDownLatch dataLatch = new CountDownLatch(3);
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -147,7 +202,7 @@ public class FlowControlTest extends AbstractTest
final int windowSize = 1536;
final int length = 5 * windowSize;
final CountDownLatch settingsLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
@ -237,7 +292,7 @@ public class FlowControlTest extends AbstractTest
final Exchanger<Callback> exchanger = new Exchanger<>();
final CountDownLatch settingsLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
@ -342,7 +397,7 @@ public class FlowControlTest extends AbstractTest
public void testSessionStalledStallsNewStreams() throws Exception
{
final int windowSize = 1024;
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -351,7 +406,7 @@ public class FlowControlTest extends AbstractTest
if ("POST".equalsIgnoreCase(request.getMethod()))
{
// Send data to consume the session window.
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE - windowSize);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - windowSize);
DataFrame dataFrame = new DataFrame(stream.getId(), data, true);
stream.data(dataFrame, Callback.Adapter.INSTANCE);
return null;
@ -446,7 +501,7 @@ public class FlowControlTest extends AbstractTest
final byte[] data = new byte[1024 * 1024];
new Random().nextBytes(data);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -500,7 +555,7 @@ public class FlowControlTest extends AbstractTest
final AtomicReference<CountDownLatch> settingsLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
@ -557,7 +612,7 @@ public class FlowControlTest extends AbstractTest
@Test
public void testClientSendingInitialSmallWindow() throws Exception
{
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -629,7 +684,7 @@ public class FlowControlTest extends AbstractTest
public void testClientExceedingSessionWindow() throws Exception
{
// On server, we don't consume the data.
startServer(new ServerSessionListener.Adapter());
start(new ServerSessionListener.Adapter());
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
@ -648,7 +703,7 @@ public class FlowControlTest extends AbstractTest
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter()
{
@ -678,13 +733,13 @@ public class FlowControlTest extends AbstractTest
public void testClientExceedingStreamWindow() throws Exception
{
// On server, we don't consume the data.
startServer(new ServerSessionListener.Adapter()
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
// Enlarge the session window.
((ISession)session).updateRecvWindow(FlowControl.DEFAULT_WINDOW_SIZE);
((ISession)session).updateRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
return super.onPreface(session);
}
});
@ -706,7 +761,7 @@ public class FlowControlTest extends AbstractTest
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter()
{
@ -718,6 +773,10 @@ public class FlowControlTest extends AbstractTest
});
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Wait for a while to allow flow control window frames
// to be exchanged before doing the "sneaky" write below.
Thread.sleep(1000);
// Now the client is supposed to not send more frames.
// If it does, the connection must be closed.
HTTP2Session http2Session = (HTTP2Session)session;
@ -730,55 +789,4 @@ public class FlowControlTest extends AbstractTest
// Expect the connection to be closed.
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControl.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// At this point the session window should be fully available.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(FlowControl.DEFAULT_WINDOW_SIZE, http2Session.getSendWindow());
}
}

View File

@ -0,0 +1,100 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class SimpleFlowControlStrategyTest extends FlowControlStrategyTest
{
@Override
protected FlowControlStrategy newFlowControlStrategy()
{
return new SimpleFlowControlStrategy();
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// At this point the session window should be fully available.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, http2Session.getSendWindow());
}
}

View File

@ -1,3 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http2.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG

View File

@ -19,32 +19,40 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2FlowControl implements FlowControl
public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
{
private static final Logger LOG = Log.getLogger(HTTP2FlowControl.class);
protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
private int initialStreamWindow;
public HTTP2FlowControl(int initialStreamWindow)
public AbstractFlowControlStrategy(int initialStreamWindow)
{
this.initialStreamWindow = initialStreamWindow;
}
protected int getInitialStreamWindow()
{
return initialStreamWindow;
}
@Override
public void onNewStream(IStream stream)
{
stream.updateSendWindow(initialStreamWindow);
stream.updateRecvWindow(FlowControl.DEFAULT_WINDOW_SIZE);
stream.updateRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
}
@Override
public void updateInitialStreamWindow(ISession session, int initialStreamWindow)
public void onStreamTerminated(IStream stream)
{
}
@Override
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
{
int initialWindow = this.initialStreamWindow;
this.initialStreamWindow = initialStreamWindow;
@ -52,7 +60,18 @@ public class HTTP2FlowControl implements FlowControl
// SPEC: updates of the initial window size only affect stream windows, not session's.
for (Stream stream : session.getStreams())
session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
{
if (local)
{
((IStream)stream).updateRecvWindow(delta);
if (LOG.isDebugEnabled())
LOG.debug("Updated initial stream recv window {} -> {} for {}", initialWindow, initialStreamWindow, stream);
}
else
{
session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
}
}
}
@Override
@ -61,7 +80,7 @@ public class HTTP2FlowControl implements FlowControl
int delta = frame.getWindowDelta();
if (frame.getStreamId() > 0)
{
// The stream may have been reset concurrently.
// The stream may have been removed concurrently.
if (stream != null)
{
int oldSize = stream.updateSendWindow(delta);
@ -92,35 +111,6 @@ public class HTTP2FlowControl implements FlowControl
}
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
{
// This is the algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed.
// We currently send a WindowUpdate every time, even if the frame was very small.
// Other policies may send the WindowUpdate only upon reaching a threshold.
if (length > 0)
{
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
session.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
Frame[] streamFrame = null;
if (stream != null)
{
streamFrame = new Frame[1];
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
}
session.control(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame == null ? Frame.EMPTY_ARRAY : streamFrame);
}
}
@Override
public void onDataSending(IStream stream, int length)
{

View File

@ -0,0 +1,159 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
/**
* <p>A flow control strategy that accumulates updates and emits window control
* frames when the accumulated value reaches a threshold.</p>
* <p>The sender flow control window is represented in the receiver as two
* buckets: a bigger bucket, initially full, that is drained when data is
* received, and a smaller bucket, initially empty, that is filled when data is
* consumed. Only the smaller bucket can refill the bigger bucket.</p>
* <p>The smaller bucket is defined as a fraction of the bigger bucket.</p>
* <p>For a more visual representation, see the
* <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo fountain</a>.</p>
* <p>The algorithm works in this way.</p>
* <p>The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
* bucket (SB) being 40% of the bigger bucket: 40.</p>
* <p>The receiver receives a data frame of 60, so now BB=40; the data frame is
* passed to the application that consumes 25, so now SB=25. Since SB is not full,
* no window control frames are emitted.</p>
* <p>The application consumes other 20, so now SB=45. Since SB is full, its 45
* are transferred to BB, which is now BB=85, and a window control frame is sent
* with delta=45.</p>
* <p>The application consumes the remaining 15, so now SB=15, and no window
* control frame is emitted.</p>
*/
public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
{
private final AtomicInteger initialSessionWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
private final AtomicInteger sessionLevel = new AtomicInteger();
private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
private final float bufferRatio;
public BufferingFlowControlStrategy(float bufferRatio)
{
this(DEFAULT_WINDOW_SIZE, bufferRatio);
}
public BufferingFlowControlStrategy(int initialStreamWindow, float bufferRatio)
{
super(initialStreamWindow);
this.bufferRatio = bufferRatio;
}
@Override
public void onNewStream(IStream stream)
{
super.onNewStream(stream);
streamLevels.put(stream, new AtomicInteger());
}
@Override
public void onStreamTerminated(IStream stream)
{
streamLevels.remove(stream);
super.onStreamTerminated(stream);
}
@Override
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
// Window updates cannot be negative.
// The SettingsFrame.INITIAL_WINDOW_SIZE setting only influences
// the stream initial window size.
// Therefore the session window can only be enlarged, and here we
// keep track of its max value.
super.onWindowUpdate(session, stream, frame);
if (frame.getStreamId() == 0)
{
int sessionWindow = session.updateSendWindow(0);
Atomics.updateMax(initialSessionWindow, sessionWindow);
}
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
{
if (length > 0)
{
WindowUpdateFrame windowFrame = null;
int level = sessionLevel.addAndGet(length);
int maxLevel = (int)(initialSessionWindow.get() * bufferRatio);
if (level > maxLevel)
{
level = sessionLevel.getAndSet(0);
session.updateRecvWindow(level);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, updated session recv window by {} for {}", level, session);
windowFrame = new WindowUpdateFrame(0, level);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
}
Frame[] windowFrames = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
}
else
{
AtomicInteger streamLevel = streamLevels.get(stream);
level = streamLevel.addAndGet(length);
maxLevel = (int)(getInitialStreamWindow() * bufferRatio);
if (level > maxLevel)
{
level = streamLevel.getAndSet(0);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, updated stream recv window by {} for {}", length, stream);
WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
if (windowFrame == null)
windowFrame = frame;
else
windowFrames = new Frame[]{frame};
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
}
}
}
if (windowFrame != null)
session.control(stream, Callback.Adapter.INSTANCE, windowFrame, windowFrames);
}
}
}

View File

@ -20,13 +20,15 @@ package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
public interface FlowControl
public interface FlowControlStrategy
{
public static int DEFAULT_WINDOW_SIZE = 65535;
public void onNewStream(IStream stream);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow);
public void onStreamTerminated(IStream stream);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local);
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);

View File

@ -174,9 +174,10 @@ public class HTTP2Flusher extends IteratingCallback
int remaining = entry.dataRemaining();
if (remaining > 0)
{
FlowControlStrategy flowControl = session.getFlowControlStrategy();
if (sessionWindow <= 0)
{
session.getFlowControl().onSessionStalled(session);
flowControl.onSessionStalled(session);
++index;
// There may be *non* flow controlled frames to send.
continue;
@ -195,7 +196,7 @@ public class HTTP2Flusher extends IteratingCallback
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
session.getFlowControl().onStreamStalled(stream);
flowControl.onStreamStalled(stream);
++index;
// There may be *non* flow controlled frames to send.
continue;
@ -399,7 +400,7 @@ public class HTTP2Flusher extends IteratingCallback
public void perform()
{
FlowControl flowControl = session.getFlowControl();
FlowControlStrategy flowControl = session.getFlowControlStrategy();
flowControl.onWindowUpdate(session, stream, frame);
}
}

View File

@ -71,14 +71,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final EndPoint endPoint;
private final Generator generator;
private final Listener listener;
private final FlowControl flowControl;
private final FlowControlStrategy flowControl;
private final HTTP2Flusher flusher;
private int maxLocalStreams;
private int maxRemoteStreams;
private long streamIdleTimeout;
private boolean pushEnabled;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialStreamId)
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
this.scheduler = scheduler;
this.endPoint = endPoint;
@ -90,12 +90,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.maxRemoteStreams = -1;
this.streamIds.set(initialStreamId);
this.streamIdleTimeout = endPoint.getIdleTimeout();
this.sendWindow.set(FlowControl.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControl.DEFAULT_WINDOW_SIZE);
this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.pushEnabled = true; // SPEC: by default, push is enabled.
}
public FlowControl getFlowControl()
public FlowControlStrategy getFlowControlStrategy()
{
return flowControl;
}
@ -263,7 +263,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (LOG.isDebugEnabled())
LOG.debug("Update initial window size to {}", value);
flowControl.updateInitialStreamWindow(this, value);
flowControl.updateInitialStreamWindow(this, value, false);
break;
}
case SettingsFrame.MAX_FRAME_SIZE:
@ -663,6 +663,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else
remoteStreamCount.decrementAndGet();
flowControl.onStreamTerminated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {}", stream);
}
@ -975,6 +977,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
removeStream(stream, true);
break;
}
case SETTINGS:
{
SettingsFrame settings = (SettingsFrame)frame;
Integer initialWindow = settings.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
if (initialWindow != null)
flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
break;
}
case GO_AWAY:
{
// We just sent a GO_AWAY, only shutdown the

View File

@ -387,7 +387,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public String toString()
{
return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
}
}

View File

@ -0,0 +1,64 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
{
public SimpleFlowControlStrategy()
{
this(DEFAULT_WINDOW_SIZE);
}
public SimpleFlowControlStrategy(int initialStreamWindow)
{
super(initialStreamWindow);
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
{
// This is the simple algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed.
// We send a WindowUpdate every time, even if the frame was very small.
if (length > 0)
{
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
session.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
Frame[] streamFrame = null;
if (stream != null)
{
streamFrame = new Frame[1];
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
}
session.control(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame == null ? Frame.EMPTY_ARRAY : streamFrame);
}
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2.frames;
import org.eclipse.jetty.http2.ErrorCode;
public class ResetFrame extends Frame
{
private final int streamId;
@ -43,6 +45,8 @@ public class ResetFrame extends Frame
@Override
public String toString()
{
return String.format("%s#%d,error=%d", super.toString(), streamId, error);
ErrorCode errorCode = ErrorCode.from(error);
String reason = errorCode == null ? "error=" + error : errorCode.name().toLowerCase();
return String.format("%s#%d{%s}", super.toString(), streamId, reason);
}
}

View File

@ -18,9 +18,9 @@
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
@ -36,7 +36,7 @@ import org.eclipse.jetty.util.annotation.Name;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{
private int maxDynamicTableSize = 4096;
private int initialStreamWindow = FlowControl.DEFAULT_WINDOW_SIZE;
private int initialStreamWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
private final HttpConfiguration httpConfiguration;
@ -98,8 +98,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerSessionListener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getMaxDynamicTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener,
new HTTP2FlowControl(getInitialStreamWindow()));
FlowControlStrategy flowControl = newFlowControlStrategy();
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener, flowControl);
session.setMaxLocalStreams(getMaxConcurrentStreams());
session.setMaxRemoteStreams(getMaxConcurrentStreams());
long idleTimeout = endPoint.getIdleTimeout();
@ -114,6 +114,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
return configure(connection, connector, endPoint);
}
protected FlowControlStrategy newFlowControlStrategy()
{
return new SimpleFlowControlStrategy(getInitialStreamWindow());
}
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
protected abstract ServerParser newServerParser(ByteBufferPool byteBufferPool, ServerParser.Listener listener);

View File

@ -23,7 +23,7 @@ import java.util.Map;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
@ -47,7 +47,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
private final ServerSessionListener listener;
public HTTP2ServerSession(Scheduler scheduler, EndPoint endPoint, Generator generator, ServerSessionListener listener, FlowControl flowControl)
public HTTP2ServerSession(Scheduler scheduler, EndPoint endPoint, Generator generator, ServerSessionListener listener, FlowControlStrategy flowControl)
{
super(scheduler, endPoint, generator, listener, flowControl, 2);
this.listener = listener;