Split the generation of frames into 2: flow-controlled and

non-flow-controlled.
This gives better code separation and proper removal of streams when
flow controlled frames complete.
This commit is contained in:
Simone Bordet 2014-06-13 16:28:54 +02:00
parent fb93973c9d
commit 388262227e
18 changed files with 140 additions and 86 deletions

View File

@ -44,6 +44,14 @@ import org.junit.Test;
public class FlowControlTest extends AbstractTest public class FlowControlTest extends AbstractTest
{ {
@Override
public void dispose() throws Exception
{
// Allow WINDOW_UPDATE frames to be sent/received to avoid exception stack traces.
Thread.sleep(1000);
super.dispose();
}
@Test @Test
public void testFlowControlWithConcurrentSettings() throws Exception public void testFlowControlWithConcurrentSettings() throws Exception
{ {

View File

@ -103,7 +103,7 @@ public class HTTP2FlowControl implements FlowControl
// Negative streamId allow for generation of bytes for both stream and session // Negative streamId allow for generation of bytes for both stream and session
int streamId = stream != null ? -stream.getId() : 0; int streamId = stream != null ? -stream.getId() : 0;
WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length); WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length);
session.frame(stream, frame, Callback.Adapter.INSTANCE); session.control(stream, frame, Callback.Adapter.INSTANCE);
} }
@Override @Override

View File

