Updated flow control implementation to detect when senders exceed

allowed windows.
This commit is contained in:
Simone Bordet 2014-08-12 16:37:41 +02:00
parent 4f4c3604a2
commit e147ce9528
22 changed files with 356 additions and 127 deletions

View File

@ -28,7 +28,9 @@ import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
@ -48,6 +50,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
private int initialSessionWindow = FlowControl.DEFAULT_WINDOW_SIZE;
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
@ -65,6 +69,16 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
}
public int getInitialSessionWindow()
{
return initialSessionWindow;
}
public void setInitialSessionWindow(int initialSessionWindow)
{
this.initialSessionWindow = initialSessionWindow;
}
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
@ -86,7 +100,14 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Map<Integer, Integer> settings = listener.onPreface(getSession());
if (settings == null)
settings = Collections.emptyMap();
getSession().settings(new SettingsFrame(settings, false, true), this);
PrefaceFrame prefaceFrame = new PrefaceFrame();
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
int windowDelta = getInitialSessionWindow() - FlowControl.DEFAULT_WINDOW_SIZE;
if (windowDelta > 0)
getSession().control(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
else
getSession().control(null, this, prefaceFrame, settingsFrame);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -32,13 +33,18 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
@ -474,7 +480,7 @@ public class FlowControlTest extends AbstractTest
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, bytes);
}
@ -617,4 +623,89 @@ public class FlowControlTest extends AbstractTest
responseContent.flip();
Assert.assertArrayEquals(requestData, responseData);
}
@Test
public void testClientExceedingSessionWindow() throws Exception
{
// On server, we don't consume the data.
startServer(new ServerSessionListener.Adapter());
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCodes.FLOW_CONTROL_ERROR)
closeLatch.countDown();
}
});
// Consume the whole session and stream window.
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE);
stream.data(new DataFrame(stream.getId(), data, false), Callback.Adapter.INSTANCE);
// Now the client is supposed to not send more frames, but what if it does ?
HTTP2Session http2Session = (HTTP2Session)session;
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(connector.getByteBufferPool());
ByteBuffer extraData = ByteBuffer.allocate(1024);
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
List<ByteBuffer> buffers = lease.getByteBuffers();
http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()]));
// Expect the connection to be closed.
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testClientExceedingStreamWindow() throws Exception
{
// On server, we don't consume the data.
startServer(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
// Enlarge the session window.
((ISession)session).updateRecvWindow(FlowControl.DEFAULT_WINDOW_SIZE);
return super.onPreface(session);
}
});
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCodes.FLOW_CONTROL_ERROR)
closeLatch.countDown();
}
});
// Consume the whole stream window.
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControl.DEFAULT_WINDOW_SIZE);
stream.data(new DataFrame(stream.getId(), data, false), Callback.Adapter.INSTANCE);
// Now the client is supposed to not send more frames, but what if it does ?
HTTP2Session http2Session = (HTTP2Session)session;
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(connector.getByteBufferPool());
ByteBuffer extraData = ByteBuffer.allocate(1024);
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
List<ByteBuffer> buffers = lease.getByteBuffers();
http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()]));
// Expect the connection to be closed.
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -26,9 +26,7 @@ public interface FlowControl
public void onNewStream(IStream stream);
public int getInitialWindowSize();
public void updateInitialWindowSize(ISession session, int initialWindowSize);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow);
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -28,35 +29,30 @@ public class HTTP2FlowControl implements FlowControl
{
private static final Logger LOG = Log.getLogger(HTTP2FlowControl.class);
private volatile int initialWindowSize;
private int initialStreamWindow;
public HTTP2FlowControl(int initialWindowSize)
public HTTP2FlowControl(int initialStreamWindow)
{
this.initialWindowSize = initialWindowSize;
this.initialStreamWindow = initialStreamWindow;
}
@Override
public void onNewStream(IStream stream)
{
stream.updateWindowSize(initialWindowSize);
stream.updateSendWindow(initialStreamWindow);
stream.updateRecvWindow(FlowControl.DEFAULT_WINDOW_SIZE);
}
@Override
public int getInitialWindowSize()
public void updateInitialStreamWindow(ISession session, int initialStreamWindow)
{
return initialWindowSize;
}
@Override
public void updateInitialWindowSize(ISession session, int initialWindowSize)
{
int windowSize = this.initialWindowSize;
this.initialWindowSize = initialWindowSize;
int delta = initialWindowSize - windowSize;
int initialWindow = this.initialStreamWindow;
this.initialStreamWindow = initialStreamWindow;
int delta = initialStreamWindow - initialWindow;
// SPEC: updates of the initial window size only affect stream windows, not session's.
for (Stream stream : session.getStreams())
session.onUpdateWindowSize((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
}
@Override
@ -68,22 +64,30 @@ public class HTTP2FlowControl implements FlowControl
// The stream may have been reset concurrently.
if (stream != null)
{
int oldSize = stream.updateWindowSize(delta);
int oldSize = stream.updateSendWindow(delta);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream window {} -> {} for {}", oldSize, oldSize + delta, stream);
LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize + delta, stream);
}
}
else
{
int oldSize = session.updateWindowSize(delta);
int oldSize = session.updateSendWindow(delta);
if (LOG.isDebugEnabled())
LOG.debug("Updated session window {} -> {} for {}", oldSize, oldSize + delta, session);
LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize + delta, session);
}
}
@Override
public void onDataReceived(IStream stream, int length)
{
ISession session = stream.getSession();
int oldSize = session.updateRecvWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
oldSize = stream.updateRecvWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
}
@Override
@ -95,13 +99,17 @@ public class HTTP2FlowControl implements FlowControl
// Other policies may send the WindowUpdate only upon reaching a threshold.
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increasing window by {} for {}", length, stream);
LOG.debug("Data consumed, increasing windows by {} for {}", length, stream);
if (length > 0)
{
// Negative streamId allow for generation of bytes for both stream and session
ISession session = stream.getSession();
session.updateRecvWindow(length);
stream.updateRecvWindow(length);
// Negative streamId allow for generation of bytes for both stream and session.
WindowUpdateFrame frame = new WindowUpdateFrame(-stream.getId(), length);
stream.getSession().control(stream, frame, Callback.Adapter.INSTANCE);
session.control(stream, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY);
}
}
@ -111,17 +119,14 @@ public class HTTP2FlowControl implements FlowControl
if (length == 0)
return;
if (LOG.isDebugEnabled())
LOG.debug("Data sending, decreasing windows by {}", length);
ISession session = stream.getSession();
int oldSize = session.updateWindowSize(-length);
int oldSize = session.updateSendWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated session window {} -> {} for {}", oldSize, oldSize - length, session);
LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize - length, session);
oldSize = stream.updateWindowSize(-length);
oldSize = stream.updateSendWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream window {} -> {} for {}", oldSize, oldSize - length, stream);
LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize - length, stream);
}
@Override

