From fa72356d1de50aabfb71cedfeae705e1c682696a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 5 Feb 2015 14:44:04 +0100 Subject: [PATCH] Fixed failure of stalled frames. --- .../jetty/http2/client/FlowControlTest.java | 52 +++++++ .../org/eclipse/jetty/http2/HTTP2Flusher.java | 133 +++++++++--------- .../org/eclipse/jetty/http2/HTTP2Session.java | 4 +- 3 files changed, 124 insertions(+), 65 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java index 3163e60c9b4..e80b1a2a6a4 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlTest.java @@ -43,6 +43,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; @@ -729,4 +730,55 @@ public class FlowControlTest extends AbstractTest // Expect the connection to be closed. Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testFlowControlWhenServerResetsStream() throws Exception + { + // On server, we don't consume the data and we immediately reset. + startServer(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); + return null; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request metaData = newRequest("POST", new HttpFields()); + HeadersFrame frame = new HeadersFrame(0, metaData, null, false); + FuturePromise streamPromise = new FuturePromise<>(); + final CountDownLatch resetLatch = new CountDownLatch(1); + session.newStream(frame, streamPromise, new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); + } + }); + Stream stream = streamPromise.get(5, TimeUnit.SECONDS); + // Perform a big upload that will stall the flow control windows. + ByteBuffer data = ByteBuffer.allocate(5 * FlowControl.DEFAULT_WINDOW_SIZE); + final CountDownLatch dataLatch = new CountDownLatch(1); + stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter() + { + @Override + public void failed(Throwable x) + { + dataLatch.countDown(); + } + }); + + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS)); + + // Wait a little more for the window updates to be processed. + Thread.sleep(1000); + + // At this point the session window should be fully available. + HTTP2Session http2Session = (HTTP2Session)session; + Assert.assertEquals(FlowControl.DEFAULT_WINDOW_SIZE, http2Session.getSendWindow()); + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index aafc4accbf2..452afcb8569 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -45,18 +45,16 @@ public class HTTP2Flusher extends IteratingCallback private final Deque windows = new ArrayDeque<>(); private final ArrayQueue frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this); private final Map streams = new HashMap<>(); - private final List reset = new ArrayList<>(); + private final List resets = new ArrayList<>(); + private final List actives = new ArrayList<>(); + private final Queue completes = new ArrayDeque<>(); private final HTTP2Session session; private final ByteBufferPool.Lease lease; - private final List active; - private final Queue complete; public HTTP2Flusher(HTTP2Session session) { this.session = session; this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool()); - this.active = new ArrayList<>(); - this.complete = new ArrayDeque<>(); } public void window(IStream stream, WindowUpdateFrame frame) @@ -114,6 +112,17 @@ public class HTTP2Flusher extends IteratingCallback return !fail; } + private Entry remove(int index) + { + synchronized (this) + { + if (index == 0) + return frames.pollUnsafe(); + else + return frames.remove(index); + } + } + public int getQueueSize() { synchronized (this) @@ -148,10 +157,20 @@ public class HTTP2Flusher extends IteratingCallback while (index < size) { Entry entry = frames.get(index); - - // We need to compute how many frames fit in the windows. - IStream stream = entry.stream; + + // If the stream has been reset, don't send the frame. + if (stream != null && stream.isReset() && !entry.isProtocol()) + { + remove(index); + --size; + resets.add(entry); + if (LOG.isDebugEnabled()) + LOG.debug("Gathered for reset {}", entry); + continue; + } + + // Check if the frame fits in the flow control windows. int remaining = entry.dataRemaining(); if (remaining > 0) { @@ -163,64 +182,52 @@ public class HTTP2Flusher extends IteratingCallback continue; } - // The stream may have a smaller window than the session. - Integer streamWindow = streams.get(stream); - if (streamWindow == null) + if (stream != null) { - streamWindow = stream.getSendWindow(); - streams.put(stream, streamWindow); + // The stream may have a smaller window than the session. + Integer streamWindow = streams.get(stream); + if (streamWindow == null) + { + streamWindow = stream.getSendWindow(); + streams.put(stream, streamWindow); + } + + // Is it a frame belonging to an already stalled stream ? + if (streamWindow <= 0) + { + session.getFlowControl().onStreamStalled(stream); + ++index; + // There may be *non* flow controlled frames to send. + continue; + } } - // Is it a frame belonging to an already stalled stream ? - if (streamWindow <= 0) - { - session.getFlowControl().onStreamStalled(stream); - ++index; - // There may be *non* flow controlled frames to send. - continue; - } - } - - // We will be possibly writing this - // frame, remove it from the queue. - if (index == 0) - frames.pollUnsafe(); - else - frames.remove(index); - --size; - - // If the stream has been reset, don't send the frame. - if (stream != null && stream.isReset() && !entry.isProtocol()) - { - reset.add(entry); - continue; - } - - // Reduce the flow control windows. - if (remaining > 0) - { + // The frame fits both flow control windows, reduce them. sessionWindow -= remaining; - streams.put(stream, streams.get(stream) - remaining); + if (stream != null) + streams.put(stream, streams.get(stream) - remaining); } - // The frame will be written. - active.add(entry); + // The frame will be written, remove it from the queue. + remove(index); + --size; + actives.add(entry); if (LOG.isDebugEnabled()) - LOG.debug("Gathered {}", entry); + LOG.debug("Gathered for write {}", entry); } streams.clear(); } // Perform resets outside the sync block. - for (int i = 0; i < reset.size(); ++i) + for (int i = 0; i < resets.size(); ++i) { - Entry entry = reset.get(i); + Entry entry = resets.get(i); entry.reset(); } - reset.clear(); + resets.clear(); - if (active.isEmpty()) + if (actives.isEmpty()) { if (isClosed()) terminate(new ClosedChannelException()); @@ -231,9 +238,9 @@ public class HTTP2Flusher extends IteratingCallback return Action.IDLE; } - for (int i = 0; i < active.size(); ++i) + for (int i = 0; i < actives.size(); ++i) { - Entry entry = active.get(i); + Entry entry = actives.get(i); Throwable failure = entry.generate(lease); if (failure != null) { @@ -245,7 +252,7 @@ public class HTTP2Flusher extends IteratingCallback List byteBuffers = lease.getByteBuffers(); if (LOG.isDebugEnabled()) - LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), active.size(), active); + LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives); session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()])); return Action.SCHEDULED; } @@ -256,17 +263,17 @@ public class HTTP2Flusher extends IteratingCallback lease.recycle(); // Transfer active items to avoid reentrancy. - for (int i = 0; i < active.size(); ++i) - complete.add(active.get(i)); - active.clear(); + for (int i = 0; i < actives.size(); ++i) + completes.add(actives.get(i)); + actives.clear(); if (LOG.isDebugEnabled()) - LOG.debug("Written {} frames for {}", complete.size(), complete); + LOG.debug("Written {} frames for {}", completes.size(), completes); // Drain the frames one by one to avoid reentrancy. - while (!complete.isEmpty()) + while (!completes.isEmpty()) { - Entry entry = complete.poll(); + Entry entry = completes.poll(); entry.succeeded(); } @@ -288,14 +295,14 @@ public class HTTP2Flusher extends IteratingCallback lease.recycle(); // Transfer active items to avoid reentrancy. - for (int i = 0; i < active.size(); ++i) - complete.add(active.get(i)); - active.clear(); + for (int i = 0; i < actives.size(); ++i) + completes.add(actives.get(i)); + actives.clear(); // Drain the frames one by one to avoid reentrancy. - while (!complete.isEmpty()) + while (!completes.isEmpty()) { - Entry entry = complete.poll(); + Entry entry = completes.poll(); entry.failed(x); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index e3c86be2b71..c06d9199ab5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -682,12 +682,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener return streams.get(streamId); } - protected int getSendWindow() + public int getSendWindow() { return sendWindow.get(); } - protected int getRecvWindow() + public int getRecvWindow() { return recvWindow.get(); }