diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index d81debdfa2d..129a38c2983 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -46,6 +46,8 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.HTTP2Flusher; +import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; @@ -54,6 +56,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; @@ -631,6 +634,55 @@ public class StreamResetTest extends AbstractTest Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testResetAfterBlockingWrite() throws Exception + { + int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE; + CountDownLatch writeLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + try + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[10 * windowSize]); + } + catch (IOException e) + { + writeLatch.countDown(); + } + } + }); + + AtomicLong received = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(1); + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, true); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (received.addAndGet(frame.getData().remaining()) == windowSize) + latch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + // Reset. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); + + HTTP2Session session = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class).getBean(HTTP2Session.class); + HTTP2Flusher flusher = session.getBean(HTTP2Flusher.class); + Assert.assertEquals(0, flusher.getFrameQueueSize()); + } + @Test public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index c9ae1b9919c..b1839af5535 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -116,7 +116,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable } } - private int getFrameQueueSize() + public int getFrameQueueSize() { synchronized (this) { @@ -141,15 +141,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable entry.perform(); } - if (!frames.isEmpty()) + for (Entry entry : frames) { - for (Entry entry : frames) - { - entries.offer(entry); - actives.add(entry); - } - frames.clear(); + entries.offer(entry); + actives.add(entry); } + frames.clear(); } @@ -166,11 +163,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable if (LOG.isDebugEnabled()) LOG.debug("Processing {}", entry); - // If the stream has been reset, don't send the frame. - if (entry.reset()) + // If the stream has been reset or removed, don't send the frame. + if (entry.isStale()) { if (LOG.isDebugEnabled()) - LOG.debug("Resetting {}", entry); + LOG.debug("Stale {}", entry); continue; } @@ -328,7 +325,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable { protected final Frame frame; protected final IStream stream; - private boolean reset; protected Entry(Frame frame, IStream stream, Callback callback) { @@ -346,7 +342,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable private void complete() { - if (reset) + if (!isProtocol() && stream != null && stream.isReset()) failed(new EofException("reset")); else succeeded(); @@ -363,23 +359,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable super.failed(x); } - private boolean reset() + private boolean isStale() { - return this.reset = stream != null && stream.isReset() && !isProtocol(); + return !isProtocol() && stream != null && (stream.isReset() || stream.getSession().getStream(stream.getId()) == null); } private boolean isProtocol() { switch (frame.getType()) { + case DATA: + case HEADERS: + case PUSH_PROMISE: + case CONTINUATION: + return false; case PRIORITY: case RST_STREAM: + case SETTINGS: + case PING: case GO_AWAY: case WINDOW_UPDATE: + case PREFACE: case DISCONNECT: return true; default: - return false; + throw new IllegalStateException(); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index b1eeec41337..32e7c136323 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -282,7 +282,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio IStream stream = getStream(frame.getStreamId()); if (stream != null) - stream.process(frame, Callback.NOOP); + stream.process(frame, new ResetCallback()); else notifyReset(this, frame); } @@ -753,8 +753,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio IStream removed = streams.remove(stream.getId()); if (removed != null) { - assert removed == stream; - boolean local = stream.isLocal(); if (local) localStreamCount.decrementAndGet(); @@ -1329,6 +1327,32 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + private class ResetCallback implements Callback + { + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + private void complete() + { + flusher.iterate(); + } + } + private class CloseCallback implements Callback { private final int error; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index bd4b8a0f425..5f896929036 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -88,9 +88,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa @Override public void headers(HeadersFrame frame, Callback callback) { - if (!startWrite(callback)) - return; - session.frames(this, this, frame, Frame.EMPTY_ARRAY); + if (startWrite(callback)) + session.frames(this, this, frame, Frame.EMPTY_ARRAY); } @Override @@ -102,9 +101,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa @Override public void data(DataFrame frame, Callback callback) { - if (!startWrite(callback)) - return; - session.data(this, this, frame); + if (startWrite(callback)) + session.data(this, this, frame); } @Override @@ -292,8 +290,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa remoteReset = true; close(); session.removeStream(this); - callback.succeeded(); - notifyReset(this, frame); + notifyReset(this, frame, callback); } private void onPush(PushPromiseFrame frame, Callback callback) @@ -377,8 +374,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa @Override public void close() { - closeState.set(CloseState.CLOSED); - onClose(); + if (closeState.getAndSet(CloseState.CLOSED) != CloseState.CLOSED) + onClose(); } @Override @@ -417,14 +414,14 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa } } - private void notifyReset(Stream stream, ResetFrame frame) + private void notifyReset(Stream stream, ResetFrame frame, Callback callback) { final Listener listener = this.listener; if (listener == null) return; try { - listener.onReset(stream, frame); + listener.onReset(stream, frame, callback); } catch (Throwable x) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index 57e98024260..55dddf0ce99 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -163,6 +163,19 @@ public interface Stream */ public void onData(Stream stream, DataFrame frame, Callback callback); + public default void onReset(Stream stream, ResetFrame frame, Callback callback) + { + try + { + onReset(stream, frame); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + /** *

Callback method invoked when a RST_STREAM frame has been received for this stream.

* @@ -170,7 +183,9 @@ public interface Stream * @param frame the RST_FRAME received * @see Session.Listener#onReset(Session, ResetFrame) */ - public void onReset(Stream stream, ResetFrame frame); + public default void onReset(Stream stream, ResetFrame frame) + { + } /** *

Callback method invoked when the stream exceeds its idle timeout.

diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index 10c654e7bc9..fb544fc494c 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -163,12 +163,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF } @Override - public void onReset(Stream stream, ResetFrame frame) + public void onReset(Stream stream, ResetFrame frame, Callback callback) { ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) error = ErrorCode.CANCEL_STREAM_ERROR; - getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP); + getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), callback); } @Override