View File

@ -139,7 +139,7 @@ public class HTTP2Flusher extends IteratingCallback
// Now the window sizes cannot change.
// Window updates that happen concurrently will
// be queued and processed on the next iteration.
int sessionWindow = session.getWindowSize();
int sessionWindow = session.getSendWindow();
int index = 0;
int size = frames.size();
@ -165,7 +165,7 @@ public class HTTP2Flusher extends IteratingCallback
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.getWindowSize();
streamWindow = stream.getSendWindow();
streams.put(stream, streamWindow);
}

View File

@ -72,7 +72,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler;
private final EndPoint endPoint;
@ -94,7 +95,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.maxLocalStreams = maxStreams;
this.maxRemoteStreams = maxStreams;
this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize());
this.sendWindow.set(FlowControl.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControl.DEFAULT_WINDOW_SIZE);
}
public FlowControl getFlowControl()
@ -132,9 +134,17 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
// The flow control length includes the padding bytes.
final int flowControlLength = frame.remaining() + frame.padding();
flowControl.onDataReceived(stream, flowControlLength);
if (getRecvWindow() < 0)
{
close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", disconnectOnFailure);
return false;
}
boolean result = stream.process(frame, new Callback.Adapter()
{
@Override
@ -143,6 +153,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flowControl.onDataConsumed(stream, flowControlLength);
}
});
if (stream.isClosed())
removeStream(stream, false);
return result;
@ -207,8 +218,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
{
int windowSize = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
flowControl.updateInitialWindowSize(this, windowSize);
int initialWindow = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
flowControl.updateInitialStreamWindow(this, initialWindow);
}
if (settings.containsKey(SettingsFrame.MAX_FRAME_SIZE))
{
@ -250,7 +261,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else
{
PingFrame reply = new PingFrame(frame.getPayload(), true);
control(null, reply, disconnectOnFailure());
control(null, disconnectOnFailure(), reply);
}
return false;
}
@ -297,11 +308,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
IStream stream = getStream(streamId);
if (stream != null)
onUpdateWindowSize(stream, frame);
onWindowUpdate(stream, frame);
}
else
{
onUpdateWindowSize(null, frame);
onWindowUpdate(null, frame);
}
return false;
}
@ -340,7 +351,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void settings(SettingsFrame frame, Callback callback)
{
control(null, frame, callback);
control(null, callback, frame);
}
@Override
@ -349,13 +360,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (frame.isReply())
callback.failed(new IllegalArgumentException());
else
control(null, frame, callback);
control(null, callback, frame);
}
@Override
public void reset(ResetFrame frame, Callback callback)
{
control(getStream(frame.getStreamId()), frame, callback);
control(getStream(frame.getStreamId()), callback, frame);
}
@Override
@ -367,25 +378,33 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
if (LOG.isDebugEnabled())
LOG.debug("Sending {}: {}", frame.getType(), reason);
control(null, frame, callback);
control(null, callback, frame);
}
}
private void control(IStream stream, Callback callback, Frame frame)
{
control(stream, callback, frame, Frame.EMPTY_ARRAY);
}
@Override
public void control(IStream stream, Frame frame, Callback callback)
public void control(IStream stream, Callback callback, Frame frame, Frame... frames)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new ControlEntry(frame, stream, callback));
int length = frames.length;
frame(new ControlEntry(frame, stream, callback), length == 0);
for (int i = 1; i <= length; ++i)
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
}
@Override
public void data(IStream stream, DataFrame frame, Callback callback)
public void data(IStream stream, Callback callback, DataFrame frame)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new DataEntry(frame, stream, callback));
frame(new DataEntry(frame, stream, callback), true);
}
private void frame(HTTP2Flusher.Entry entry)
private void frame(HTTP2Flusher.Entry entry, boolean flush)
{
if (LOG.isDebugEnabled())
LOG.debug("Sending {}", entry.frame);
@ -394,6 +413,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.prepend(entry);
else
flusher.append(entry);
if (flush)
flusher.iterate();
}
@ -502,19 +522,30 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return streams.get(streamId);
}
protected int getWindowSize()
protected int getSendWindow()
{
return windowSize.get();
return sendWindow.get();
}
protected int getRecvWindow()
{
return recvWindow.get();
}
@Override
public int updateWindowSize(int delta)
public int updateSendWindow(int delta)
{
return windowSize.getAndAdd(delta);
return sendWindow.getAndAdd(delta);
}
@Override
public void onUpdateWindowSize(IStream stream, WindowUpdateFrame frame)
public int updateRecvWindow(int delta)
{
return recvWindow.getAndAdd(delta);
}
@Override
public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
{
// WindowUpdateFrames arrive concurrently with writes.
// Increasing (or reducing) the window size concurrently
@ -618,8 +649,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public String toString()
{
return String.format("%s@%x{queueSize=%d,windowSize=%s,streams=%d}", getClass().getSimpleName(),
hashCode(), flusher.getQueueSize(), windowSize, streams.size());
return String.format("%s@%x{queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d}", getClass().getSimpleName(),
hashCode(), flusher.getQueueSize(), sendWindow, recvWindow, streams.size());
}
private class ControlEntry extends HTTP2Flusher.Entry
@ -697,19 +728,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
int flowControlLength = dataRemaining();
int sessionWindowSize = getWindowSize();
if (sessionWindowSize < 0)
int sessionSendWindow = getSendWindow();
if (sessionSendWindow < 0)
throw new IllegalStateException();
int streamWindowSize = stream.getWindowSize();
if (streamWindowSize < 0)
int streamSendWindow = stream.getSendWindow();
if (streamSendWindow < 0)
throw new IllegalStateException();
int windowSize = Math.min(streamWindowSize, sessionWindowSize);
int window = Math.min(streamSendWindow, sessionSendWindow);
int length = this.length = Math.min(flowControlLength, windowSize);
int length = this.length = Math.min(flowControlLength, window);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window={}/{}", frame, length, windowSize);
LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
generator.data(lease, (DataFrame)frame, length);
flowControl.onDataSending(stream, length);

View File

@ -49,7 +49,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
};
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final ISession session;
private final HeadersFrame frame;
private volatile Listener listener;
@ -77,13 +78,13 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public void headers(HeadersFrame frame, Callback callback)
{
session.control(this, frame, callback);
session.control(this, callback, frame, Frame.EMPTY_ARRAY);
}
@Override
public void data(DataFrame frame, Callback callback)
{
session.data(this, frame, callback);
session.data(this, callback, frame);
}
@Override
@ -175,6 +176,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream
// TODO: handle cases where:
// TODO: A) stream already remotely close.
// TODO: B) DATA before HEADERS.
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", disconnectOnFailure);
return false;
}
notifyData(this, (DataFrame)frame, callback);
return false;
}
@ -237,15 +247,26 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
@Override
public int getWindowSize()
public int getSendWindow()
{
return windowSize.get();
return sendWindow.get();
}
protected int getRecvWindow()
{
return recvWindow.get();
}
@Override
public int updateWindowSize(int delta)
public int updateSendWindow(int delta)
{
return windowSize.getAndAdd(delta);
return sendWindow.getAndAdd(delta);
}
@Override
public int updateRecvWindow(int delta)
{
return recvWindow.getAndAdd(delta);
}
@Override
@ -288,8 +309,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public String toString()
{
return String.format("%s@%x{id=%d,windowSize=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), windowSize, reset, closeState);
return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), sendWindow, recvWindow, reset, closeState);
}
private enum CloseState

