Fixed failure of stalled frames.

This commit is contained in:
Simone Bordet 2015-02-05 14:44:04 +01:00
parent 02b5732720
commit fa72356d1d
3 changed files with 124 additions and 65 deletions

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
@ -729,4 +730,55 @@ public class FlowControlTest extends AbstractTest
// Expect the connection to be closed.
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControl.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// At this point the session window should be fully available.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(FlowControl.DEFAULT_WINDOW_SIZE, http2Session.getSendWindow());
}
}

View File

@ -45,18 +45,16 @@ public class HTTP2Flusher extends IteratingCallback
private final Deque<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> reset = new ArrayList<>();
private final List<Entry> resets = new ArrayList<>();
private final List<Entry> actives = new ArrayList<>();
private final Queue<Entry> completes = new ArrayDeque<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
private final List<Entry> active;
private final Queue<Entry> complete;
public HTTP2Flusher(HTTP2Session session)
{
this.session = session;
this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
this.active = new ArrayList<>();
this.complete = new ArrayDeque<>();
}
public void window(IStream stream, WindowUpdateFrame frame)
@ -114,6 +112,17 @@ public class HTTP2Flusher extends IteratingCallback
return !fail;
}
private Entry remove(int index)
{
synchronized (this)
{
if (index == 0)
return frames.pollUnsafe();
else
return frames.remove(index);
}
}
public int getQueueSize()
{
synchronized (this)
@ -148,10 +157,20 @@ public class HTTP2Flusher extends IteratingCallback
while (index < size)
{
Entry entry = frames.get(index);
// We need to compute how many frames fit in the windows.
IStream stream = entry.stream;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset() && !entry.isProtocol())
{
remove(index);
--size;
resets.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for reset {}", entry);
continue;
}
// Check if the frame fits in the flow control windows.
int remaining = entry.dataRemaining();
if (remaining > 0)
{
@ -163,64 +182,52 @@ public class HTTP2Flusher extends IteratingCallback
continue;
}
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
if (stream != null)
{
streamWindow = stream.getSendWindow();
streams.put(stream, streamWindow);
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.getSendWindow();
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
session.getFlowControl().onStreamStalled(stream);
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
session.getFlowControl().onStreamStalled(stream);
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// We will be possibly writing this
// frame, remove it from the queue.
if (index == 0)
frames.pollUnsafe();
else
frames.remove(index);
--size;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset() && !entry.isProtocol())
{
reset.add(entry);
continue;
}
// Reduce the flow control windows.
if (remaining > 0)
{
// The frame fits both flow control windows, reduce them.
sessionWindow -= remaining;
streams.put(stream, streams.get(stream) - remaining);
if (stream != null)
streams.put(stream, streams.get(stream) - remaining);
}
// The frame will be written.
active.add(entry);
// The frame will be written, remove it from the queue.
remove(index);
--size;
actives.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered {}", entry);
LOG.debug("Gathered for write {}", entry);
}
streams.clear();
}
// Perform resets outside the sync block.
for (int i = 0; i < reset.size(); ++i)
for (int i = 0; i < resets.size(); ++i)
{
Entry entry = reset.get(i);
Entry entry = resets.get(i);
entry.reset();
}
reset.clear();
resets.clear();
if (active.isEmpty())
if (actives.isEmpty())
{
if (isClosed())
terminate(new ClosedChannelException());
@ -231,9 +238,9 @@ public class HTTP2Flusher extends IteratingCallback
return Action.IDLE;
}
for (int i = 0; i < active.size(); ++i)
for (int i = 0; i < actives.size(); ++i)
{
Entry entry = active.get(i);
Entry entry = actives.get(i);
Throwable failure = entry.generate(lease);
if (failure != null)
{
@ -245,7 +252,7 @@ public class HTTP2Flusher extends IteratingCallback
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), active.size(), active);
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
return Action.SCHEDULED;
}
@ -256,17 +263,17 @@ public class HTTP2Flusher extends IteratingCallback
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
for (int i = 0; i < actives.size(); ++i)
completes.add(actives.get(i));
actives.clear();
if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", complete.size(), complete);
LOG.debug("Written {} frames for {}", completes.size(), completes);
// Drain the frames one by one to avoid reentrancy.
while (!complete.isEmpty())
while (!completes.isEmpty())
{
Entry entry = complete.poll();
Entry entry = completes.poll();
entry.succeeded();
}
@ -288,14 +295,14 @@ public class HTTP2Flusher extends IteratingCallback
lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < active.size(); ++i)
complete.add(active.get(i));
active.clear();
for (int i = 0; i < actives.size(); ++i)
completes.add(actives.get(i));
actives.clear();
// Drain the frames one by one to avoid reentrancy.
while (!complete.isEmpty())
while (!completes.isEmpty())
{
Entry entry = complete.poll();
Entry entry = completes.poll();
entry.failed(x);
}

View File

@ -682,12 +682,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return streams.get(streamId);
}
protected int getSendWindow()
public int getSendWindow()
{
return sendWindow.get();
}
protected int getRecvWindow()
public int getRecvWindow()
{
return recvWindow.get();
}