396036 SPDY send controlFrames even if Stream is reset to avoid breaking the compression context

This commit is contained in:
Thomas Becker 2012-12-07 13:22:19 +01:00
parent db6fdca3b9
commit dc2850c898
2 changed files with 36 additions and 22 deletions

View File

@ -908,7 +908,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
if (buffer != null) if (buffer != null)
{ {
queue.remove(i); queue.remove(i);
if (stream != null && stream.isReset()) if (stream != null && stream.isReset() && !(frameBytes instanceof ControlFrameBytes))
{ {
frameBytes.fail(new StreamException(stream.getId(), StreamStatus.INVALID_STREAM, frameBytes.fail(new StreamException(stream.getId(), StreamStatus.INVALID_STREAM,
"Stream: " + stream + " is reset!")); "Stream: " + stream + " is reset!"));
@ -929,7 +929,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
return; return;
flushing = true; flushing = true;
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size()); LOG.warn("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
} }
write(buffer, frameBytes); write(buffer, frameBytes);
} }

View File

@ -104,7 +104,7 @@ public class StandardSessionTest
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void setControllerWriteExpectationToFail(final boolean fail) private void setControllerWriteExpectation(final boolean fail)
{ {
doAnswer(new Answer() doAnswer(new Answer()
{ {
@ -125,7 +125,7 @@ public class StandardSessionTest
@Test @Test
public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
assertThatStreamIsInSession(stream); assertThatStreamIsInSession(stream);
@ -138,7 +138,7 @@ public class StandardSessionTest
@Test @Test
public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
assertThatStreamIsInSession(stream); assertThatStreamIsInSession(stream);
@ -151,7 +151,7 @@ public class StandardSessionTest
@Test @Test
public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
assertThatStreamIsInSession(stream); assertThatStreamIsInSession(stream);
@ -164,7 +164,7 @@ public class StandardSessionTest
@Test @Test
public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
assertThat("stream is not unidirectional", stream.isUnidirectional(), not(true)); assertThat("stream is not unidirectional", stream.isUnidirectional(), not(true));
@ -175,7 +175,7 @@ public class StandardSessionTest
@Test @Test
public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
Stream stream = createStream(); Stream stream = createStream();
IStream pushStream = createPushStream(stream); IStream pushStream = createPushStream(stream);
@ -186,7 +186,7 @@ public class StandardSessionTest
@Test @Test
public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
Stream pushStream = createPushStream(stream); Stream pushStream = createPushStream(stream);
@ -209,7 +209,7 @@ public class StandardSessionTest
@Test @Test
public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
stream.updateCloseState(true, true); stream.updateCloseState(true, true);
@ -237,7 +237,7 @@ public class StandardSessionTest
@Test @Test
public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
IStream pushStream = createPushStream(stream); IStream pushStream = createPushStream(stream);
@ -253,7 +253,7 @@ public class StandardSessionTest
@Test @Test
public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get(); IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get();
@ -267,7 +267,7 @@ public class StandardSessionTest
@Test @Test
public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers, true, stream.getPriority()); SynInfo synInfo = new SynInfo(headers, true, stream.getPriority());
@ -282,7 +282,7 @@ public class StandardSessionTest
public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException, public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException,
TimeoutException TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = createStream(); IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers, false, stream.getPriority()); SynInfo synInfo = new SynInfo(headers, false, stream.getPriority());
@ -299,7 +299,7 @@ public class StandardSessionTest
@Test @Test
public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1); final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
@ -314,7 +314,7 @@ public class StandardSessionTest
@Test @Test
public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null, closedListenerCalledLatch)); session.addListener(new TestStreamListener(null, closedListenerCalledLatch));
@ -326,7 +326,7 @@ public class StandardSessionTest
@Test @Test
public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2); final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
@ -342,7 +342,7 @@ public class StandardSessionTest
@Test @Test
public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null, closedListenerCalledLatch)); session.addListener(new TestStreamListener(null, closedListenerCalledLatch));
@ -384,7 +384,7 @@ public class StandardSessionTest
@Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.") @Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.")
public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
IStream stream = (IStream)session.syn(new SynInfo(false), new StreamFrameListener.Adapter()).get(); IStream stream = (IStream)session.syn(new SynInfo(false), new StreamFrameListener.Adapter()).get();
stream.updateCloseState(true, false); stream.updateCloseState(true, false);
@ -395,7 +395,7 @@ public class StandardSessionTest
@Test @Test
public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException
{ {
setControllerWriteExpectationToFail(false); setControllerWriteExpectation(false);
final CountDownLatch onDataCalledLatch = new CountDownLatch(1); final CountDownLatch onDataCalledLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter() Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -416,7 +416,7 @@ public class StandardSessionTest
@Test @Test
public void testControllerWriteFailsInEndPointFlush() throws InterruptedException public void testControllerWriteFailsInEndPointFlush() throws InterruptedException
{ {
setControllerWriteExpectationToFail(true); setControllerWriteExpectation(true);
final CountDownLatch failedCalledLatch = new CountDownLatch(2); final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
@ -440,6 +440,20 @@ public class StandardSessionTest
assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
} }
@Test
public void testControlFramesAreStillSentForResetStreams() throws InterruptedException, ExecutionException, TimeoutException
{
setControllerWriteExpectation(false);
// This is necessary to keep the compression context of Headers valid
IStream stream = createStream();
session.rst(new RstInfo(stream.getId(), StreamStatus.INVALID_STREAM));
stream.headers(new HeadersInfo(headers,true));
verify(controller, times(3)).write(any(ByteBuffer.class), any(Callback.class));
}
@Test @Test
public void testHeaderFramesAreSentInTheOrderTheyAreCreated() throws ExecutionException, public void testHeaderFramesAreSentInTheOrderTheyAreCreated() throws ExecutionException,
TimeoutException, InterruptedException TimeoutException, InterruptedException
@ -519,7 +533,7 @@ public class StandardSessionTest
private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException
{ {
SynInfo synInfo = new SynInfo(headers, false, (byte)0); SynInfo synInfo = new SynInfo(headers, false, (byte)0);
return (IStream)session.syn(synInfo, new StreamFrameListener.Adapter()).get(50, TimeUnit.SECONDS); return (IStream)session.syn(synInfo, new StreamFrameListener.Adapter()).get(5, TimeUnit.SECONDS);
} }
private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException