Reworked flow control implementation. Splitted HTTP2Flusher out of HTTP2Session.

Flow control window updates are now processed by the flusher, so that
it is the only component that handles window updates.
In the process of this refactoring, HTTP2Flusher was refactored out
of HTTP2Session.
This commit is contained in:
Simone Bordet 2014-08-06 15:42:36 +02:00
parent 2cd53831c0
commit abd139cc1b
6 changed files with 584 additions and 358 deletions

View File

@ -462,4 +462,73 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, bytes);
}
@Test
public void testServerTwoDataFramesWithStalledSession() throws Exception
{
// Frames in queue = DATA1, DATA2.
// Server writes part of DATA1, then stalls.
// A window update unstalls the session, verify that the data is correctly sent.
Random random = new Random();
final byte[] chunk1 = new byte[1024];
random.nextBytes(chunk1);
final byte[] chunk2 = new byte[1024];
random.nextBytes(chunk2);
final AtomicReference<CountDownLatch> settingsLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.get().countDown();
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk1), false), Callback.Adapter.INSTANCE);
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk2), true), Callback.Adapter.INSTANCE);
dataLatch.countDown();
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 0);
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
byte[] content = new byte[chunk1.length + chunk2.length];
final ByteBuffer buffer = ByteBuffer.wrap(content);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
final CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
buffer.put(frame.getData());
callback.succeeded();
if (frame.isEndStream())
responseLatch.countDown();
}
});
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Now we have the 2 DATA frames queued in the server.
// Partially unstall the first DATA frame.
settingsLatch.set(new CountDownLatch(1));
settings.clear();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2);
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -30,11 +30,13 @@ public interface FlowControl
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame);
public void onDataReceived(ISession session, IStream stream, int length);
public void onDataReceived(IStream stream, int length);
public void onDataConsumed(ISession session, IStream stream, int length);
public void onDataConsumed(IStream stream, int length);
public void onDataSent(ISession session, IStream stream, int length);
public void onDataSending(IStream stream, int length);
public void onDataSent(IStream stream, int length);
public void onSessionStalled(ISession session);

View File

@ -52,21 +52,14 @@ public class HTTP2FlowControl implements FlowControl
{
int windowSize = this.initialWindowSize;
this.initialWindowSize = initialWindowSize;
int delta = initialWindowSize - windowSize;
// Update the sessions's window size.
int oldSize = session.updateWindowSize(delta);
if (LOG.isDebugEnabled())
LOG.debug("Updated session initial window {} -> {} for {}", oldSize, oldSize + delta, session);
// Update the session's window size.
session.onUpdateWindowSize(null, new WindowUpdateFrame(0, delta));
// Update the streams' window size.
for (Stream stream : session.getStreams())
{
oldSize = ((IStream)stream).updateWindowSize(delta);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream initial window {} -> {} for {}", oldSize, oldSize + delta, stream);
}
session.onUpdateWindowSize((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
}
@Override
@ -92,12 +85,12 @@ public class HTTP2FlowControl implements FlowControl
}
@Override
public void onDataReceived(ISession session, IStream stream, int length)
public void onDataReceived(IStream stream, int length)
{
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
public void onDataConsumed(IStream stream, int length)
{
// This is the algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed.
@ -107,28 +100,32 @@ public class HTTP2FlowControl implements FlowControl
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increasing window by {} for {}", length, stream);
// Negative streamId allow for generation of bytes for both stream and session
int streamId = stream != null ? -stream.getId() : 0;
WindowUpdateFrame frame = new WindowUpdateFrame(streamId, length);
session.control(stream, frame, Callback.Adapter.INSTANCE);
WindowUpdateFrame frame = new WindowUpdateFrame(-stream.getId(), length);
stream.getSession().control(stream, frame, Callback.Adapter.INSTANCE);
}
@Override
public void onDataSent(ISession session, IStream stream, int length)
public void onDataSending(IStream stream, int length)
{
if (length == 0)
return;
if (LOG.isDebugEnabled())
LOG.debug("Data sent, decreasing window by {}", length);
LOG.debug("Data sending, decreasing windows by {}", length);
ISession session = stream.getSession();
int oldSize = session.updateWindowSize(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated session window {} -> {} for {}", oldSize, oldSize - length, session);
if (stream != null)
{
oldSize = stream.updateWindowSize(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream window {} -> {} for {}", oldSize, oldSize - length, stream);
}
oldSize = stream.updateWindowSize(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream window {} -> {} for {}", oldSize, oldSize - length, stream);
}
@Override
public void onDataSent(IStream stream, int length)
{
}
@Override

View File

@ -0,0 +1,391 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2Flusher extends IteratingCallback
{
private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
private final Deque<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> reset = new ArrayList<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
private final List<Entry> active;
private final Queue<Entry> complete;
public HTTP2Flusher(HTTP2Session session)
{
this.session = session;
this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
this.active = new ArrayList<>();
this.complete = new ArrayDeque<>();
}
public void window(IStream stream, WindowUpdateFrame frame)
{
synchronized (this)
{
if (!isClosed())
{
windows.offer(new WindowEntry(stream, frame));
// Flush stalled data.
iterate();
}
}
}
public void prepend(Entry entry)
{
boolean fail = false;
synchronized (this)
{
if (isClosed())
{
fail = true;
}
else
{
frames.add(0, entry);
if (LOG.isDebugEnabled())
LOG.debug("Prepended {}, frames={}", entry, frames.size());
}
}
if (fail)
closed(entry, new ClosedChannelException());
}
public void append(Entry entry)
{
boolean fail = false;
synchronized (this)
{
if (isClosed())
{
fail = true;
}
else
{
frames.offer(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, frames={}", entry, frames.size());
}
}
if (fail)
closed(entry, new ClosedChannelException());
}
public int getQueueSize()
{
synchronized (this)
{
return frames.size();
}
}
@Override
protected Action process() throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("Flushing {}", session);
synchronized (this)
{
// First thing, update the window sizes, so we can
// reason about the frames to remove from the queue.
while (!windows.isEmpty())
{
WindowEntry entry = windows.poll();
entry.perform();
}
// Now the window sizes cannot change.
// Window updates that happen concurrently will
// be queued and processed on the next iteration.
int sessionWindow = session.getWindowSize();
int index = 0;
int size = frames.size();
while (index < size)
{
Entry entry = frames.get(index);
// We need to compute how many frames fit in the windows.
IStream stream = entry.stream;
int remaining = entry.dataRemaining();
if (remaining > 0)
{
if (sessionWindow <= 0)
{
session.getFlowControl().onSessionStalled(session);
++index;
// There may be *non* flow controlled frames to send.
continue;
}
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.getWindowSize();
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
session.getFlowControl().onStreamStalled(stream);
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// We will be possibly writing this
// frame, remove it from the queue.
if (index == 0)
frames.pollUnsafe();
else
frames.remove(index);
--size;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset())
{
reset.add(entry);
continue;
}
// Reduce the flow control windows.
if (remaining > 0)
{
sessionWindow -= remaining;
streams.put(stream, streams.get(stream) - remaining);
}
// The frame will be written.
active.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered {}", entry);
}
streams.clear();
}
// Perform resets outside the sync block.
for (int i = 0; i < reset.size(); ++i)
{
Entry entry = reset.get(i);
entry.reset();
}
reset.clear();
if (active.isEmpty())
{
if (isClosed())
terminate(new ClosedChannelException());
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session);
return Action.IDLE;
}
for (int i = 0; i < active.size(); ++i)
{
Entry entry = active.get(i);
Throwable failure = entry.generate(lease);
if (failure != null)
{
// Failure to generate the entry is catastrophic.
failed(failure);
return Action.SUCCEEDED;
}
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), active.size(), active);
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", complete.size(), complete);
// Drain the frames one by one to avoid reentrancy.
while (!complete.isEmpty())
{
Entry entry = complete.poll();
entry.succeeded();
}
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
throw new IllegalStateException();
}
@Override
protected void onCompleteFailure(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
// Drain the frames one by one to avoid reentrancy.
while (!complete.isEmpty())
{
Entry entry = complete.poll();
entry.failed(x);
}
terminate(x);
}
@Override
public void close()
{
super.close();
iterate();
}
private void terminate(Throwable x)
{
Queue<Entry> queued;
synchronized (this)
{
queued = new ArrayDeque<>(frames);
frames.clear();
}
if (LOG.isDebugEnabled())
LOG.debug("Terminating, queued={}", queued.size());
for (Entry entry : queued)
closed(entry, x);
session.disconnect();
}
private void closed(Entry entry, Throwable failure)
{
entry.failed(failure);
}
public static abstract class Entry implements Callback
{
protected final Frame frame;
protected final IStream stream;
protected final Callback callback;
protected Entry(Frame frame, IStream stream, Callback callback)
{
this.frame = frame;
this.stream = stream;
this.callback = callback;
}
public int dataRemaining()
{
return 0;
}
public Throwable generate(ByteBufferPool.Lease lease)
{
return null;
}
public void reset()
{
failed(new EOFException("reset"));
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
@Override
public String toString()
{
return frame.toString();
}
}
private class WindowEntry
{
private final IStream stream;
private final WindowUpdateFrame frame;
public WindowEntry(IStream stream, WindowUpdateFrame frame)
{
this.stream = stream;
this.frame = frame;
}
public void perform()
{
FlowControl flowControl = session.getFlowControl();
flowControl.onWindowUpdate(session, stream, frame);
}
}
}

View File

@ -18,18 +18,13 @@
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -52,11 +47,9 @@ import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -86,7 +79,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final Generator generator;
private final Listener listener;
private final FlowControl flowControl;
private final Flusher flusher;
private final HTTP2Flusher flusher;
private int maxLocalStreams;
private int maxRemoteStreams;
@ -97,13 +90,18 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.flusher = new Flusher(4);
this.flusher = new HTTP2Flusher(this);
this.maxLocalStreams = maxStreams;
this.maxRemoteStreams = maxStreams;
this.streamIds.set(initialStreamId);
this.windowSize.set(flowControl.getInitialWindowSize());
}
public FlowControl getFlowControl()
{
return flowControl;
}
public int getMaxRemoteStreams()
{
return maxRemoteStreams;
@ -114,6 +112,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
this.maxRemoteStreams = maxRemoteStreams;
}
public EndPoint getEndPoint()
{
return endPoint;
}
public Generator getGenerator()
{
return generator;
@ -131,13 +134,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.updateClose(frame.isEndStream(), false);
// The flow control length includes the padding bytes.
final int flowControlLength = frame.remaining() + frame.padding();
flowControl.onDataReceived(this, stream, flowControlLength);
flowControl.onDataReceived(stream, flowControlLength);
boolean result = stream.process(frame, new Callback.Adapter()
{
@Override
public void succeeded()
{
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
flowControl.onDataConsumed(stream, flowControlLength);
}
});
if (stream.isClosed())
@ -263,7 +266,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
flusher.close();
disconnect();
notifyClose(this, frame);
@ -291,13 +293,16 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
int streamId = frame.getStreamId();
IStream stream = null;
if (streamId > 0)
stream = getStream(streamId);
flowControl.onWindowUpdate(this, stream, frame);
// Flush stalled data.
flusher.iterate();
{
IStream stream = getStream(streamId);
if (stream != null)
onUpdateWindowSize(stream, frame);
}
else
{
onUpdateWindowSize(null, frame);
}
return false;
}
@ -325,7 +330,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);
FlusherEntry entry = new FlusherEntry(stream, frame, new PromiseCallback<>(promise, stream));
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
flusher.append(entry);
}
// Iterate outside the synchronized block.
@ -373,17 +378,17 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
public void control(IStream stream, Frame frame, Callback callback)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new FlusherEntry(stream, frame, callback));
frame(new ControlEntry(frame, stream, callback));
}
@Override
public void data(IStream stream, DataFrame frame, Callback callback)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new DataFlusherEntry(stream, frame, callback));
frame(new DataEntry(frame, stream, callback));
}
private void frame(FlusherEntry entry)
private void frame(HTTP2Flusher.Entry entry)
{
if (LOG.isDebugEnabled())
LOG.debug("Sending {}", entry.frame);
@ -494,6 +499,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return result;
}
@Override
public IStream getStream(int streamId)
{
return streams.get(streamId);
@ -510,17 +516,30 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return windowSize.getAndAdd(delta);
}
@Override
public void onUpdateWindowSize(IStream stream, WindowUpdateFrame frame)
{
// WindowUpdateFrames arrive concurrently with writes.
// Increasing (or reducing) the window size concurrently
// with writes requires coordination with the flusher, that
// decides how many frames to write depending on the available
// window sizes. If the window sizes vary concurrently, the
// flusher may take non-optimal or wrong decisions.
// Here, we "queue" window updates to the flusher, so it will
// be the only component responsible for window updates, for
// both increments and reductions.
flusher.window(stream, frame);
}
@Override
public void shutdown()
{
if (LOG.isDebugEnabled())
LOG.debug("Shutting down");
// Append a fake FlusherEntry that disconnects when the queue is drained.
flusher.append(new ShutdownFlusherEntry());
flusher.iterate();
flusher.close();
}
@Override
public void disconnect()
{
if (LOG.isDebugEnabled())
@ -606,265 +625,30 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
hashCode(), flusher.getQueueSize(), windowSize, streams.size());
}
private class Flusher extends IteratingCallback
private class ControlEntry extends HTTP2Flusher.Entry
{
private final ArrayQueue<FlusherEntry> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH);
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<FlusherEntry> reset = new ArrayList<>();
private final ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool());
private final int maxGather;
private final List<FlusherEntry> active;
private final Queue<FlusherEntry> complete;
private Flusher(int maxGather)
private ControlEntry(Frame frame, IStream stream, Callback callback)
{
this.maxGather = maxGather;
this.active = new ArrayList<>(maxGather);
this.complete = new ArrayDeque<>(maxGather);
super(frame, stream, callback);
}
private void append(FlusherEntry entry)
{
boolean fail = false;
synchronized (queue)
{
if (isClosed())
fail = true;
else
queue.offer(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, queue={}", entry, queue.size());
}
if (fail)
closed(entry);
}
private void prepend(FlusherEntry entry)
{
boolean fail = false;
synchronized (queue)
{
if (isClosed())
fail = true;
else
queue.add(0, entry);
}
if (fail)
closed(entry);
}
private int getQueueSize()
{
synchronized (queue)
{
return queue.size();
}
}
@Override
protected Action process() throws Exception
{
synchronized (queue)
{
// The session window size may vary concurrently upon receipt of
// WINDOW_UPDATE or SETTINGS so we read it here and not in the loop.
// The stream window size is read in the loop, but it's always
// capped by the session window size.
int sessionWindow = getWindowSize();
int nonStalledIndex = 0;
int size = queue.size();
while (nonStalledIndex < size)
{
FlusherEntry entry = queue.get(nonStalledIndex);
IStream stream = entry.stream;
int remaining = 0;
if (entry.frame instanceof DataFrame)
{
DataFrame dataFrame = (DataFrame)entry.frame;
remaining = dataFrame.remaining();
if (remaining > 0)
{
// Is the session stalled ?
if (sessionWindow <= 0)
{
flowControl.onSessionStalled(HTTP2Session.this);
++nonStalledIndex;
// There may be *non* flow controlled frames to send.
continue;
}
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.getWindowSize();
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
flowControl.onStreamStalled(stream);
++nonStalledIndex;
// There may be *non* flow controlled frames to send.
continue;
}
}
}
// We will be possibly writing this frame.
queue.remove(nonStalledIndex);
--size;
// If the stream has been reset, don't send flow controlled frames.
if (stream != null && stream.isReset() && remaining > 0)
{
reset.add(entry);
continue;
}
// Reduce the flow control windows.
sessionWindow -= remaining;
if (stream != null && remaining > 0)
streams.put(stream, streams.get(stream) - remaining);
active.add(entry);
if (active.size() == maxGather)
break;
}
streams.clear();
}
for (int i = 0; i < reset.size(); ++i)
{
FlusherEntry entry = reset.get(i);
entry.reset();
}
reset.clear();
if (active.isEmpty())
return Action.IDLE;
for (int i = 0; i < active.size(); ++i)
{
FlusherEntry entry = active.get(i);
entry.generate(lease);
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), active.size(), active);
endPoint.write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", complete.size(), complete);
// Drain the queue one by one to avoid reentrancy.
while (!complete.isEmpty())
{
FlusherEntry entry = complete.poll();
entry.succeeded();
}
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
throw new IllegalStateException();
}
@Override
protected void onCompleteFailure(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
// Drain the queue one by one to avoid reentrancy.
while (!complete.isEmpty())
{
FlusherEntry entry = complete.poll();
entry.failed(x);
}
}
public void close()
{
super.close();
Queue<FlusherEntry> queued;
synchronized (queue)
{
queued = new ArrayDeque<>(queue);
queue.clear();
}
if (LOG.isDebugEnabled())
LOG.debug("Closing, queued={}", queued.size());
for (FlusherEntry item : queued)
closed(item);
}
protected void closed(FlusherEntry item)
{
item.failed(new ClosedChannelException());
}
}
private class FlusherEntry implements Callback
{
protected final IStream stream;
protected final Frame frame;
protected final Callback callback;
private FlusherEntry(IStream stream, Frame frame, Callback callback)
{
this.stream = stream;
this.frame = frame;
this.callback = callback;
}
public void generate(ByteBufferPool.Lease lease)
public Throwable generate(ByteBufferPool.Lease lease)
{
try
{
generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
return null;
}
catch (Throwable x)
{
LOG.debug("Frame generation failure", x);
failed(x);
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
}
public void reset()
{
callback.failed(new EOFException("reset"));
}
@Override
public void succeeded()
{
@ -888,51 +672,64 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
if (stream != null)
stream.close();
close(ErrorCodes.INTERNAL_ERROR, "generator_error", Adapter.INSTANCE);
callback.failed(x);
}
@Override
public String toString()
{
return frame.toString();
}
}
private class DataFlusherEntry extends FlusherEntry
private class DataEntry extends HTTP2Flusher.Entry
{
private int length;
private DataFlusherEntry(IStream stream, DataFrame frame, Callback callback)
private DataEntry(DataFrame frame, IStream stream, Callback callback)
{
super(stream, frame, callback);
super(frame, stream, callback);
}
public void generate(ByteBufferPool.Lease lease)
@Override
public int dataRemaining()
{
DataFrame dataFrame = (DataFrame)frame;
int flowControlLength = dataFrame.remaining() + dataFrame.padding();
// We don't do any padding, so the flow control length is
// always the data remaining. This simplifies the handling
// of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case
// we would have to count the padding only once.
return ((DataFrame)frame).remaining();
}
int streamWindowSize = stream.getWindowSize();
int sessionWindowSize = getWindowSize();
int windowSize = Math.min(streamWindowSize, sessionWindowSize);
public Throwable generate(ByteBufferPool.Lease lease)
{
try
{
int flowControlLength = dataRemaining();
length = Math.min(flowControlLength, windowSize);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, maxLength={}", dataFrame, length);
generator.data(lease, dataFrame, length);
int sessionWindowSize = getWindowSize();
if (sessionWindowSize < 0)
throw new IllegalStateException();
int streamWindowSize = stream.getWindowSize();
if (streamWindowSize < 0)
throw new IllegalStateException();
int windowSize = Math.min(streamWindowSize, sessionWindowSize);
int length = this.length = Math.min(flowControlLength, windowSize);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window={}/{}", frame, length, windowSize);
generator.data(lease, (DataFrame)frame, length);
flowControl.onDataSending(stream, length);
return null;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
}
@Override
public void succeeded()
{
flowControl.onDataSent(HTTP2Session.this, stream, length);
flowControl.onDataSent(stream, length);
// Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame;
if (dataFrame.remaining() > 0)
@ -954,39 +751,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
private class ShutdownFlusherEntry extends FlusherEntry
{
public ShutdownFlusherEntry()
{
super(null, null, Adapter.INSTANCE);
}
@Override
public void generate(ByteBufferPool.Lease lease)
{
}
@Override
public void succeeded()
{
flusher.close();
disconnect();
}
@Override
public void failed(Throwable x)
{
flusher.close();
disconnect();
}
@Override
public String toString()
{
return String.format("%s@%x", "ShutdownFrame", hashCode());
}
}
private class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;

View File

@ -21,12 +21,13 @@ package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
public interface ISession extends Session
{
@Override
IStream getStream(int streamId);
public IStream getStream(int streamId);
public void control(IStream stream, Frame frame, Callback callback);
@ -34,6 +35,8 @@ public interface ISession extends Session
public int updateWindowSize(int delta);
public void onUpdateWindowSize(IStream stream, WindowUpdateFrame frame);
public void shutdown();
public void disconnect();