405364 spdy imeplement MAX_CONCURRENT_STREAMS

This commit is contained in:
Thomas Becker 2013-04-09 18:28:55 +02:00
parent aa4e79efe2
commit 6a6660bfc6
5 changed files with 236 additions and 24 deletions

View File

@ -110,7 +110,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private final AtomicBoolean goAwaySent = new AtomicBoolean();
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger(0);
private final FlowControlStrategy flowControlStrategy;
private volatile int maxConcurrentLocalStreams = -1;
private boolean flushing;
private Throwable failure;
@ -181,6 +183,8 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
// TODO: for SPDYv3 we need to support the "slot" argument
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
IStream stream = createStream(synStream, listener, true, promise);
if (stream == null)
return;
generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
}
flush();
@ -535,15 +539,39 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
int streamId = stream.getId();
if (local)
{
while (true)
{
int oldStreamCountValue = localStreamCount.get();
int maxConcurrentStreams = maxConcurrentLocalStreams;
if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
{
String msg = String.format("Max concurrent local streams (%d) exceeded.",
maxConcurrentStreams);
LOG.debug(msg);
promise.failed(new SPDYException(msg));
return null;
}
if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
break;
}
}
if (streams.putIfAbsent(streamId, stream) != null)
{
//TODO: fail promise
if (local)
{
localStreamCount.decrementAndGet();
throw new IllegalStateException("Duplicate stream id " + streamId);
}
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
LOG.debug("Duplicate stream, {}", rstInfo);
try
{
rst(rstInfo);
rst(rstInfo); //TODO: non blocking reset or find the reason why blocking is used
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
@ -554,8 +582,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
else
{
LOG.debug("Created {}", stream);
if (local)
notifyStreamCreated(stream);
notifyStreamCreated(stream);
return stream;
}
}
@ -590,10 +617,15 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
assert removed == stream;
LOG.debug("Removed {}", stream);
notifyStreamClosed(stream);
if (streamIds.get() % 2 == stream.getId() % 2)
localStreamCount.decrementAndGet();
LOG.debug("Removed {}", stream);
notifyStreamClosed(stream);
}
}
private void notifyStreamClosed(IStream stream)
@ -666,6 +698,13 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
setWindowSize(windowSize);
LOG.debug("Updated session window size to {}", windowSize);
}
Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
if (maxConcurrentStreamsSetting != null)
{
int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
maxConcurrentLocalStreams = maxConcurrentStreamsValue;
LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
}
SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
notifyOnSettings(listener, settingsInfo);
flush();

View File

@ -18,15 +18,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashSet;
@ -47,12 +38,14 @@ import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.SettingsFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator;
@ -74,6 +67,15 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
{
@ -457,7 +459,30 @@ public class StandardSessionTest
stream.headers(new HeadersInfo(headers, true));
verify(controller, times(3)).write(any(ByteBuffer.class), any(Callback.class));
}
@Test
public void testMaxConcurrentStreams() throws InterruptedException
{
final CountDownLatch failedBecauseMaxConcurrentStreamsExceeded = new CountDownLatch(1);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0));
SettingsFrame settingsFrame = new SettingsFrame(VERSION, (byte)0, settings);
session.onControlFrame(settingsFrame);
PushSynInfo pushSynInfo = new PushSynInfo(1, new PushInfo(new Fields(), false));
session.syn(pushSynInfo, null, new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
{
failedBecauseMaxConcurrentStreamsExceeded.countDown();
}
});
assertThat("Opening push stream failed because maxConcurrentStream is exceeded",
failedBecauseMaxConcurrentStreamsExceeded.await(5, TimeUnit.SECONDS), is(true));
}
@Test

View File

