From 972087d0684fa46341e7c6ef32dcd95c407400c1 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 11 Apr 2012 12:18:56 +0200 Subject: [PATCH] 376201 - HalfClosed state not handled properly. --- .../java/org/eclipse/jetty/spdy/IStream.java | 3 +- .../eclipse/jetty/spdy/StandardSession.java | 11 ++- .../eclipse/jetty/spdy/StandardStream.java | 80 ++++++++++++++----- .../eclipse/jetty/spdy/FlowControlTest.java | 2 +- ...eTest.java => ProtocolViolationsTest.java} | 68 +++++++++++++++- 5 files changed, 141 insertions(+), 23 deletions(-) rename jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/{APIUsageTest.java => ProtocolViolationsTest.java} (58%) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java index 86e3bd98066..2291cf78b64 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java @@ -68,8 +68,9 @@ public interface IStream extends Stream * of true puts the stream into closed state.

* * @param close whether the close state should be updated + * @param local whether the close is local or remote */ - public void updateCloseState(boolean close); + public void updateCloseState(boolean close, boolean local); /** *

Processes the given control frame, diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index 4caa3df86c4..8b1a53dda42 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -398,6 +398,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer); if (timeout > 0) frameBytes.task = scheduler.schedule(frameBytes, timeout, unit); - append(frameBytes); + + // Special handling for PING frames, they must be sent as soon as possible + if (ControlFrameType.PING == frame.getType()) + prepend(frameBytes); + else + append(frameBytes); } flush(); @@ -1089,7 +1096,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler handler) { openState = OpenState.REPLY_SENT; - updateCloseState(replyInfo.isClose()); + updateCloseState(replyInfo.isClose(), true); SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders()); session.control(this, frame, timeout, unit, handler, null); } @@ -306,7 +329,12 @@ public class StandardStream implements IStream if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); - throw new IllegalStateException("Cannot send DATA frames before a SYN_REPLY frame"); + throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame"); + } + if (isLocallyClosed()) + { + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream"); } // Cannot update the close state here, because the data that we send may @@ -328,10 +356,15 @@ public class StandardStream implements IStream if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); - throw new IllegalStateException("Cannot send a HEADERS frame before a SYN_REPLY frame"); + throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame"); + } + if (isLocallyClosed()) + { + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream"); } - updateCloseState(headersInfo.isClose()); + updateCloseState(headersInfo.isClose(), true); HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders()); session.control(this, frame, timeout, unit, handler, null); } @@ -339,13 +372,19 @@ public class StandardStream implements IStream @Override public boolean isClosed() { - return closed; + return closeState == CloseState.CLOSED; + } + + private boolean isLocallyClosed() + { + CloseState closeState = this.closeState; + return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED; } @Override public String toString() { - return String.format("stream=%d v%d closed=%s", getId(), session.getVersion(), isClosed() ? "true" : isHalfClosed() ? "half" : "false"); + return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState); } private boolean canSend() @@ -364,4 +403,9 @@ public class StandardStream implements IStream { SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV } + + private enum CloseState + { + OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED + } } diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java index 6e246409d58..f1908e73bcc 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java @@ -307,7 +307,7 @@ public class FlowControlTest extends AbstractTest Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); - Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS); + Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS); final int length = 5 * windowSize; stream.data(new BytesDataInfo(new byte[length], true)); diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/APIUsageTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java similarity index 58% rename from jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/APIUsageTest.java rename to jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java index 36a0a2cb7bf..92e642f2b13 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/APIUsageTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java @@ -8,6 +8,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.api.BytesDataInfo; +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -21,11 +24,12 @@ import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.frames.ControlFrameType; import org.eclipse.jetty.spdy.frames.GoAwayFrame; +import org.eclipse.jetty.spdy.frames.SynReplyFrame; import org.eclipse.jetty.spdy.generator.Generator; import org.junit.Assert; import org.junit.Test; -public class APIUsageTest extends AbstractTest +public class ProtocolViolationsTest extends AbstractTest { @Test public void testSendDataBeforeReplyIsIllegal() throws Exception @@ -95,4 +99,66 @@ public class APIUsageTest extends AbstractTest server.close(); } + + @Test(expected = IllegalStateException.class) + public void testSendDataAfterCloseIsIllegal() throws Exception + { + Session session = startClient(startServer(null), null); + Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS); + stream.data(new StringDataInfo("test", true)); + } + + @Test(expected = IllegalStateException.class) + public void testSendHeadersAfterCloseIsIllegal() throws Exception + { + Session session = startClient(startServer(null), null); + Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS); + stream.headers(new HeadersInfo(new Headers(), true)); + } + + @Test + public void testDataSentAfterCloseIsDiscardedByRecipient() throws Exception + { + ServerSocketChannel server = ServerSocketChannel.open(); + server.bind(new InetSocketAddress("localhost", 0)); + + Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null); + final CountDownLatch dataLatch = new CountDownLatch(2); + session.syn(new SynInfo(true), new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataLatch.countDown(); + } + }); + + SocketChannel channel = server.accept(); + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + channel.read(readBuffer); + readBuffer.flip(); + int streamId = readBuffer.getInt(8); + + Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor()); + + ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Headers())); + channel.write(writeBuffer); + + byte[] bytes = new byte[1]; + writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true)); + channel.write(writeBuffer); + + // Write again to simulate the faulty condition + writeBuffer.flip(); + channel.write(writeBuffer); + + Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); + + writeBuffer = generator.control(new GoAwayFrame(SPDY.V2, 0, SessionStatus.OK.getCode())); + channel.write(writeBuffer); + channel.shutdownOutput(); + channel.close(); + + server.close(); + } }