View File

@ -29,13 +29,15 @@ public interface ISession extends Session
@Override
public IStream getStream(int streamId);
public void control(IStream stream, Frame frame, Callback callback);
public void control(IStream stream, Callback callback, Frame frame, Frame... frames);
public void data(IStream stream, DataFrame frame, Callback callback);
public void data(IStream stream, Callback callback, DataFrame frame);
public int updateWindowSize(int delta);
public int updateSendWindow(int delta);
public void onUpdateWindowSize(IStream stream, WindowUpdateFrame frame);
public int updateRecvWindow(int delta);
public void onWindowUpdate(IStream stream, WindowUpdateFrame frame);
public void shutdown();

View File

@ -44,9 +44,11 @@ public interface IStream extends Stream
*/
public void updateClose(boolean update, boolean local);
public int getWindowSize();
public int getSendWindow();
public int updateWindowSize(int delta);
public int updateSendWindow(int delta);
public int updateRecvWindow(int delta);
public void close();
}

View File

@ -23,6 +23,7 @@ public abstract class Frame
public static final int HEADER_LENGTH = 9;
public static final int DEFAULT_MAX_LENGTH = 0x40_00;
public static final int MAX_MAX_LENGTH = 0xFF_FF_FF;
public static final Frame[] EMPTY_ARRAY = new Frame[0];
private final FrameType type;