@ -117,14 +117,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null) if (stream != null)
{ {
stream.updateClose(frame.isEndStream(), false); stream.updateClose(frame.isEndStream(), false);
flowControl.onDataReceived(this, stream, frame.getFlowControlledLength()); final int length = frame.remaining();
flowControl.onDataReceived(this, stream, length);
return stream.process(frame, new Callback.Adapter() return stream.process(frame, new Callback.Adapter()
{ {
@Override @Override
public void succeeded() public void succeeded()
{ {
int consumed = frame.getFlowControlledLength(); flowControl.onDataConsumed(HTTP2Session.this, stream, length);
flowControl.onDataConsumed(HTTP2Session.this, stream, consumed);
} }
}); });
} }
@ -257,19 +257,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override @Override
public void settings(SettingsFrame frame, Callback callback) public void settings(SettingsFrame frame, Callback callback)
{ {
frame(null, frame, callback); control(null, frame, callback);
} }
@Override @Override
public void ping(PingFrame frame, Callback callback) public void ping(PingFrame frame, Callback callback)
{ {
frame(null, frame, callback); control(null, frame, callback);
} }
@Override @Override
public void reset(ResetFrame frame, Callback callback) public void reset(ResetFrame frame, Callback callback)
{ {
frame(null, frame, callback); control(null, frame, callback);
} }
@Override @Override
@ -278,15 +278,27 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8); byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload); GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
LOG.debug("Sending {}: {}", frame.getType(), reason); LOG.debug("Sending {}: {}", frame.getType(), reason);
frame(null, frame, callback); control(null, frame, callback);
} }
@Override @Override
public void frame(IStream stream, Frame frame, Callback callback) public void control(IStream stream, Frame frame, Callback callback)
{ {
// We want to generate as late as possible to allow re-prioritization. // We want to generate as late as possible to allow re-prioritization.
FlusherEntry entry = new FlusherEntry(stream, frame, callback); frame(new FlusherEntry(stream, frame, callback));
LOG.debug("Sending {}", frame); }
@Override
public void data(IStream stream, DataFrame frame, Callback callback)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new DataFlusherEntry(stream, frame, callback));
}
private void frame(FlusherEntry entry)
{
if (LOG.isDebugEnabled())
LOG.debug("Sending {}", entry.frame);
flusher.append(entry); flusher.append(entry);
flusher.iterate(); flusher.iterate();
} }
@ -495,33 +507,38 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
FlusherEntry entry = queue.get(nonStalledIndex); FlusherEntry entry = queue.get(nonStalledIndex);
IStream stream = entry.stream; IStream stream = entry.stream;
int frameWindow = entry.frame.getFlowControlledLength(); int remaining = 0;
if (frameWindow > 0) if (entry.frame instanceof DataFrame)
{ {
// Is the session stalled ? DataFrame dataFrame = (DataFrame)entry.frame;
if (sessionWindow <= 0) remaining = dataFrame.remaining();
if (remaining > 0)
{ {
flowControl.onSessionStalled(HTTP2Session.this); // Is the session stalled ?
++nonStalledIndex; if (sessionWindow <= 0)
// There may be *non* flow controlled frames to send.
continue;
}
if (stream != null)
{
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{ {
streamWindow = stream.getWindowSize(); flowControl.onSessionStalled(HTTP2Session.this);
streams.put(stream, streamWindow); ++nonStalledIndex;
// There may be *non* flow controlled frames to send.
continue;
} }
// Is it a frame belonging to an already stalled stream ? if (stream != null)
if (streamWindow <= 0)
{ {
flowControl.onStreamStalled(stream); Integer streamWindow = streams.get(stream);
++nonStalledIndex; if (streamWindow == null)
continue; {
streamWindow = stream.getWindowSize();
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
flowControl.onStreamStalled(stream);
++nonStalledIndex;
continue;
}
} }
} }
} }
@ -531,16 +548,16 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
--size; --size;
// If the stream has been reset, don't send flow controlled frames. // If the stream has been reset, don't send flow controlled frames.
if (stream != null && stream.isReset() && frameWindow > 0) if (stream != null && stream.isReset() && remaining > 0)
{ {
reset.add(entry); reset.add(entry);
continue; continue;
} }
// Reduce the flow control windows. // Reduce the flow control windows.
sessionWindow -= frameWindow; sessionWindow -= remaining;
if (stream != null && frameWindow > 0) if (stream != null && remaining > 0)
streams.put(stream, streams.get(stream) - frameWindow); streams.put(stream, streams.get(stream) - remaining);
active.add(entry); active.add(entry);
if (active.size() == maxGather) if (active.size() == maxGather)
@ -630,10 +647,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private class FlusherEntry implements Callback private class FlusherEntry implements Callback
{ {
private final IStream stream; protected final IStream stream;
private final Frame frame; protected final Frame frame;
private final Callback callback; protected final Callback callback;
private int length;
private FlusherEntry(IStream stream, Frame frame, Callback callback) private FlusherEntry(IStream stream, Frame frame, Callback callback)
{ {
@ -646,13 +662,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
try try
{ {
int windowSize = stream == null ? getWindowSize() : stream.getWindowSize(); generator.control(lease, frame);
int frameLength = frame.getFlowControlledLength(); if (LOG.isDebugEnabled())
if (frameLength > 0) LOG.debug("Generated {}", frame);
this.length = Math.min(frameLength, windowSize);
generator.generate(lease, frame, windowSize);
LOG.debug("Generated {}, windowSize={}", frame, windowSize);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -661,12 +673,46 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
} }
@Override
public void succeeded()
{
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
private class DataFlusherEntry extends FlusherEntry
{
private int length;
private DataFlusherEntry(IStream stream, DataFrame frame, Callback callback)
{
super(stream, frame, callback);
}
public void generate(ByteBufferPool.Lease lease)
{
DataFrame dataFrame = (DataFrame)frame;
int windowSize = stream.getWindowSize();
int frameLength = dataFrame.remaining();
this.length = Math.min(frameLength, windowSize);
generator.data(lease, dataFrame, length);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, maxLength={}", dataFrame, length);
}
@Override @Override
public void succeeded() public void succeeded()
{ {
flowControl.onDataSent(HTTP2Session.this, stream, length); flowControl.onDataSent(HTTP2Session.this, stream, length);
// Do we have more to send ? // Do we have more to send ?
if (frame.getFlowControlledLength() > 0) DataFrame dataFrame = (DataFrame)frame;
if (dataFrame.remaining() > 0)
{ {
// We have written part of the frame, but there is more to write. // We have written part of the frame, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that other // We need to keep the correct ordering of frames, to avoid that other
@ -675,19 +721,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
else else
{ {
// Only now we can update the close state
// and eventually remove the stream.
stream.updateClose(dataFrame.isEndStream(), true);
if (stream.isClosed())
removeStream(stream, true);
callback.succeeded(); callback.succeeded();
// TODO: what below is needed ? YES IT IS.
// stream.updateCloseState(dataInfo.isClose(), true);
// if (stream.isClosed())
// removeStream(stream);
} }
} }
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
} }
private class PromiseCallback<C> implements Callback private class PromiseCallback<C> implements Callback

View File

@ -63,13 +63,13 @@ public class HTTP2Stream implements IStream
@Override @Override
public void headers(HeadersFrame frame, Callback callback) public void headers(HeadersFrame frame, Callback callback)
{ {
session.frame(this, frame, callback); session.control(this, frame, callback);
} }
@Override @Override
public void data(DataFrame frame, Callback callback) public void data(DataFrame frame, Callback callback)
{ {
session.frame(this, frame, callback); session.data(this, frame, callback);
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -27,7 +28,9 @@ public interface ISession extends Session
@Override @Override
IStream getStream(int streamId); IStream getStream(int streamId);
public void frame(IStream stream, Frame frame, Callback callback); public void control(IStream stream, Frame frame, Callback callback);
public void data(IStream stream, DataFrame frame, Callback callback);
public int updateWindowSize(int delta); public int updateWindowSize(int delta);
} }

View File

@ -49,8 +49,7 @@ public class DataFrame extends Frame
return endStream; return endStream;
} }
@Override public int remaining()
public int getFlowControlledLength()
{ {
return data.remaining(); return data.remaining();
} }

View File

@ -35,11 +35,6 @@ public abstract class Frame
return type; return type;
} }
public int getFlowControlledLength()
{
return 0;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -27,14 +27,15 @@ import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
public class DataGenerator extends FrameGenerator public class DataGenerator
{ {
private final HeaderGenerator headerGenerator;
public DataGenerator(HeaderGenerator headerGenerator) public DataGenerator(HeaderGenerator headerGenerator)
{ {
super(headerGenerator); this.headerGenerator = headerGenerator;
} }
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength)
{ {
DataFrame dataFrame = (DataFrame)frame; DataFrame dataFrame = (DataFrame)frame;
@ -82,7 +83,7 @@ public class DataGenerator extends FrameGenerator
if (last) if (last)
flags |= Flag.END_STREAM; flags |= Flag.END_STREAM;
ByteBuffer header = generateHeader(lease, FrameType.DATA, length, flags, streamId); ByteBuffer header = headerGenerator.generate(lease, FrameType.DATA, Frame.HEADER_LENGTH + length, length, flags, streamId);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);

View File

@ -33,7 +33,7 @@ public abstract class FrameGenerator
this.headerGenerator = headerGenerator; this.headerGenerator = headerGenerator;
} }
public abstract void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength); public abstract void generate(ByteBufferPool.Lease lease, Frame frame);
protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId) protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId)
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.generator; package org.eclipse.jetty.http2.generator;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.hpack.HpackEncoder; import org.eclipse.jetty.http2.hpack.HpackEncoder;
@ -28,6 +29,7 @@ public class Generator
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private final int headerTableSize; private final int headerTableSize;
private final FrameGenerator[] generators; private final FrameGenerator[] generators;
private final DataGenerator dataGenerator;
public Generator(ByteBufferPool byteBufferPool) public Generator(ByteBufferPool byteBufferPool)
{ {
@ -43,7 +45,6 @@ public class Generator
HpackEncoder encoder = new HpackEncoder(headerTableSize); HpackEncoder encoder = new HpackEncoder(headerTableSize);
this.generators = new FrameGenerator[FrameType.values().length]; this.generators = new FrameGenerator[FrameType.values().length];
this.generators[FrameType.DATA.getType()] = new DataGenerator(headerGenerator);
this.generators[FrameType.HEADERS.getType()] = new HeadersGenerator(headerGenerator, encoder); this.generators[FrameType.HEADERS.getType()] = new HeadersGenerator(headerGenerator, encoder);
this.generators[FrameType.PRIORITY.getType()] = new PriorityGenerator(headerGenerator); this.generators[FrameType.PRIORITY.getType()] = new PriorityGenerator(headerGenerator);
this.generators[FrameType.RST_STREAM.getType()] = new ResetGenerator(headerGenerator); this.generators[FrameType.RST_STREAM.getType()] = new ResetGenerator(headerGenerator);
@ -56,6 +57,7 @@ public class Generator
this.generators[FrameType.ALTSVC.getType()] = null; // TODO this.generators[FrameType.ALTSVC.getType()] = null; // TODO
this.generators[FrameType.BLOCKED.getType()] = null; // TODO this.generators[FrameType.BLOCKED.getType()] = null; // TODO
this.dataGenerator = new DataGenerator(headerGenerator);
} }
public ByteBufferPool getByteBufferPool() public ByteBufferPool getByteBufferPool()
@ -68,8 +70,13 @@ public class Generator
return headerTableSize; return headerTableSize;
} }
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void control(ByteBufferPool.Lease lease, Frame frame)
{ {
generators[frame.getType().getType()].generate(lease, frame, maxLength); generators[frame.getType().getType()].generate(lease, frame);
}
public void data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{
dataGenerator.generate(lease, frame, maxLength);
} }
} }

