Refactored flow control, encapsulating it into a strategy.

The reason for this change is that the server will soon be serving both SPDY/2 and SPDY/3 browsers,
so the flow control strategy must be dynamically chosen depending on the SPDY version.
This commit is contained in:
Simone Bordet 2012-05-30 17:10:31 +02:00
parent 97fffb4e6a
commit 24f4631a06
24 changed files with 499 additions and 304 deletions

View File

@ -0,0 +1,90 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import org.eclipse.jetty.spdy.api.DataInfo;
// TODO: add methods that tell how much written and whether we're TCP congested ?
public interface FlowControlStrategy
{
public int getWindowSize(ISession session);
public void setWindowSize(ISession session, int windowSize);
public void onNewStream(ISession session, IStream stream);
public void onWindowUpdate(ISession session, IStream stream, int delta);
public void updateWindow(ISession session, IStream stream, int delta);
public void onDataReceived(ISession session, IStream stream, DataInfo dataInfo);
public void onDataConsumed(ISession session, IStream stream, DataInfo dataInfo, int delta);
public static class None implements FlowControlStrategy
{
private volatile int windowSize;
public None()
{
this(65536);
}
public None(int windowSize)
{
this.windowSize = windowSize;
}
@Override
public int getWindowSize(ISession session)
{
return windowSize;
}
@Override
public void setWindowSize(ISession session, int windowSize)
{
this.windowSize = windowSize;
}
@Override
public void onNewStream(ISession session, IStream stream)
{
stream.updateWindowSize(windowSize);
}
@Override
public void onWindowUpdate(ISession session, IStream stream, int delta)
{
}
@Override
public void updateWindow(ISession session, IStream stream, int delta)
{
}
@Override
public void onDataReceived(ISession session, IStream stream, DataInfo dataInfo)
{
}
@Override
public void onDataConsumed(ISession session, IStream stream, DataInfo dataInfo, int delta)
{
}
}
}

View File

