More flow control tests.

This commit is contained in:
Simone Bordet 2015-03-11 18:43:00 +01:00
parent 8673c6bba7
commit 36f317382f
3 changed files with 63 additions and 156 deletions

View File

@ -18,28 +18,8 @@
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class BufferingFlowControlStrategyTest extends FlowControlStrategyTest
{
@ -48,71 +28,4 @@ public class BufferingFlowControlStrategyTest extends FlowControlStrategyTest
{
return new BufferingFlowControlStrategy(0.5F);
}
@Ignore // Race between sending data and receiving reset
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// We sent 65535 data bytes to the server (the max allowed by the send window).
// The ratio being 50%, means that the regulator has a limit at 32767 bytes.
// This will be TCP written as:
// (9 + HEADERS) + 3 * (9 + DATA (16384)) + (9 + DATA (16383))
// since 16384 is the max frame length.
// One window update will be sent back of 32768 bytes.
// It does not matter how the TCP reads read the frames, since eventually
// the server will read 2 * 16384 data bytes (perhaps in multiple "fake"
// data frames).
// When 32768 bytes will be read and consumed, a window update will be
// sent to the client; the remaining 32767 will be read and consumed,
// but the server won't send the window update (it's exactly equal to
// the regulator limit), and the client won't send more bytes because
// the send has been reset.
// So the client send window must be 32768.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(Frame.DEFAULT_MAX_LENGTH * 2, http2Session.getSendWindow());
}
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
@ -48,6 +49,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
@ -909,4 +911,65 @@ public abstract class FlowControlStrategyTest
// Expect the connection to be closed.
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, don't consume the data and immediately reset.
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if (HttpMethod.GET.is(request.getMethod()))
return new Stream.Listener.Adapter();
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Fail the callback to enlarge the session window.
// More data frames will be discarded because the
// stream is reset, and automatically consumed to
// keep the session window large for other streams.
callback.failed(new Throwable());
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
}
};
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -18,26 +18,8 @@
package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Test;
public class SimpleFlowControlStrategyTest extends FlowControlStrategyTest
{
@ -46,55 +28,4 @@ public class SimpleFlowControlStrategyTest extends FlowControlStrategyTest
{
return new SimpleFlowControlStrategy();
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
{
// On server, we don't consume the data and we immediately reset.
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
dataLatch.countDown();
}
});
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Wait a little more for the window updates to be processed.
Thread.sleep(1000);
// At this point the session window should be fully available.
HTTP2Session http2Session = (HTTP2Session)session;
Assert.assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE, http2Session.getSendWindow());
}
}