@ -101,14 +101,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
@Test
public void testPushHeadersAreValid() throws Exception
{
sendMainRequestAndCSSRequest();
sendMainRequestAndCSSRequest(null);
run2ndClientRequests(true, true);
}
@Test
public void testClientResetsPushStreams() throws Exception
{
sendMainRequestAndCSSRequest();
sendMainRequestAndCSSRequest(null);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session = startClient(version, serverAddress, null);
@ -125,14 +125,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
public void testUserAgentBlackList() throws Exception
{
pushStrategy.setUserAgentBlacklist(Arrays.asList(".*(?i)firefox/16.*"));
sendMainRequestAndCSSRequest();
sendMainRequestAndCSSRequest(null);
run2ndClientRequests(false, false);
}
@Test
public void testReferrerPushPeriod() throws Exception
{
Session session = sendMainRequestAndCSSRequest();
Session session = sendMainRequestAndCSSRequest(null);
// Sleep for pushPeriod This should prevent application.js from being mapped as pushResource
Thread.sleep(referrerPushPeriod + 1);
@ -148,13 +148,38 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
connector.addConnectionFactory(defaultFactory);
connector.setDefaultProtocol(defaultFactory.getProtocol()); // TODO I don't think this is right
Session session = sendMainRequestAndCSSRequest();
Session session = sendMainRequestAndCSSRequest(null);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
@Test
public void testMaxConcurrentStreamsToDisablePush() throws Exception
{
final CountDownLatch pushReceivedLatch = new CountDownLatch(1);
Session session = sendMainRequestAndCSSRequest(new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
pushReceivedLatch.countDown();
return null;
}
});
// Settings settings = new Settings();
// settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0));
// SettingsInfo settingsInfo = new SettingsInfo(settings);
//
// session.settings(settingsInfo);
sendRequest(session, mainRequestHeaders, null, null);
assertThat(pushReceivedLatch.await(1, TimeUnit.SECONDS), is(false));
}
private InetSocketAddress createServer() throws Exception
{
GzipHandler gzipHandler = new GzipHandler();
@ -177,9 +202,9 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
return startHTTPServer(version, gzipHandler);
}
private Session sendMainRequestAndCSSRequest() throws Exception
private Session sendMainRequestAndCSSRequest(SessionFrameListener sessionFrameListener) throws Exception
{
Session session = startClient(version, serverAddress, null);
Session session = startClient(version, serverAddress, sessionFrameListener);
sendRequest(session, mainRequestHeaders, null, null);
sendRequest(session, associatedCSSRequestHeaders, null, null);
@ -197,7 +222,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
if (pushSynHeadersValid != null)
validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))

View File

@ -53,13 +53,15 @@ public abstract class AbstractTest
}
};
protected final short version = SPDY.V2;
protected Server server;
protected SPDYClient.Factory clientFactory;
protected SPDYServerConnector connector;
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
return startServer(SPDY.V2, listener);
return startServer(version, listener);
}
protected InetSocketAddress startServer(short version, ServerSessionFrameListener listener) throws Exception
@ -99,7 +101,7 @@ public abstract class AbstractTest
protected Session startClient(InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception
{
return startClient(SPDY.V2, socketAddress, listener);
return startClient(version, socketAddress, listener);
}
protected Session startClient(short version, InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception

View File

@ -0,0 +1,120 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.server;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.api.SettingsInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Fields;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class MaxConcurrentStreamTest extends AbstractTest
{
@Test
public void testMaxConcurrentStreamsSetByServer() throws Exception, ExecutionException
{
final CountDownLatch settingsReceivedLatch = new CountDownLatch(1);
final CountDownLatch dataReceivedLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onConnect(Session session)
{
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 1));
try
{
session.settings(new SettingsInfo(settings));
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
e.printStackTrace();
}
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
try
{
stream.reply(new ReplyInfo(true));
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
e.printStackTrace();
}
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataReceivedLatch.countDown();
}
};
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsReceivedLatch.countDown();
}
});
assertThat("Settings frame received", settingsReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
SynInfo synInfo = new SynInfo(new Fields(), false);
Stream stream = session.syn(synInfo, null);
boolean failed = false;
try
{
session.syn(synInfo, null);
}
catch (ExecutionException | InterruptedException | TimeoutException e)
{
failed = true;
}
assertThat("Opening second stream failed", failed, is(true));
stream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true));
assertThat("Data has been received on first stream.", dataReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
session.syn(synInfo, null);
}
}