View File

@ -35,7 +35,7 @@ public class GoAwayGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
GoAwayFrame goAwayFrame = (GoAwayFrame)frame; GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload()); generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload());

View File

@ -40,7 +40,7 @@ public class HeadersGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
HeadersFrame headersFrame = (HeadersFrame)frame; HeadersFrame headersFrame = (HeadersFrame)frame;
generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream()); generate(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), !headersFrame.isEndStream());

View File

@ -35,7 +35,7 @@ public class PingGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
PingFrame pingFrame = (PingFrame)frame; PingFrame pingFrame = (PingFrame)frame;
generatePing(lease, pingFrame.getPayload(), pingFrame.isReply()); generatePing(lease, pingFrame.getPayload(), pingFrame.isReply());

View File

@ -35,7 +35,7 @@ public class PriorityGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
PriorityFrame priorityFrame = (PriorityFrame)frame; PriorityFrame priorityFrame = (PriorityFrame)frame;
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive()); generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());

View File

@ -35,7 +35,7 @@ public class ResetGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
ResetFrame resetFrame = (ResetFrame)frame; ResetFrame resetFrame = (ResetFrame)frame;
generateReset(lease, resetFrame.getStreamId(), resetFrame.getError()); generateReset(lease, resetFrame.getStreamId(), resetFrame.getError());

View File

@ -36,7 +36,7 @@ public class SettingsGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
SettingsFrame settingsFrame = (SettingsFrame)frame; SettingsFrame settingsFrame = (SettingsFrame)frame;
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply()); generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());

View File

@ -35,7 +35,7 @@ public class WindowUpdateGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame, int maxLength) public void generate(ByteBufferPool.Lease lease, Frame frame)
{ {
WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame; WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame;
generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta()); generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta());

View File

@ -105,7 +105,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields); host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true); HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request, 0); generator.control(lease, request);
// No preface bytes // No preface bytes
@ -154,7 +154,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields); host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true); HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request, 0); generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false); lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port)) try (Socket client = new Socket(host, port))
@ -217,7 +217,7 @@ public class HTTP2ServerTest
host + ":" + port, host, port, path, fields); host + ":" + port, host, port, path, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true); HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generate(lease, request, 0); generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false); lease.prepend(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
try (Socket client = new Socket(host, port)) try (Socket client = new Socket(host, port))