Merge "spdy: improve errorHandling, additional tests for sending big data with/without flow control, test that no more frames are sent on reset pushstreams, test for failing controller.writer(), small improvements"

This commit is contained in:
Simone Bordet 2012-05-26 16:28:32 -04:00 committed by Gerrit Code Review @ Eclipse.org
commit bd4c93e441
17 changed files with 516 additions and 184 deletions

View File

@ -44,7 +44,8 @@ public class Promise<T> implements Handler<T>, Future<T>
latch.countDown();
}
public void failed(Throwable x)
@Override
public void failed(T context, Throwable x)
{
this.failure = x;
latch.countDown();

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -93,6 +94,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private final AtomicInteger lastStreamId = new AtomicInteger();
private boolean flushing;
private boolean failed = false;
private volatile boolean flowControlEnabled = true;
private volatile int windowSize = 65536;
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
@ -735,11 +738,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
if (stream != null)
{
updateLastStreamId(stream);
if (stream.isClosed())
removeStream(stream);
}
// Synchronization is necessary, since we may have concurrent replies
// and those needs to be generated and enqueued atomically in order
@ -759,9 +758,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
append(frameBytes);
}
}
catch (Throwable x)
catch (Exception x)
{
notifyHandlerFailed(handler, x);
notifyHandlerFailed(handler, context, x);
}
}
@ -787,9 +786,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
logger.debug("Queuing {} on {}",dataInfo,stream);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
if (timeout > 0)
{
frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
}
append(frameBytes);
flush();
}
@ -822,9 +819,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (buffer != null)
{
queue.remove(i);
// TODO: stream.isUniDirectional() check here is only needed for pushStreams which send a syn with close=true --> find a better solution
if (stream != null && !streams.containsValue(stream) && !stream.isUnidirectional())
if (stream != null && stream.isReset())
{
frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM));
return;
}
break;
}
@ -847,34 +846,50 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void append(FrameBytes frameBytes)
{
boolean fail;
synchronized (queue)
{
int index = queue.size();
while (index > 0)
fail = failed;
if (!fail)
{
FrameBytes element = queue.get(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
int index = queue.size();
while (index > 0)
{
FrameBytes element = queue.get(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
}
queue.add(index,frameBytes);
}
queue.add(index,frameBytes);
}
if (fail)
frameBytes.fail(new SPDYException("Session failed"));
}
private void prepend(FrameBytes frameBytes)
{
boolean fail;
synchronized (queue)
{
int index = 0;
while (index < queue.size())
fail = failed;
if (!fail)
{
FrameBytes element = queue.get(index);
if (element.compareTo(frameBytes) <= 0)
break;
++index;
int index = 0;
while (index < queue.size())
{
FrameBytes element = queue.get(index);
if (element.compareTo(frameBytes) <= 0)
break;
++index;
}
queue.add(index,frameBytes);
}
queue.add(index,frameBytes);
}
if (fail)
frameBytes.fail(new SPDYException("Session failed"));
}
@Override
@ -889,9 +904,23 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void failed(Throwable x)
public void failed(FrameBytes frameBytes, Throwable x)
{
throw new SPDYException(x);
List<FrameBytes> frameBytesToFail = new ArrayList<>();
frameBytesToFail.add(frameBytes);
synchronized (queue)
{
failed = true;
String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size());
logger.debug(logMessage,x);
frameBytesToFail.addAll(queue);
queue.clear();
flushing = false;
}
for (FrameBytes fb : frameBytesToFail)
fb.fail(x);
}
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
@ -951,12 +980,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private <C> void notifyHandlerFailed(Handler<C> handler, Throwable x)
private <C> void notifyHandlerFailed(Handler<C> handler, C context, Throwable x)
{
try
{
if (handler != null)
handler.failed(x);
handler.failed(context, x);
}
catch (Exception xx)
{
@ -1013,7 +1042,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void fail(Throwable x)
{
cancelTask();
notifyHandlerFailed(handler,x);
notifyHandlerFailed(handler,context,x);
StandardSession.this.flush();
}
private void cancelTask()
@ -1062,6 +1092,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// Recipients will know the last good stream id and act accordingly.
close();
}
IStream stream = getStream();
if (stream != null && stream.isClosed())
removeStream(stream);
}
@Override
@ -1112,14 +1145,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
bufferPool.release(buffer);
IStream stream = getStream();
stream.updateWindowSize(-size);
boolean flowControlEnabled = StandardSession.this.flowControlEnabled;
if (flowControlEnabled)
stream.updateWindowSize(-size);
if (dataInfo.available() > 0)
{
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
prepend(this);
if (!flowControlEnabled)
flush();
}
else
{
@ -1136,4 +1172,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
}
}
public boolean isFlowControlEnabled()
{
return flowControlEnabled;
}
public void setFlowControlEnabled(boolean flowControl)
{
this.flowControlEnabled = flowControl;
}
}

View File

@ -345,7 +345,7 @@ public class StandardStream implements IStream
{
if (isClosed() || isReset())
{
handler.failed(new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
handler.failed(this, new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);

View File

@ -28,16 +28,16 @@ public interface Handler<C>
* <p>Callback invoked when the operation completes.</p>
*
* @param context the context
* @see #failed(Throwable)
* @see #failed(Object, Throwable)
*/
public abstract void completed(C context);
/**
* <p>Callback invoked when the operation fails.</p>
*
* @param context the context
* @param x the reason for the operation failure
*/
public void failed(Throwable x);
public void failed(C context, Throwable x);
/**
* <p>Empty implementation of {@link Handler}</p>
@ -52,9 +52,8 @@ public interface Handler<C>
}
@Override
public void failed(Throwable x)
public void failed(C context, Throwable x)
{
throw new SPDYException(x);
}
}
}

View File

@ -72,7 +72,7 @@ public class AsyncTimeoutTest
}
@Override
public void failed(Throwable x)
public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@ -120,7 +120,7 @@ public class AsyncTimeoutTest
}
@Override
public void failed(Throwable x)
public void failed(Void context, Throwable x)
{
failedLatch.countDown();
}

View File

@ -19,13 +19,10 @@ package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -34,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
@ -55,13 +53,16 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
{
@Mock
private ISession sessionMock;
private Controller<FrameBytes> controller;
private ByteBufferPool bufferPool;
private Executor threadPool;
private StandardSession session;
@ -76,13 +77,36 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = Executors.newSingleThreadScheduledExecutor();
generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator);
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator);
headers = new Headers();
}
@SuppressWarnings("unchecked")
private void setControllerWriteExpectationToFail(final boolean fail)
{
when(controller.write(any(ByteBuffer.class),any(Handler.class),any(StandardSession.FrameBytes.class))).thenAnswer(new Answer<Integer>()
{
public Integer answer(InvocationOnMock invocation)
{
Object[] args = invocation.getArguments();
Handler<StandardSession.FrameBytes> handler = (Handler<FrameBytes>)args[1];
FrameBytes context = (FrameBytes)args[2];
if (fail)
handler.failed(context,new ClosedChannelException());
else
handler.completed(context);
return 0;
}
});
}
@Test
public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
assertThatStreamIsInSession(stream);
assertThat("stream is not reset",stream.isReset(),is(false));
@ -94,6 +118,8 @@ public class StandardSessionTest
@Test
public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
assertThatStreamIsInSession(stream);
stream.updateCloseState(true,true);
@ -105,6 +131,8 @@ public class StandardSessionTest
@Test
public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
assertThatStreamIsInSession(stream);
stream.updateCloseState(true,false);
@ -116,6 +144,8 @@ public class StandardSessionTest
@Test
public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true));
Stream pushStream = createPushStream(stream);
@ -125,6 +155,8 @@ public class StandardSessionTest
@Test
public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
Stream stream = createStream();
IStream pushStream = createPushStream(stream);
assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId()));
@ -134,6 +166,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
Stream pushStream = createPushStream(stream);
assertThatStreamIsNotHalfClosed(stream);
@ -155,6 +189,8 @@ public class StandardSessionTest
@Test
public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
stream.updateCloseState(true,true);
assertThatStreamIsHalfClosed(stream);
@ -167,15 +203,10 @@ public class StandardSessionTest
{
final CountDownLatch failedLatch = new CountDownLatch(1);
SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler<Stream>()
stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream context)
{
}
@Override
public void failed(Throwable x)
public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@ -186,6 +217,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
IStream pushStream = createPushStream(stream);
assertThatPushStreamIsHalfClosed(pushStream);
@ -200,6 +233,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get();
assertThatPushStreamIsInSession(pushStream);
@ -212,6 +247,8 @@ public class StandardSessionTest
@Test
public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers,true,stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
@ -225,6 +262,8 @@ public class StandardSessionTest
public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException,
TimeoutException
{
setControllerWriteExpectationToFail(false);
IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
@ -240,6 +279,8 @@ public class StandardSessionTest
@Test
public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
@ -253,6 +294,8 @@ public class StandardSessionTest
@Test
public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
IStream stream = createStream();
@ -263,6 +306,8 @@ public class StandardSessionTest
@Test
public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
@ -277,6 +322,8 @@ public class StandardSessionTest
@Test
public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
IStream stream = createStream();
@ -313,22 +360,12 @@ public class StandardSessionTest
}
}
@SuppressWarnings("unchecked")
@Test(expected = IllegalStateException.class)
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null);
stream.updateCloseState(synStreamFrame.isClose(),true);
assertThat("stream is half closed",stream.isHalfClosed(),is(true));
stream.data(new StringDataInfo("data on half closed stream",true));
verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
}
@Test
@Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.")
public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException
{
setControllerWriteExpectationToFail(false);
IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get();
stream.updateCloseState(true,false);
assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true));
@ -338,6 +375,8 @@ public class StandardSessionTest
@Test
public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectationToFail(false);
final CountDownLatch onDataCalledLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
{
@ -353,10 +392,39 @@ public class StandardSessionTest
assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true));
}
@SuppressWarnings("unchecked")
@Test
public void testControllerWriteFailsInEndPointFlush() throws InterruptedException
{
setControllerWriteExpectationToFail(true);
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
IStream stream = new StandardStream(synStreamFrame,session,8192,null);
Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
{
@Override
public void failed(Void context, Throwable x)
{
failedCalledLatch.countDown();
}
};
// first data frame should fail on controller.write()
stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler);
// second data frame should fail without controller.writer() as the connection is expected to be broken after first controller.write() call failed.
stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler);
verify(controller,times(1)).write(any(ByteBuffer.class),any(Handler.class),any(FrameBytes.class));
assertThat("Handler.failed has been called twice",failedCalledLatch.await(5,TimeUnit.SECONDS),is(true));
}
private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynInfo synInfo = new SynInfo(headers,false,(byte)0);
return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS);
return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(50,TimeUnit.SECONDS);
}
private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException
@ -365,21 +433,6 @@ public class StandardSessionTest
return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
}
private static class TestController implements Controller<StandardSession.FrameBytes>
{
@Override
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
{
handler.completed(context);
return buffer.remaining();
}
@Override
public void close(boolean onlyOutput)
{
}
}
private void assertThatStreamIsClosed(IStream stream)
{
assertThat("stream is closed",stream.isClosed(),is(true));

View File

@ -18,19 +18,26 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.junit.Test;
@ -101,7 +108,7 @@ public class StandardStreamTest
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@ -109,4 +116,15 @@ public class StandardStreamTest
assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L));
}
@SuppressWarnings("unchecked")
@Test(expected = IllegalStateException.class)
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
IStream stream = new StandardStream(synStreamFrame,session,8192,null);
stream.updateCloseState(synStreamFrame.isClose(),true);
assertThat("stream is half closed",stream.isHalfClosed(),is(true));
stream.data(new StringDataInfo("data on half closed stream",true));
verify(session,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
}
}

View File

@ -41,6 +41,7 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
super(null, sslContextFactory);
// Override the default connection factory for non-SSL connections
defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this);
setFlowControlEnabled(false);
}
@Override

View File

@ -122,7 +122,8 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
catch (Exception x)
{
close(false);
handler.failed(x);
handler.failed(context, x);
return -1;
}
finally
{

View File

@ -398,7 +398,7 @@ public class SPDYClient
}
catch (RuntimeException x)
{
sessionPromise.failed(x);
sessionPromise.failed(null,x);
throw x;
}
}

View File

@ -58,6 +58,7 @@ public class SPDYServerConnector extends SelectChannelConnector
private final ServerSessionFrameListener listener;
private final SslContextFactory sslContextFactory;
private AsyncConnectionFactory defaultConnectionFactory;
private volatile boolean flowControlEnabled = true;
public SPDYServerConnector(ServerSessionFrameListener listener)
{
@ -287,4 +288,14 @@ public class SPDYServerConnector extends SelectChannelConnector
{
return Collections.unmodifiableCollection(sessions);
}
public boolean isFlowControlEnabled()
{
return flowControlEnabled;
}
public void setFlowControlEnabled(boolean flowControl)
{
this.flowControlEnabled = flowControl;
}
}

View File

@ -67,6 +67,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator);
session.setFlowControlEnabled(connector.isFlowControlEnabled());
parser.addListener(session);
connection.setSession(session);

View File

@ -52,10 +52,16 @@ public abstract class AbstractTest
protected SPDYServerConnector connector;
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
return startServer(listener,true);
}
protected InetSocketAddress startServer(ServerSessionFrameListener listener, boolean flowControl) throws Exception
{
if (connector == null)
connector = newSPDYServerConnector(listener);
connector.setPort(0);
connector.setFlowControlEnabled(flowControl);
server = new Server();
server.addConnector(connector);
server.start();

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
@ -145,14 +144,12 @@ public class ClosedStreamTest extends AbstractTest
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyReceivedLatch.countDown();
super.onReply(stream,replyInfo);
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
clientReceivedDataLatch.countDown();
super.onData(stream,dataInfo);
}
}).get();
assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
@ -204,7 +201,6 @@ public class ClosedStreamTest extends AbstractTest
public void onData(Stream stream, DataInfo dataInfo)
{
serverDataReceivedLatch.countDown();
super.onData(stream,dataInfo);
}
};
}
@ -250,13 +246,6 @@ public class ClosedStreamTest extends AbstractTest
{
clientResetReceivedLatch.countDown();
}
super.onControlFrame(frame);
}
@Override
public void onDataFrame(DataFrame frame, ByteBuffer data)
{
super.onDataFrame(frame,data);
}
});
ByteBuffer response = ByteBuffer.allocate(28);

View File

@ -15,6 +15,8 @@
*/
package org.eclipse.jetty.spdy;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
@ -25,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -183,43 +186,22 @@ public class FlowControlTest extends AbstractTest
});
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@ -312,43 +294,22 @@ public class FlowControlTest extends AbstractTest
stream.data(new BytesDataInfo(new byte[length], true));
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
checkThatWeAreFlowControlStalled(exchanger);
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@ -451,6 +412,66 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testSendBigFileWithoutFlowControl() throws Exception
{
boolean flowControlEnabled = false;
testSendBigFile(flowControlEnabled);
}
@Test
public void testSendBigFileWithFlowControl() throws Exception
{
boolean flowControlEnabled = true;
testSendBigFile(flowControlEnabled);
}
private void testSendBigFile(boolean flowControlEnabled) throws Exception, InterruptedException
{
final int dataSize = 1024 * 1024;
final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false);
final CountDownLatch allDataReceivedLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false));
stream.data(bigByteBufferDataInfo);
return null;
}
},flowControlEnabled),new SessionFrameListener.Adapter());
session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
{
private int dataBytesReceived;
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataBytesReceived = dataBytesReceived + dataInfo.length();
dataInfo.consume(dataInfo.length());
if (dataBytesReceived == dataSize)
allDataReceivedLatch.countDown();
}
});
assertThat("all data bytes have been received by the client",allDataReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
}
private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)
{
expectException(TimeoutException.class, new Callable<DataInfo>()
{
@Override
public DataInfo call() throws Exception
{
return exchanger.exchange(null, 1, TimeUnit.SECONDS);
}
});
}
private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try

View File

@ -16,34 +16,55 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.spdy.parser.Parser.Listener;
import org.junit.Assert;
import org.junit.Test;
public class PushStreamTest extends AbstractTest
{
@Test
@ -66,10 +87,10 @@ public class PushStreamTest extends AbstractTest
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
assertThat("streamId is even", stream.getId() % 2, is(0));
assertThat("stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("stream is closed", stream.isClosed(), is(true));
assertThat("stream has associated stream", stream.getAssociatedStream(), notNullValue());
assertThat("streamId is even",stream.getId() % 2,is(0));
assertThat("stream is unidirectional",stream.isUnidirectional(),is(true));
assertThat("stream is closed",stream.isClosed(),is(true));
assertThat("stream has associated stream",stream.getAssociatedStream(),notNullValue());
try
{
stream.reply(new ReplyInfo(false));
@ -85,10 +106,10 @@ public class PushStreamTest extends AbstractTest
}
});
Stream stream = clientSession.syn(new SynInfo(true), null).get();
assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true));
Stream stream = clientSession.syn(new SynInfo(true),null).get();
assertThat("onSyn has been called",pushStreamLatch.await(5,TimeUnit.SECONDS),is(true));
Stream pushStream = pushStreamRef.get();
assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream()));
assertThat("main stream and associated stream are the same",stream,sameInstance(pushStream.getAssociatedStream()));
}
@Test
@ -221,7 +242,7 @@ public class PushStreamTest extends AbstractTest
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
public void failed(Stream stream, Throwable x)
{
pushStreamFailedLatch.countDown();
}
@ -321,6 +342,170 @@ public class PushStreamTest extends AbstractTest
return bytes;
}
@Test
public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithFlowControl() throws Exception
{
final boolean flowControl = true;
testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl);
}
@Test
public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithoutFlowControl() throws Exception
{
final boolean flowControl = false;
testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl);
}
private volatile boolean read = true;
private void testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(final boolean flowControl) throws Exception, IOException, InterruptedException
{
final short version = SPDY.V3;
final AtomicBoolean unexpectedExceptionOccured = new AtomicBoolean(false);
final CountDownLatch resetReceivedLatch = new CountDownLatch(1);
final CountDownLatch allDataFramesReceivedLatch = new CountDownLatch(1);
final CountDownLatch goAwayReceivedLatch = new CountDownLatch(1);
final int dataSizeInBytes = 1024 * 256;
final byte[] transferBytes = createHugeByteArray(dataSizeInBytes);
InetSocketAddress serverAddress = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(final Stream stream, SynInfo synInfo)
{
new Thread(new Runnable()
{
@Override
public void run()
{
Stream pushStream=null;
try
{
stream.reply(new ReplyInfo(false));
pushStream = stream.syn(new SynInfo(false)).get();
resetReceivedLatch.await(5,TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException e)
{
e.printStackTrace();
unexpectedExceptionOccured.set(true);
}
pushStream.data(new BytesDataInfo(transferBytes,true));
stream.data(new StringDataInfo("close",true));
}
}).start();
return null;
}
@Override
public void onRst(Session session, RstInfo rstInfo)
{
resetReceivedLatch.countDown();
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
goAwayReceivedLatch.countDown();
}
}, flowControl);
final SocketChannel channel = SocketChannel.open(serverAddress);
final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
int streamId = 1;
ByteBuffer writeBuffer = generator.control(new SynStreamFrame(version,(byte)0,streamId,0,(byte)0,new Headers()));
channel.write(writeBuffer);
assertThat("writeBuffer is fully written",writeBuffer.hasRemaining(), is(false));
final Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
parser.addListener(new Listener.Adapter()
{
int bytesRead = 0;
@Override
public void onControlFrame(ControlFrame frame)
{
if(frame instanceof SynStreamFrame){
int pushStreamId = ((SynStreamFrame)frame).getStreamId();
ByteBuffer writeBuffer = generator.control(new RstStreamFrame(version,pushStreamId,StreamStatus.CANCEL_STREAM.getCode(version)));
try
{
channel.write(writeBuffer);
}
catch (IOException e)
{
e.printStackTrace();
unexpectedExceptionOccured.set(true);
}
}
}
@Override
public void onDataFrame(DataFrame frame, ByteBuffer data)
{
if(frame.getStreamId() == 2)
bytesRead = bytesRead + frame.getLength();
if(bytesRead == dataSizeInBytes){
allDataFramesReceivedLatch.countDown();
return;
}
if (flowControl)
{
ByteBuffer writeBuffer = generator.control(new WindowUpdateFrame(version,frame.getStreamId(),frame.getLength()));
try
{
channel.write(writeBuffer);
}
catch (IOException e)
{
e.printStackTrace();
unexpectedExceptionOccured.set(true);
}
}
}
});
Thread reader = new Thread(new Runnable()
{
@Override
public void run()
{
ByteBuffer readBuffer = ByteBuffer.allocate(dataSizeInBytes*2);
while (read)
{
try
{
channel.read(readBuffer);
}
catch (IOException e)
{
e.printStackTrace();
unexpectedExceptionOccured.set(true);
}
readBuffer.flip();
parser.parse(readBuffer);
readBuffer.clear();
}
}
});
reader.start();
read = false;
assertThat("no unexpected exceptions occured", unexpectedExceptionOccured.get(), is(false));
assertThat("not all dataframes have been received as the pushstream has been reset by the client.",allDataFramesReceivedLatch.await(streamId,TimeUnit.SECONDS),is(false));
ByteBuffer buffer = generator.control(new GoAwayFrame(version, streamId, SessionStatus.OK.getCode()));
channel.write(buffer);
Assert.assertThat(buffer.hasRemaining(), is(false));
assertThat("GoAway frame is received by server", goAwayReceivedLatch.await(5,TimeUnit.SECONDS), is(true));
channel.shutdownOutput();
channel.close();
}
@Test
public void testOddEvenStreamIds() throws Exception
{
@ -334,7 +519,7 @@ public class PushStreamTest extends AbstractTest
stream.syn(new SynInfo(false));
return null;
}
}),new SessionFrameListener.Adapter()
}, true),new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
@ -367,6 +552,6 @@ public class PushStreamTest extends AbstractTest
private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException
{
assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
assertThat("No exception occured",exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
}
}

View File

@ -28,7 +28,7 @@ public class ResetStreamTest extends AbstractTest
@Test
public void testResetStreamIsRemoved() throws Exception
{
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter(), true),null);
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS);
@ -169,7 +169,7 @@ public class ResetStreamTest extends AbstractTest
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter<Void>()
{
@Override
public void failed(Throwable x)
public void failed(Void context, Throwable x)
{
failLatch.countDown();
}