Merge branch 'master' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project
This commit is contained in:
commit
241997b449
|
@ -69,7 +69,7 @@
|
|||
-->
|
||||
<!--
|
||||
<Call name="createRegistry" class="java.rmi.registry.LocateRegistry">
|
||||
<Arg type="java.lang.Integer">1099</Arg>
|
||||
<Arg type="java.lang.Integer"><SystemProperty name="jetty.jmxrmiport" default="1099"/></Arg>
|
||||
<Call name="sleep" class="java.lang.Thread">
|
||||
<Arg type="java.lang.Integer">1000</Arg>
|
||||
</Call>
|
||||
|
@ -91,11 +91,11 @@
|
|||
<New class="javax.management.remote.JMXServiceURL">
|
||||
<Arg type="java.lang.String">rmi</Arg>
|
||||
<Arg type="java.lang.String" />
|
||||
<Arg type="java.lang.Integer">0</Arg>
|
||||
<Arg type="java.lang.String">/jndi/rmi://localhost:1099/jettyjmx</Arg>
|
||||
<Arg type="java.lang.Integer"><SystemProperty name="jetty.jmxrmiport" default="1099"/></Arg>
|
||||
<Arg type="java.lang.String">/jndi/rmi://<SystemProperty name="jetty.jmxrmihost" default="localhost"/>:<SystemProperty name="jetty.jmxrmiport" default="1099"/>/jmxrmi</Arg>
|
||||
</New>
|
||||
</Arg>
|
||||
<Arg>org.eclipse.jetty:name=rmiconnectorserver</Arg>
|
||||
<Arg>org.eclipse.jetty.jmx:name=rmiconnectorserver</Arg>
|
||||
<Call name="start" />
|
||||
</New>
|
||||
-->
|
||||
|
|
|
@ -265,7 +265,7 @@ public class Server extends HandlerWrapper implements Attributes
|
|||
mex.add(e);
|
||||
}
|
||||
|
||||
if (_connectors!=null)
|
||||
if (_connectors!=null && mex.size()==0)
|
||||
{
|
||||
for (int i=0;i<_connectors.length;i++)
|
||||
{
|
||||
|
|
|
@ -35,6 +35,4 @@ public interface ISession extends Session
|
|||
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context);
|
||||
|
||||
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
|
||||
|
||||
public int getWindowSize();
|
||||
}
|
||||
|
|
|
@ -18,10 +18,11 @@ package org.eclipse.jetty.spdy;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.InterruptedByTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -253,9 +254,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Stream> getStreams()
|
||||
public Set<Stream> getStreams()
|
||||
{
|
||||
List<Stream> result = new ArrayList<>();
|
||||
Set<Stream> result = new HashSet<>();
|
||||
result.addAll(streams.values());
|
||||
return result;
|
||||
}
|
||||
|
@ -540,7 +541,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
|
|||
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
|
||||
if (windowSizeSetting != null)
|
||||
{
|
||||
int prevWindowSize = windowSize;
|
||||
windowSize = windowSizeSetting.value();
|
||||
for (IStream stream : streams.values())
|
||||
stream.updateWindowSize(windowSize - prevWindowSize);
|
||||
logger.debug("Updated window size to {}", windowSize);
|
||||
}
|
||||
|
||||
|
@ -774,12 +778,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
|
|||
threadPool.execute(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getWindowSize()
|
||||
{
|
||||
return windowSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush()
|
||||
{
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package org.eclipse.jetty.spdy.api;
|
||||
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -181,7 +181,7 @@ public interface Session
|
|||
/**
|
||||
* @return the streams currently active in this session
|
||||
*/
|
||||
public List<Stream> getStreams();
|
||||
public Set<Stream> getStreams();
|
||||
|
||||
/**
|
||||
* <p>Super interface for listeners with callbacks that are invoked on specific session events.</p>
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.Exchanger;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.spdy.api.BytesDataInfo;
|
||||
import org.eclipse.jetty.spdy.api.DataInfo;
|
||||
|
@ -41,6 +42,70 @@ import org.junit.Test;
|
|||
|
||||
public class FlowControlTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
public void testFlowControlWithConcurrentSettings() throws Exception
|
||||
{
|
||||
// Initial window is 64 KiB. We allow the client to send 1024 B
|
||||
// then we change the window to 512 B. At this point, the client
|
||||
// must stop sending data (although the initial window allows it)
|
||||
|
||||
final int size = 512;
|
||||
final AtomicReference<DataInfo> dataInfoRef = new AtomicReference<>();
|
||||
final CountDownLatch dataLatch = new CountDownLatch(2);
|
||||
final CountDownLatch settingsLatch = new CountDownLatch(1);
|
||||
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
|
||||
{
|
||||
stream.reply(new ReplyInfo(true));
|
||||
return new StreamFrameListener.Adapter()
|
||||
{
|
||||
private final AtomicInteger dataFrames = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataInfo dataInfo)
|
||||
{
|
||||
int dataFrameCount = dataFrames.incrementAndGet();
|
||||
if (dataFrameCount == 1)
|
||||
{
|
||||
dataInfoRef.set(dataInfo);
|
||||
Settings settings = new Settings();
|
||||
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, size));
|
||||
stream.getSession().settings(new SettingsInfo(settings));
|
||||
}
|
||||
else if (dataFrameCount > 1)
|
||||
{
|
||||
dataInfo.consume(dataInfo.length());
|
||||
dataLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}), new SessionFrameListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsInfo settingsInfo)
|
||||
{
|
||||
settingsLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
|
||||
stream.data(new BytesDataInfo(new byte[size * 2], false));
|
||||
settingsLatch.await(5, TimeUnit.SECONDS);
|
||||
|
||||
// Send the second chunk of data, must not arrive since we're flow control stalled now
|
||||
stream.data(new BytesDataInfo(new byte[size * 2], true));
|
||||
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// Consume the data arrived to server, this will resume flow control
|
||||
DataInfo dataInfo = dataInfoRef.get();
|
||||
dataInfo.consume(dataInfo.length());
|
||||
|
||||
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerFlowControlOneBigWrite() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue