375083 - Flow control should take in account window size changes from concurrent SETTINGS

This commit is contained in:
Simone Bordet 2012-03-22 17:08:11 +01:00
parent 875bcb5bb8
commit bb66cfc877
4 changed files with 74 additions and 13 deletions

View File

@ -35,6 +35,4 @@ public interface ISession extends Session
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context); public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context); public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
public int getWindowSize();
} }

View File

@ -18,10 +18,11 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -253,9 +254,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
} }
@Override @Override
public List<Stream> getStreams() public Set<Stream> getStreams()
{ {
List<Stream> result = new ArrayList<>(); Set<Stream> result = new HashSet<>();
result.addAll(streams.values()); result.addAll(streams.values());
return result; return result;
} }
@ -540,7 +541,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE); Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
if (windowSizeSetting != null) if (windowSizeSetting != null)
{ {
int prevWindowSize = windowSize;
windowSize = windowSizeSetting.value(); windowSize = windowSizeSetting.value();
for (IStream stream : streams.values())
stream.updateWindowSize(windowSize - prevWindowSize);
logger.debug("Updated window size to {}", windowSize); logger.debug("Updated window size to {}", windowSize);
} }
@ -774,12 +778,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
threadPool.execute(task); threadPool.execute(task);
} }
@Override
public int getWindowSize()
{
return windowSize;
}
@Override @Override
public void flush() public void flush()
{ {

View File

@ -17,7 +17,7 @@
package org.eclipse.jetty.spdy.api; package org.eclipse.jetty.spdy.api;
import java.util.EventListener; import java.util.EventListener;
import java.util.List; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -181,7 +181,7 @@ public interface Session
/** /**
* @return the streams currently active in this session * @return the streams currently active in this session
*/ */
public List<Stream> getStreams(); public Set<Stream> getStreams();
/** /**
* <p>Super interface for listeners with callbacks that are invoked on specific session events.</p> * <p>Super interface for listeners with callbacks that are invoked on specific session events.</p>

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
@ -41,6 +42,70 @@ import org.junit.Test;
public class FlowControlTest extends AbstractTest public class FlowControlTest extends AbstractTest
{ {
@Test
public void testFlowControlWithConcurrentSettings() throws Exception
{
// Initial window is 64 KiB. We allow the client to send 1024 B
// then we change the window to 512 B. At this point, the client
// must stop sending data (although the initial window allows it)
final int size = 512;
final AtomicReference<DataInfo> dataInfoRef = new AtomicReference<>();
final CountDownLatch dataLatch = new CountDownLatch(2);
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
return new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int dataFrameCount = dataFrames.incrementAndGet();
if (dataFrameCount == 1)
{
dataInfoRef.set(dataInfo);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, size));
stream.getSession().settings(new SettingsInfo(settings));
}
else if (dataFrameCount > 1)
{
dataInfo.consume(dataInfo.length());
dataLatch.countDown();
}
}
};
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
});
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
stream.data(new BytesDataInfo(new byte[size * 2], false));
settingsLatch.await(5, TimeUnit.SECONDS);
// Send the second chunk of data, must not arrive since we're flow control stalled now
stream.data(new BytesDataInfo(new byte[size * 2], true));
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Consume the data arrived to server, this will resume flow control
DataInfo dataInfo = dataInfoRef.get();
dataInfo.consume(dataInfo.length());
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test @Test
public void testServerFlowControlOneBigWrite() throws Exception public void testServerFlowControlOneBigWrite() throws Exception
{ {