Merge pull request #4718 from eclipse/jetty-9.4.x-4714-unexpected_data_frame

Jetty 9.4.x 4714 unexpected data frame
This commit is contained in:
Simone Bordet 2020-03-26 17:50:33 +01:00 committed by GitHub
commit 6ab4cd306d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 8 deletions

View File

@ -18,11 +18,13 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
@ -35,11 +37,14 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -81,7 +86,7 @@ public class StreamCountTest extends AbstractTest
} }
}); });
final CountDownLatch settingsLatch = new CountDownLatch(1); CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter() Session session = newClient(new Session.Listener.Adapter()
{ {
@Override @Override
@ -97,7 +102,7 @@ public class StreamCountTest extends AbstractTest
MetaData.Request metaData = newRequest("GET", fields); MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(metaData, null, false); HeadersFrame frame1 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>(); FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{ {
@Override @Override
@ -123,7 +128,6 @@ public class StreamCountTest extends AbstractTest
@Test @Test
public void testServerAllowsOneStreamEnforcedByServer() throws Exception public void testServerAllowsOneStreamEnforcedByServer() throws Exception
{ {
final CountDownLatch resetLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter() start(new ServerSessionListener.Adapter()
{ {
@Override @Override
@ -152,13 +156,21 @@ public class StreamCountTest extends AbstractTest
} }
}); });
Session session = newClient(new Session.Listener.Adapter()); CountDownLatch sessionResetLatch = new CountDownLatch(2);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
sessionResetLatch.countDown();
}
});
HttpFields fields = new HttpFields(); HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields); MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(metaData, null, false); HeadersFrame frame1 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>(); FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter() session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{ {
@Override @Override
@ -173,17 +185,39 @@ public class StreamCountTest extends AbstractTest
HeadersFrame frame2 = new HeadersFrame(metaData, null, false); HeadersFrame frame2 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>(); FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
AtomicReference<CountDownLatch> resetLatch = new AtomicReference<>(new CountDownLatch(1));
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter() session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter()
{ {
@Override @Override
public void onReset(Stream stream, ResetFrame frame) public void onReset(Stream stream, ResetFrame frame)
{ {
resetLatch.countDown(); resetLatch.get().countDown();
} }
}); });
streamPromise2.get(5, TimeUnit.SECONDS); Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS);
assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); assertTrue(resetLatch.get().await(5, TimeUnit.SECONDS));
// Reset the latch and send a DATA frame, it should be dropped
// by the client because the stream has already been reset.
resetLatch.set(new CountDownLatch(1));
stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
// Must not receive another RST_STREAM.
assertFalse(resetLatch.get().await(1, TimeUnit.SECONDS));
// Simulate a client sending both HEADERS and DATA frames at the same time.
// The server should send a RST_STREAM for the HEADERS.
// For the server, dropping the DATA frame is too costly so it sends another RST_STREAM.
int streamId3 = stream2.getId() + 2;
HeadersFrame frame3 = new HeadersFrame(streamId3, metaData, null, false);
DataFrame data3 = new DataFrame(streamId3, BufferUtil.EMPTY_BUFFER, true);
Generator generator = ((HTTP2Session)session).getGenerator();
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool());
generator.control(lease, frame3);
generator.data(lease, data3, data3.remaining());
((HTTP2Session)session).getEndPoint().write(Callback.NOOP, lease.getByteBuffers().toArray(new ByteBuffer[0]));
// Expect 2 RST_STREAM frames.
assertTrue(sessionResetLatch.await(5, TimeUnit.SECONDS));
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); assertTrue(responseLatch.await(5, TimeUnit.SECONDS));

View File

@ -772,6 +772,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int maxCount = getMaxRemoteStreams(); int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{ {
updateLastRemoteStreamId(streamId);
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
return null; return null;
} }