View File

@ -32,7 +32,9 @@ public enum FrameType
PING(6),
GO_AWAY(7),
WINDOW_UPDATE(8),
CONTINUATION(9);
CONTINUATION(9),
// Synthetic frames only needed by the implementation.
PREFACE(10);
public static FrameType from(int type)
{

View File

@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.frames;
public class PrefaceFrame extends Frame
{
public static final byte[] PREFACE_BYTES = new byte[]
{
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54,
0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a,
0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a
};
public PrefaceFrame()
{
super(FrameType.PREFACE);
}
}

View File

@ -30,19 +30,12 @@ public class SettingsFrame extends Frame
private final Map<Integer, Integer> settings;
private final boolean reply;
private boolean preface;
public SettingsFrame(Map<Integer, Integer> settings, boolean reply)
{
this(settings, reply, false);
}
public SettingsFrame(Map<Integer, Integer> settings, boolean reply, boolean preface)
{
super(FrameType.SETTINGS);
this.settings = settings;
this.reply = reply;
this.preface = preface;
}
public Map<Integer, Integer> getSettings()
@ -54,9 +47,4 @@ public class SettingsFrame extends Frame
{
return reply;
}
public boolean isPreface()
{
return preface;
}
}

View File

@ -54,6 +54,7 @@ public class Generator
this.generators[FrameType.GO_AWAY.getType()] = new GoAwayGenerator(headerGenerator);
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
this.generators[FrameType.CONTINUATION.getType()] = null; // TODO
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
this.dataGenerator = new DataGenerator(headerGenerator);
}

View File

@ -0,0 +1,39 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.io.ByteBufferPool;
public class PrefaceGenerator extends FrameGenerator
{
public PrefaceGenerator()
{
super(null);
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
lease.append(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
}
}

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.PrefaceParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
@ -40,14 +39,11 @@ public class SettingsGenerator extends FrameGenerator
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply(), settingsFrame.isPreface());
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());
}
public void generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply, boolean preface)
public void generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply)
{
if (preface)
lease.append(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
// Two bytes for the identifier, four bytes for the value.
int entryLength = 2 + 4;
int length = entryLength * settings.size();

View File

@ -21,17 +21,12 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class PrefaceParser
{
public static final byte[] PREFACE_BYTES = new byte[]
{
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54,
0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a,
0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a
};
private static final Logger LOG = Log.getLogger(PrefaceParser.class);
private final Parser.Listener listener;
@ -47,13 +42,13 @@ public class PrefaceParser
while (buffer.hasRemaining())
{
int currByte = buffer.get();
if (currByte != PREFACE_BYTES[cursor])
if (currByte != PrefaceFrame.PREFACE_BYTES[cursor])
{
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_preface");
return false;
}
++cursor;
if (cursor == PREFACE_BYTES.length)
if (cursor == PrefaceFrame.PREFACE_BYTES.length)
{
cursor = 0;
if (LOG.isDebugEnabled())

View File

@ -87,7 +87,7 @@ public class SettingsGenerateParseTest
for (int i = 0; i < 2; ++i)
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateSettings(lease, settings, true, false);
generator.generateSettings(lease, settings, true);
frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -120,7 +120,7 @@ public class SettingsGenerateParseTest
Map<Integer, Integer> settings1 = new HashMap<>();
settings1.put(13, 17);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateSettings(lease, settings1, true, false);
generator.generateSettings(lease, settings1, true);
// Modify the length of the frame to make it invalid
ByteBuffer bytes = lease.getByteBuffers().get(0);
bytes.putShort(1, (short)(bytes.getShort(1) - 1));
@ -158,7 +158,7 @@ public class SettingsGenerateParseTest
settings1.put(key, value);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateSettings(lease, settings1, true, false);
generator.generateSettings(lease, settings1, true);
for (ByteBuffer buffer : lease.getByteBuffers())
{

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.server.Connector;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{
private int maxHeaderTableSize = 4096;
private int initialWindowSize = FlowControl.DEFAULT_WINDOW_SIZE;
private int initialStreamWindow = FlowControl.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
public AbstractHTTP2ServerConnectionFactory()
@ -55,14 +55,14 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxHeaderTableSize = maxHeaderTableSize;
}
public int getInitialWindowSize()
public int getInitialStreamWindow()
{
return initialWindowSize;
return initialStreamWindow;
}
public void setInitialWindowSize(int initialWindowSize)
public void setInitialStreamWindow(int initialStreamWindow)
{
this.initialWindowSize = initialWindowSize;
this.initialStreamWindow = initialStreamWindow;
}
public int getMaxConcurrentStreams()
@ -82,7 +82,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
Generator generator = new Generator(connector.getByteBufferPool(), getMaxHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener,
new HTTP2FlowControl(getInitialWindowSize()), getMaxConcurrentStreams());
new HTTP2FlowControl(getInitialStreamWindow()), getMaxConcurrentStreams());
Parser parser = newServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),

View File

@ -80,7 +80,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getMaxHeaderTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialWindowSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialStreamWindow());
int maxConcurrentStreams = getMaxConcurrentStreams();
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
@ -57,7 +58,8 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (settings == null)
settings = Collections.emptyMap();
SettingsFrame frame = new SettingsFrame(settings, false);
settings(frame, disconnectOnFailure());
// TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client.
control(null, disconnectOnFailure(), frame, Frame.EMPTY_ARRAY);
return false;
}

View File

@ -44,10 +44,10 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.PrefaceParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
@ -152,7 +152,7 @@ public class HTTP2ServerTest
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))
{
@ -215,7 +215,7 @@ public class HTTP2ServerTest
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port))
{
@ -280,7 +280,7 @@ public class HTTP2ServerTest
generator.control(lease, frame);
// Modify the length of the frame to a wrong one.
lease.getByteBuffers().get(0).putShort(0, (short)7);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
final CountDownLatch latch = new CountDownLatch(1);
try (Socket client = new Socket(host, port))
@ -320,7 +320,7 @@ public class HTTP2ServerTest
generator.control(lease, frame);
// Modify the streamId of the frame to non zero.
lease.getByteBuffers().get(0).putInt(4, 1);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
final CountDownLatch latch = new CountDownLatch(1);
try (Socket client = new Socket(host, port))