@ -16,14 +16,12 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.DataFrame;
/**
* <p>The internal interface that represents a stream.</p>
@ -77,19 +75,18 @@ public interface IStream extends Stream
* for example by updating the stream's state or by calling listeners.</p>
*
* @param frame the control frame to process
* @see #process(DataFrame, ByteBuffer)
* @see #process(DataInfo)
*/
public void process(ControlFrame frame);
/**
* <p>Processes the given data frame along with the given byte buffer,
* <p>Processes the given {@code dataInfo},
* for example by updating the stream's state or by calling listeners.</p>
*
* @param frame the data frame to process
* @param data the byte buffer to process
* @param dataInfo the DataInfo to process
* @see #process(ControlFrame)
*/
public void process(DataFrame frame, ByteBuffer data);
public void process(DataInfo dataInfo);
/**
* <p>Associate the given {@link IStream} to this {@link IStream}.</p>

View File

@ -0,0 +1,87 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
public class SPDYv3FlowControlStrategy implements FlowControlStrategy
{
private volatile int windowSize;
@Override
public int getWindowSize(ISession session)
{
return windowSize;
}
@Override
public void setWindowSize(ISession session, int windowSize)
{
int prevWindowSize = this.windowSize;
this.windowSize = windowSize;
for (Stream stream : session.getStreams())
((IStream)stream).updateWindowSize(windowSize - prevWindowSize);
}
@Override
public void onNewStream(ISession session, IStream stream)
{
stream.updateWindowSize(windowSize);
}
@Override
public void onWindowUpdate(ISession session, IStream stream, int delta)
{
if (stream != null)
stream.updateWindowSize(delta);
}
@Override
public void updateWindow(ISession session, IStream stream, int delta)
{
stream.updateWindowSize(delta);
}
@Override
public void onDataReceived(ISession session, IStream stream, DataInfo dataInfo)
{
// Do nothing
}
@Override
public void onDataConsumed(ISession session, IStream stream, DataInfo dataInfo, int delta)
{
// This is the algorithm for flow control.
// This method may be called multiple times with delta=1, but we only send a window
// update when the whole dataInfo has been consumed.
// Other policies may be to send window updates when consumed() is greater than
// a certain threshold, etc. but for now the policy is not pluggable for simplicity.
// Note that the frequency of window updates depends on the read buffer, that
// should not be too smaller than the window size to avoid frequent window updates.
// Therefore, a pluggable policy should be able to modify the read buffer capacity.
int length = dataInfo.length();
if (dataInfo.consumed() == length && !stream.isClosed() && length > 0)
{
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), stream.getId(), length);
session.control(stream, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, null, null);
}
}
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
@ -93,13 +94,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final AtomicBoolean goAwaySent = new AtomicBoolean();
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final FlowControlStrategy flowControlStrategy;
private boolean flushing;
private boolean failed = false;
private volatile boolean flowControlEnabled = true;
private volatile int windowSize = 65536;
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener,
Generator generator, FlowControlStrategy flowControlStrategy)
{
this.version = version;
this.bufferPool = bufferPool;
@ -111,6 +112,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
this.pingIds = new AtomicInteger(initialStreamId);
this.listener = listener;
this.generator = generator;
this.flowControlStrategy = flowControlStrategy;
}
@Override
@ -268,14 +270,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void onControlFrame(ControlFrame frame)
{
notifyIdle(idleListener,false);
notifyIdle(idleListener, false);
try
{
logger.debug("Processing {}",frame);
logger.debug("Processing {}", frame);
if (goAwaySent.get())
{
logger.debug("Skipped processing of {}",frame);
logger.debug("Skipped processing of {}", frame);
return;
}
@ -334,7 +336,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
finally
{
notifyIdle(idleListener,true);
notifyIdle(idleListener, true);
}
}
@ -344,11 +346,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
notifyIdle(idleListener, false);
try
{
logger.debug("Processing {}, {} data bytes",frame,data.remaining());
logger.debug("Processing {}, {} data bytes", frame, data.remaining());
if (goAwaySent.get())
{
logger.debug("Skipped processing of {}",frame);
logger.debug("Skipped processing of {}", frame);
return;
}
@ -356,18 +358,18 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
logger.debug("Unknown stream {}",rstInfo);
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
logger.debug("Unknown stream {}", rstInfo);
rst(rstInfo);
}
else
{
processData(stream,frame,data);
processData(stream, frame, data);
}
}
finally
{
notifyIdle(idleListener,true);
notifyIdle(idleListener, true);
}
}
@ -377,9 +379,19 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
listener.onIdle(idle);
}
private void processData(IStream stream, DataFrame frame, ByteBuffer data)
private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
{
stream.process(frame,data);
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress())
{
@Override
public void consume(int delta)
{
super.consume(delta);
flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
}
};
flowControlStrategy.onDataReceived(this, stream, dataInfo);
stream.process(dataInfo);
updateLastStreamId(stream);
if (stream.isClosed())
removeStream(stream);
@ -455,7 +467,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream newStream(SynStreamFrame frame)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
return new StandardStream(frame, this, windowSize, associatedStream);
IStream stream = new StandardStream(frame, this, associatedStream);
flowControlStrategy.onNewStream(this, stream);
return stream;
}
private void notifyStreamCreated(IStream stream)
@ -550,15 +564,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
if (windowSizeSetting != null)
{
int prevWindowSize = windowSize;
windowSize = windowSizeSetting.value();
for (IStream stream : streams.values())
stream.updateWindowSize(windowSize - prevWindowSize);
logger.debug("Updated window size to {}",windowSize);
int windowSize = windowSizeSetting.value();
setWindowSize(windowSize);
logger.debug("Updated session window size to {}", windowSize);
}
SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(),frame.isClearPersisted());
notifyOnSettings(listener,settingsInfo);
SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
notifyOnSettings(listener, settingsInfo);
flush();
}
@ -618,8 +629,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream != null)
stream.process(frame);
flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
flush();
}
protected void close()
@ -993,6 +1004,16 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
public int getWindowSize()
{
return flowControlStrategy.getWindowSize(this);
}
public void setWindowSize(int initialWindowSize)
{
flowControlStrategy.setWindowSize(this, initialWindowSize);
}
public interface FrameBytes extends Comparable<FrameBytes>
{
public IStream getStream();
@ -1145,17 +1166,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
bufferPool.release(buffer);
IStream stream = getStream();
boolean flowControlEnabled = StandardSession.this.flowControlEnabled;
if (flowControlEnabled)
stream.updateWindowSize(-size);
flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
if (dataInfo.available() > 0)
{
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
prepend(this);
if (!flowControlEnabled)
flush();
flush();
}
else
{
@ -1172,14 +1190,4 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
}
}
public boolean isFlowControlEnabled()
{
return flowControlEnabled;
}
public void setFlowControlEnabled(boolean flowControl)
{
this.flowControlEnabled = flowControl;
}
}

View File

@ -16,7 +16,6 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -25,7 +24,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.HeadersInfo;
@ -37,11 +35,9 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -52,18 +48,17 @@ public class StandardStream implements IStream
private final IStream associatedStream;
private final SynStreamFrame frame;
private final ISession session;
private final AtomicInteger windowSize;
private final AtomicInteger windowSize = new AtomicInteger();
private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
private volatile StreamFrameListener listener;
private volatile OpenState openState = OpenState.SYN_SENT;
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
public StandardStream(SynStreamFrame frame, ISession session, int windowSize, IStream associatedStream)
public StandardStream(SynStreamFrame frame, ISession session, IStream associatedStream)
{
this.frame = frame;
this.session = session;
this.windowSize = new AtomicInteger(windowSize);
this.associatedStream = associatedStream;
}
@ -113,7 +108,7 @@ public class StandardStream implements IStream
public void updateWindowSize(int delta)
{
int size = windowSize.addAndGet(delta);
logger.debug("Updated window size by {}, new window size {}",delta,size);
logger.debug("Updated window size {} -> {} for {}", size - delta, size, this);
}
@Override
@ -209,12 +204,6 @@ public class StandardStream implements IStream
notifyOnHeaders(headersInfo);
break;
}
case WINDOW_UPDATE:
{
WindowUpdateFrame windowUpdate = (WindowUpdateFrame)frame;
updateWindowSize(windowUpdate.getWindowDelta());
break;
}
case RST_STREAM:
{
reset = true;
@ -229,57 +218,28 @@ public class StandardStream implements IStream
}
@Override
public void process(DataFrame frame, ByteBuffer data)
public void process(DataInfo dataInfo)
{
// TODO: in v3 we need to send a rst instead of just ignoring
// ignore data frame if this stream is remotelyClosed already
if (isHalfClosed() && !isLocallyClosed())
if (isRemotelyClosed())
{
logger.debug("Ignoring received dataFrame as this stream is remotely closed: " + frame);
logger.debug("Stream is remotely closed, ignoring {}", dataInfo);
return;
}
if (!canReceive())
{
logger.debug("Can't receive. Sending rst: " + frame);
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
logger.debug("Protocol error receiving {}, resetting" + dataInfo);
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
return;
}
updateCloseState(frame.isClose(),false);
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data,frame.isClose(),frame.isCompress())
{
@Override
public void consume(int delta)
{
super.consume(delta);
// This is the algorithm for flow control.
// This method may be called multiple times with delta=1, but we only send a window
// update when the whole dataInfo has been consumed.
// Other policies may be to send window updates when consumed() is greater than
// a certain threshold, etc. but for now the policy is not pluggable for simplicity.
// Note that the frequency of window updates depends on the read buffer, that
// should not be too smaller than the window size to avoid frequent window updates.
// Therefore, a pluggable policy should be able to modify the read buffer capacity.
if (consumed() == length() && !isClosed())
windowUpdate(length());
}
};
updateCloseState(dataInfo.isClose(), false);
notifyOnData(dataInfo);
session.flush();
}
private void windowUpdate(int delta)
{
if (delta > 0)
{
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(),getId(),delta);
session.control(this,windowUpdateFrame,0,TimeUnit.MILLISECONDS,null,null);
}
}
private void notifyOnReply(ReplyInfo replyInfo)
{
final StreamFrameListener listener = this.listener;
@ -305,7 +265,7 @@ public class StandardStream implements IStream
if (listener != null)
{
logger.debug("Invoking headers callback with {} on listener {}",frame,listener);
listener.onHeaders(this,headersInfo);
listener.onHeaders(this, headersInfo);
}
}
catch (Exception x)
@ -322,8 +282,8 @@ public class StandardStream implements IStream
if (listener != null)
{
logger.debug("Invoking data callback with {} on listener {}",dataInfo,listener);
listener.onData(this,dataInfo);
logger.debug("Invoked data callback with {} on listener {}",dataInfo,listener);
listener.onData(this, dataInfo);
logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
}
}
catch (Exception x)
@ -456,6 +416,12 @@ public class StandardStream implements IStream
return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
}
private boolean isRemotelyClosed()
{
CloseState closeState = this.closeState;
return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
}
@Override
public String toString()
{

View File

@ -46,7 +46,7 @@ public class AsyncTimeoutTest
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator)
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
public void flush()
@ -91,7 +91,7 @@ public class AsyncTimeoutTest
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator)
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)

View File

@ -16,11 +16,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertThat;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
@ -32,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
@ -57,6 +53,15 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
{
@ -77,7 +82,7 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = Executors.newSingleThreadScheduledExecutor();
generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator);
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None());
headers = new Headers();
}
@ -369,7 +374,7 @@ public class StandardSessionTest
IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get();
stream.updateCloseState(true,false);
assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true));
stream.process(new DataFrame(stream.getId(),(byte)0,256),ByteBuffer.allocate(256));
stream.process(new ByteBufferDataInfo(ByteBuffer.allocate(256), true));
}
@Test
@ -400,8 +405,8 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
IStream stream = new StandardStream(synStreamFrame,session,8192,null);
IStream stream = new StandardStream(synStreamFrame,session,null);
stream.updateWindowSize(8192);
Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
{
@Override

View File

@ -14,17 +14,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -46,6 +35,17 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/* ------------------------------------------------------------ */
/**
@ -63,7 +63,7 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame,session,0,null);
Stream stream = new StandardStream(synStreamFrame,session,null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
@ -100,7 +100,7 @@ public class StandardStreamTest
@Test
public void testSynOnClosedStream(){
IStream stream = new StandardStream(synStreamFrame,session,0,null);
IStream stream = new StandardStream(synStreamFrame,session,null);
stream.updateCloseState(true,true);
stream.updateCloseState(true,false);
assertThat("stream expected to be closed",stream.isClosed(),is(true));
@ -121,7 +121,8 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
IStream stream = new StandardStream(synStreamFrame,session,8192,null);
IStream stream = new StandardStream(synStreamFrame,session,null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(),true);
assertThat("stream is half closed",stream.isHalfClosed(),is(true));
stream.data(new StringDataInfo("data on half closed stream",true));

View File

@ -29,7 +29,7 @@ public class ClientUsageTest
@Test
public void testClientRequestResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null, null);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@ -48,7 +48,7 @@ public class ClientUsageTest
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@ -69,7 +69,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -104,7 +104,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{

View File

@ -21,43 +21,39 @@ import java.io.IOException;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYServerConnector extends SPDYServerConnector
{
private final AsyncConnectionFactory defaultConnectionFactory;
private final PushStrategy pushStrategy = new PushStrategy.None();
public HTTPSPDYServerConnector()
{
this(null);
this(null, new PushStrategy.None());
}
public HTTPSPDYServerConnector(PushStrategy pushStrategy)
{
this(null, pushStrategy);
}
public HTTPSPDYServerConnector(SslContextFactory sslContextFactory)
{
super(null, sslContextFactory);
// Override the default connection factory for non-SSL connections
defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this);
setFlowControlEnabled(false);
this(sslContextFactory, new PushStrategy.None());
}
@Override
protected void doStart() throws Exception
public HTTPSPDYServerConnector(SslContextFactory sslContextFactory, PushStrategy pushStrategy)
{
super.doStart();
// We pass a null ServerSessionFrameListener because for
// HTTP over SPDY we need one that references the endPoint
super(null, sslContextFactory);
// Override the default connection factory for non-SSL connections to speak plain HTTP
setDefaultAsyncConnectionFactory(new ServerHTTPAsyncConnectionFactory(this));
// Add the "http/1.1" protocol for browsers that support NPN but not SPDY
putAsyncConnectionFactory("http/1.1", getDefaultAsyncConnectionFactory());
// Override the "spdy/2" protocol by handling HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, pushStrategy));
// Add the "http/1.1" protocol for browsers that do not support NPN
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
}
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return defaultConnectionFactory;
// TODO: Override the "spdy/3" protocol to handle HTTP over SPDY
}
@Override

View File

@ -52,7 +52,7 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
}
@Override
protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
protected ServerSessionFrameListener provideServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
{
return new HTTPServerFrameListener(endPoint);
}

View File

@ -67,14 +67,10 @@ public abstract class AbstractHTTPSPDYTest
protected SPDYServerConnector newHTTPSPDYServerConnector()
{
// For these tests, we need the connector to speak HTTP over SPDY even in non-SSL
return new HTTPSPDYServerConnector()
{
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, new PushStrategy.None());
}
};
SPDYServerConnector connector = new HTTPSPDYServerConnector();
AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new PushStrategy.None());
connector.setDefaultAsyncConnectionFactory(defaultFactory);
return connector;
}
protected Session startClient(InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception

View File

@ -27,7 +27,6 @@ import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -109,9 +108,8 @@ public class ProtocolNegotiationTest
public String selectProtocol(List<String> strings)
{
Assert.assertNotNull(strings);
Assert.assertEquals(1, strings.size());
String protocol = strings.get(0);
Assert.assertEquals("http/1.1", protocol);
String protocol = "http/1.1";
Assert.assertTrue(strings.contains(protocol));
return protocol;
}
});
@ -166,11 +164,11 @@ public class ProtocolNegotiationTest
public String selectProtocol(List<String> strings)
{
Assert.assertNotNull(strings);
Assert.assertEquals(2, strings.size());
String spdyProtocol = strings.get(0);
Assert.assertEquals("spdy/2", spdyProtocol);
String httpProtocol = strings.get(1);
Assert.assertEquals("http/1.1", httpProtocol);
String spdyProtocol = "spdy/2";
Assert.assertTrue(strings.contains(spdyProtocol));
String httpProtocol = "http/1.1";
Assert.assertTrue(strings.contains(httpProtocol));
Assert.assertTrue(strings.indexOf(spdyProtocol) < strings.indexOf(httpProtocol));
return httpProtocol;
}
});
@ -198,14 +196,9 @@ public class ProtocolNegotiationTest
@Test
public void testServerAdvertisingSPDYAndHTTPSpeaksDefaultProtocolWhenNPNMissing() throws Exception
{
InetSocketAddress address = startServer(new SPDYServerConnector(null, newSslContextFactory())
{
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return new ServerHTTPAsyncConnectionFactory(connector);
}
});
SPDYServerConnector connector = new SPDYServerConnector(null, newSslContextFactory());
connector.setDefaultAsyncConnectionFactory(new ServerHTTPAsyncConnectionFactory(connector));
InetSocketAddress address = startServer(connector);
connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector));
SslContextFactory sslContextFactory = newSslContextFactory();

View File

@ -30,17 +30,10 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
@Override
protected SPDYServerConnector newHTTPSPDYServerConnector()
{
return new HTTPSPDYServerConnector()
{
private final AsyncConnectionFactory defaultAsyncConnectionFactory =
new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, new ReferrerPushStrategy());
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return defaultAsyncConnectionFactory;
}
};
SPDYServerConnector connector = super.newHTTPSPDYServerConnector();
AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new ReferrerPushStrategy());
connector.setDefaultAsyncConnectionFactory(defaultFactory);
return connector;
}
@Test

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import org.eclipse.jetty.spdy.api.SPDY;
public class FlowControlStrategyFactory
{
private FlowControlStrategyFactory()
{
}
public static FlowControlStrategy newFlowControlStrategy(short version)
{
switch (version)
{
case SPDY.V2:
return new FlowControlStrategy.None();
case SPDY.V3:
return new SPDYv3FlowControlStrategy();
default:
throw new IllegalStateException();
}
}
}

View File

@ -223,6 +223,7 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
@Override
public void onIdleExpired(long idleForMs)
{
logger.debug("Idle timeout expired for {}", getEndPoint());
session.goAway();
}

View File

@ -64,7 +64,7 @@ public class SPDYClient
private final Factory factory;
private SocketAddress bindAddress;
private long maxIdleTime = -1;
private boolean flowControlEnabled = true;
private volatile int initialWindowSize = 65536;
protected SPDYClient(short version, Factory factory)
{
@ -119,14 +119,14 @@ public class SPDYClient
this.maxIdleTime = maxIdleTime;
}
public boolean isFlowControlEnabled()
public int getInitialWindowSize()
{
return flowControlEnabled;
return initialWindowSize;
}
public void setFlowControlEnabled(boolean flowControlEnabled)
public void setInitialWindowSize(int initialWindowSize)
{
this.flowControlEnabled = flowControlEnabled;
this.initialWindowSize = initialWindowSize;
}
protected String selectProtocol(List<String> serverProtocols)
@ -183,6 +183,11 @@ public class SPDYClient
return engine;
}
protected FlowControlStrategy newFlowControlStrategy()
{
return FlowControlStrategyFactory.newFlowControlStrategy(version);
}
public static class Factory extends AggregateLifeCycle
{
private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
@ -440,8 +445,10 @@ public class SPDYClient
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator);
session.setFlowControlEnabled(client.isFlowControlEnabled());
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
StandardSession session = new StandardSession(client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator, flowControlStrategy);
session.setWindowSize(client.getInitialWindowSize());
parser.addListener(session);
sessionPromise.completed(session);
connection.setSession(session);

View File

@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
@ -53,11 +54,12 @@ public class SPDYServerConnector extends SelectChannelConnector
private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final Executor executor = new LazyExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ServerSessionFrameListener listener;
private final SslContextFactory sslContextFactory;
private AsyncConnectionFactory defaultConnectionFactory;
private volatile boolean flowControlEnabled = true;
private volatile AsyncConnectionFactory defaultConnectionFactory;
private volatile int initialWindowSize = 65536;
public SPDYServerConnector(ServerSessionFrameListener listener)
{
@ -70,6 +72,9 @@ public class SPDYServerConnector extends SelectChannelConnector
this.sslContextFactory = sslContextFactory;
if (sslContextFactory != null)
addBean(sslContextFactory);
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, bufferPool, executor, scheduler, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, bufferPool, executor, scheduler, listener));
}
public ByteBufferPool getByteBufferPool()
@ -79,17 +84,7 @@ public class SPDYServerConnector extends SelectChannelConnector
public Executor getExecutor()
{
final ThreadPool threadPool = getThreadPool();
if (threadPool instanceof Executor)
return (Executor)threadPool;
return new Executor()
{
@Override
public void execute(Runnable command)
{
threadPool.dispatch(command);
}
};
return executor;
}
public ScheduledExecutorService getScheduler()
@ -97,6 +92,11 @@ public class SPDYServerConnector extends SelectChannelConnector
return scheduler;
}
public ServerSessionFrameListener getServerSessionFrameListener()
{
return listener;
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
@ -106,8 +106,6 @@ public class SPDYServerConnector extends SelectChannelConnector
protected void doStart() throws Exception
{
super.doStart();
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), scheduler, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
logger.info("SPDY support is experimental. Please report feedback at jetty-dev@eclipse.org");
}
@ -166,11 +164,16 @@ public class SPDYServerConnector extends SelectChannelConnector
}
}
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
public AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return defaultConnectionFactory;
}
public void setDefaultAsyncConnectionFactory(AsyncConnectionFactory defaultConnectionFactory)
{
this.defaultConnectionFactory = defaultConnectionFactory;
}
@Override
protected AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint)
{
@ -275,13 +278,25 @@ public class SPDYServerConnector extends SelectChannelConnector
return Collections.unmodifiableCollection(sessions);
}
public boolean isFlowControlEnabled()
public int getInitialWindowSize()
{
return flowControlEnabled;
return initialWindowSize;
}
public void setFlowControlEnabled(boolean flowControl)
public void setInitialWindowSize(int initialWindowSize)
{
this.flowControlEnabled = flowControl;
this.initialWindowSize = initialWindowSize;
}
private class LazyExecutor implements Executor
{
@Override
public void execute(Runnable command)
{
ThreadPool threadPool = getThreadPool();
if (threadPool == null)
throw new RejectedExecutionException();
threadPool.dispatch(command);
}
}
}

View File

@ -57,17 +57,16 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(bufferPool, compressionFactory.newCompressor());
ServerSessionFrameListener listener = this.listener;
if (listener == null)
listener = newServerSessionFrameListener(endPoint, attachment);
SPDYServerConnector connector = (SPDYServerConnector)attachment;
ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment);
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator);
session.setFlowControlEnabled(connector.isFlowControlEnabled());
FlowControlStrategy flowControlStrategy = newFlowControlStrategy(version);
StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy);
session.setWindowSize(connector.getInitialWindowSize());
parser.addListener(session);
connection.setSession(session);
@ -76,11 +75,16 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
return connection;
}
protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
protected ServerSessionFrameListener provideServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
{
return listener;
}
protected FlowControlStrategy newFlowControlStrategy(short version)
{
return FlowControlStrategyFactory.newFlowControlStrategy(version);
}
private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection
{
private final ServerSessionFrameListener listener;

View File

@ -53,15 +53,17 @@ public abstract class AbstractTest
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
return startServer(listener,true);
return startServer(SPDY.V2, listener);
}
protected InetSocketAddress startServer(ServerSessionFrameListener listener, boolean flowControl) throws Exception
protected InetSocketAddress startServer(short version, ServerSessionFrameListener listener) throws Exception
{
if (connector == null)
connector = newSPDYServerConnector(listener);
if (listener == null)
listener = connector.getServerSessionFrameListener();
connector.setDefaultAsyncConnectionFactory(new ServerSPDYAsyncConnectionFactory(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), listener));
connector.setPort(0);
connector.setFlowControlEnabled(flowControl);
server = new Server();
server.addConnector(connector);
server.start();
@ -74,6 +76,11 @@ public abstract class AbstractTest
}
protected Session startClient(InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception
{
return startClient(SPDY.V2, socketAddress, listener);
}
protected Session startClient(short version, InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception
{
if (clientFactory == null)
{
@ -82,7 +89,7 @@ public abstract class AbstractTest
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
}
return clientFactory.newSPDYClient(SPDY.V2).connect(socketAddress, listener).get(5, TimeUnit.SECONDS);
return clientFactory.newSPDYClient(version).connect(socketAddress, listener).get(5, TimeUnit.SECONDS);
}
protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool)

View File

@ -15,8 +15,6 @@
*/
package org.eclipse.jetty.spdy;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
@ -31,6 +29,7 @@ import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -43,6 +42,9 @@ import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class FlowControlTest extends AbstractTest
{
@Test
@ -56,7 +58,7 @@ public class FlowControlTest extends AbstractTest
final AtomicReference<DataInfo> dataInfoRef = new AtomicReference<>();
final CountDownLatch dataLatch = new CountDownLatch(2);
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
@ -115,7 +117,7 @@ public class FlowControlTest extends AbstractTest
final int windowSize = 1536;
final int length = 5 * windowSize;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
@ -218,7 +220,7 @@ public class FlowControlTest extends AbstractTest
final int windowSize = 1536;
final Exchanger<DataInfo> exchanger = new Exchanger<>();
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onConnect(Session session)
@ -325,7 +327,7 @@ public class FlowControlTest extends AbstractTest
{
final int windowSize = 1024;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
Session session = startClient(SPDY.V3, startServer(SPDY.V3, new ServerSessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
@ -415,24 +417,22 @@ public class FlowControlTest extends AbstractTest
@Test
public void testSendBigFileWithoutFlowControl() throws Exception
{
boolean flowControlEnabled = false;
testSendBigFile(flowControlEnabled);
testSendBigFile(SPDY.V2);
}
@Test
public void testSendBigFileWithFlowControl() throws Exception
{
boolean flowControlEnabled = true;
testSendBigFile(flowControlEnabled);
testSendBigFile(SPDY.V3);
}
private void testSendBigFile(boolean flowControlEnabled) throws Exception, InterruptedException
private void testSendBigFile(short version) throws Exception
{
final int dataSize = 1024 * 1024;
final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false);
final CountDownLatch allDataReceivedLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
Session session = startClient(version, startServer(version, new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
@ -441,7 +441,7 @@ public class FlowControlTest extends AbstractTest
stream.data(bigByteBufferDataInfo);
return null;
}
},flowControlEnabled),new SessionFrameListener.Adapter());
}),new SessionFrameListener.Adapter());
session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
{
@ -457,7 +457,7 @@ public class FlowControlTest extends AbstractTest
}
});
assertThat("all data bytes have been received by the client",allDataReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
assertThat("all data bytes have been received by the client", allDataReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)

View File

@ -20,7 +20,6 @@ import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -39,7 +38,6 @@ public class IdleTimeoutTest extends AbstractTest
@Test
public void testServerEnforcingIdleTimeout() throws Exception
{
server = new Server();
connector = newSPDYServerConnector(new ServerSessionFrameListener.Adapter()
{
@Override
@ -49,13 +47,11 @@ public class IdleTimeoutTest extends AbstractTest
return null;
}
});
server.addConnector(connector);
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new SessionFrameListener.Adapter()
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
@ -72,15 +68,12 @@ public class IdleTimeoutTest extends AbstractTest
@Test
public void testServerEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
server = new Server();
connector = newSPDYServerConnector(null);
server.addConnector(connector);
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new SessionFrameListener.Adapter()
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
@ -99,7 +92,6 @@ public class IdleTimeoutTest extends AbstractTest
public void testServerNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final int maxIdleTime = 1000;
server = new Server();
connector = newSPDYServerConnector(new ServerSessionFrameListener.Adapter()
{
@Override
@ -118,12 +110,10 @@ public class IdleTimeoutTest extends AbstractTest
}
}
});
server.addConnector(connector);
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new SessionFrameListener.Adapter()
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)

View File

@ -16,12 +16,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -65,6 +59,12 @@ import org.eclipse.jetty.spdy.parser.Parser.Listener;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class PushStreamTest extends AbstractTest
{
@Test
@ -409,7 +409,7 @@ public class PushStreamTest extends AbstractTest
{
goAwayReceivedLatch.countDown();
}
}, flowControl);
}/*TODO, flowControl*/);
final SocketChannel channel = SocketChannel.open(serverAddress);
final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
@ -519,7 +519,7 @@ public class PushStreamTest extends AbstractTest
stream.syn(new SynInfo(false));
return null;
}
}, true),new SessionFrameListener.Adapter()
}),new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)

View File

@ -1,11 +1,5 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -23,12 +17,18 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ResetStreamTest extends AbstractTest
{
@Test
public void testResetStreamIsRemoved() throws Exception
{
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter(), true),null);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()/*TODO, true*/),null);
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS);