diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java index ecd0549434c..215a8232526 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCountTest.java @@ -18,11 +18,13 @@ package org.eclipse.jetty.http2.client; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; @@ -35,11 +37,14 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.generator.Generator; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -81,7 +86,7 @@ public class StreamCountTest extends AbstractTest } }); - final CountDownLatch settingsLatch = new CountDownLatch(1); + CountDownLatch settingsLatch = new CountDownLatch(1); Session session = newClient(new Session.Listener.Adapter() { @Override @@ -97,7 +102,7 @@ public class StreamCountTest extends AbstractTest MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame1 = new HeadersFrame(metaData, null, false); FuturePromise streamPromise1 = new FuturePromise<>(); - final CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch responseLatch = new CountDownLatch(1); session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() { @Override @@ -123,7 +128,6 @@ public class StreamCountTest extends AbstractTest @Test public void testServerAllowsOneStreamEnforcedByServer() throws Exception { - final CountDownLatch resetLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -152,13 +156,21 @@ public class StreamCountTest extends AbstractTest } }); - Session session = newClient(new Session.Listener.Adapter()); + CountDownLatch sessionResetLatch = new CountDownLatch(2); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onReset(Session session, ResetFrame frame) + { + sessionResetLatch.countDown(); + } + }); HttpFields fields = new HttpFields(); MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame1 = new HeadersFrame(metaData, null, false); FuturePromise streamPromise1 = new FuturePromise<>(); - final CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch responseLatch = new CountDownLatch(1); session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() { @Override @@ -173,17 +185,39 @@ public class StreamCountTest extends AbstractTest HeadersFrame frame2 = new HeadersFrame(metaData, null, false); FuturePromise streamPromise2 = new FuturePromise<>(); + AtomicReference resetLatch = new AtomicReference<>(new CountDownLatch(1)); session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter() { @Override public void onReset(Stream stream, ResetFrame frame) { - resetLatch.countDown(); + resetLatch.get().countDown(); } }); - streamPromise2.get(5, TimeUnit.SECONDS); - assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS); + assertTrue(resetLatch.get().await(5, TimeUnit.SECONDS)); + + // Reset the latch and send a DATA frame, it should be dropped + // by the client because the stream has already been reset. + resetLatch.set(new CountDownLatch(1)); + stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + // Must not receive another RST_STREAM. + assertFalse(resetLatch.get().await(1, TimeUnit.SECONDS)); + + // Simulate a client sending both HEADERS and DATA frames at the same time. + // The server should send a RST_STREAM for the HEADERS. + // For the server, dropping the DATA frame is too costly so it sends another RST_STREAM. + int streamId3 = stream2.getId() + 2; + HeadersFrame frame3 = new HeadersFrame(streamId3, metaData, null, false); + DataFrame data3 = new DataFrame(streamId3, BufferUtil.EMPTY_BUFFER, true); + Generator generator = ((HTTP2Session)session).getGenerator(); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool()); + generator.control(lease, frame3); + generator.data(lease, data3, data3.remaining()); + ((HTTP2Session)session).getEndPoint().write(Callback.NOOP, lease.getByteBuffers().toArray(new ByteBuffer[0])); + // Expect 2 RST_STREAM frames. + assertTrue(sessionResetLatch.await(5, TimeUnit.SECONDS)); stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index ef3ff260812..2248ed0990e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -772,6 +772,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { + updateLastRemoteStreamId(streamId); reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); return null; }