diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java index 10e64130726..c69af218385 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java @@ -35,6 +35,4 @@ public interface ISession extends Session public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler handler, C context); public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler, C context); - - public int getWindowSize(); } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index 314695336ac..f8fc892b1cb 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -18,10 +18,11 @@ package org.eclipse.jetty.spdy; import java.nio.ByteBuffer; import java.nio.channels.InterruptedByTimeoutException; -import java.util.ArrayList; import java.util.Deque; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -253,9 +254,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler getStreams() + public Set getStreams() { - List result = new ArrayList<>(); + Set result = new HashSet<>(); result.addAll(streams.values()); return result; } @@ -540,7 +541,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler getStreams(); + public Set getStreams(); /** *

Super interface for listeners with callbacks that are invoked on specific session events.

diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java index 1ee5f8a96c0..526c9a8d81b 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; @@ -41,6 +42,70 @@ import org.junit.Test; public class FlowControlTest extends AbstractTest { + @Test + public void testFlowControlWithConcurrentSettings() throws Exception + { + // Initial window is 64 KiB. We allow the client to send 1024 B + // then we change the window to 512 B. At this point, the client + // must stop sending data (although the initial window allows it) + + final int size = 512; + final AtomicReference dataInfoRef = new AtomicReference<>(); + final CountDownLatch dataLatch = new CountDownLatch(2); + final CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(true)); + return new StreamFrameListener.Adapter() + { + private final AtomicInteger dataFrames = new AtomicInteger(); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + int dataFrameCount = dataFrames.incrementAndGet(); + if (dataFrameCount == 1) + { + dataInfoRef.set(dataInfo); + Settings settings = new Settings(); + settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, size)); + stream.getSession().settings(new SettingsInfo(settings)); + } + else if (dataFrameCount > 1) + { + dataInfo.consume(dataInfo.length()); + dataLatch.countDown(); + } + } + }; + } + }), new SessionFrameListener.Adapter() + { + @Override + public void onSettings(Session session, SettingsInfo settingsInfo) + { + settingsLatch.countDown(); + } + }); + + Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS); + stream.data(new BytesDataInfo(new byte[size * 2], false)); + settingsLatch.await(5, TimeUnit.SECONDS); + + // Send the second chunk of data, must not arrive since we're flow control stalled now + stream.data(new BytesDataInfo(new byte[size * 2], true)); + Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); + + // Consume the data arrived to server, this will resume flow control + DataInfo dataInfo = dataInfoRef.get(); + dataInfo.consume(dataInfo.length()); + + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + @Test public void testServerFlowControlOneBigWrite() throws Exception {