Fixes #4714 - Low setMaxConcurrentStreams causes "1/unexpected_data_frame" errors.

Added test case.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-03-26 11:52:39 +01:00
parent 37cfb2fa86
commit 7445449d77
1 changed files with 42 additions and 8 deletions

View File

@ -18,11 +18,13 @@
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
@ -35,11 +37,14 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -81,7 +86,7 @@ public class StreamCountTest extends AbstractTest
}
});
final CountDownLatch settingsLatch = new CountDownLatch(1);
CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
@ -97,7 +102,7 @@ public class StreamCountTest extends AbstractTest
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
@ -123,7 +128,6 @@ public class StreamCountTest extends AbstractTest
@Test
public void testServerAllowsOneStreamEnforcedByServer() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
@ -152,13 +156,21 @@ public class StreamCountTest extends AbstractTest
}
});
Session session = newClient(new Session.Listener.Adapter());
CountDownLatch sessionResetLatch = new CountDownLatch(2);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
sessionResetLatch.countDown();
}
});
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame1 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame1, streamPromise1, new Stream.Listener.Adapter()
{
@Override
@ -173,17 +185,39 @@ public class StreamCountTest extends AbstractTest
HeadersFrame frame2 = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
AtomicReference<CountDownLatch> resetLatch = new AtomicReference<>(new CountDownLatch(1));
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
resetLatch.get().countDown();
}
});
streamPromise2.get(5, TimeUnit.SECONDS);
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS);
assertTrue(resetLatch.get().await(5, TimeUnit.SECONDS));
// Reset the latch and send a DATA frame, it should be dropped
// by the client because the stream has already been reset.
resetLatch.set(new CountDownLatch(1));
stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
// Must not receive another RST_STREAM.
assertFalse(resetLatch.get().await(1, TimeUnit.SECONDS));
// Simulate a client sending both HEADERS and DATA frames at the same time.
// The server should send a RST_STREAM for the HEADERS.
// For the server, dropping the DATA frame is too costly so it sends another RST_STREAM.
int streamId3 = stream2.getId() + 2;
HeadersFrame frame3 = new HeadersFrame(streamId3, metaData, null, false);
DataFrame data3 = new DataFrame(streamId3, BufferUtil.EMPTY_BUFFER, true);
Generator generator = ((HTTP2Session)session).getGenerator();
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(generator.getByteBufferPool());
generator.control(lease, frame3);
generator.data(lease, data3, data3.remaining());
((HTTP2Session)session).getEndPoint().write(Callback.NOOP, lease.getByteBuffers().toArray(new ByteBuffer[0]));
// Expect 2 RST_STREAM frames.
assertTrue(sessionResetLatch.await(5, TimeUnit.SECONDS));
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));