376201 - HalfClosed state not handled properly.

This commit is contained in:
Simone Bordet 2012-04-11 12:18:56 +02:00
parent 47c1587166
commit 972087d068
5 changed files with 141 additions and 23 deletions

View File

@ -68,8 +68,9 @@ public interface IStream extends Stream
* of true puts the stream into closed state.</p>
*
* @param close whether the close state should be updated
* @param local whether the close is local or remote
*/
public void updateCloseState(boolean close);
public void updateCloseState(boolean close, boolean local);
/**
* <p>Processes the given control frame,

View File

@ -398,6 +398,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void onSyn(SynStreamFrame frame)
{
IStream stream = newStream(frame);
stream.updateCloseState(frame.isClose(), false);
logger.debug("Opening {}", stream);
int streamId = frame.getStreamId();
IStream existing = streams.putIfAbsent(streamId, stream);
@ -429,6 +430,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream createStream(SynStreamFrame synStream, StreamFrameListener listener)
{
IStream stream = newStream(synStream);
stream.updateCloseState(synStream.isClose(), true);
stream.setStreamFrameListener(listener);
if (streams.putIfAbsent(synStream.getStreamId(), stream) != null)
{
@ -734,6 +736,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
// Special handling for PING frames, they must be sent as soon as possible
if (ControlFrameType.PING == frame.getType())
prepend(frameBytes);
else
append(frameBytes);
}
@ -1089,7 +1096,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
else
{
super.complete();
stream.updateCloseState(dataInfo.isClose());
stream.updateCloseState(dataInfo.isClose(), true);
if (stream.isClosed())
removeStream(stream);
}

View File

@ -51,15 +51,13 @@ public class StandardStream implements IStream
private final AtomicInteger windowSize;
private volatile StreamFrameListener listener;
private volatile OpenState openState = OpenState.SYN_SENT;
private volatile boolean halfClosed;
private volatile boolean closed;
private volatile CloseState closeState = CloseState.OPENED;
public StandardStream(SynStreamFrame frame, ISession session, int windowSize)
{
this.frame = frame;
this.session = session;
this.windowSize = new AtomicInteger(windowSize);
this.halfClosed = frame.isClose();
}
@Override
@ -95,7 +93,8 @@ public class StandardStream implements IStream
public boolean isHalfClosed()
{
return halfClosed;
CloseState closeState = this.closeState;
return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED;
}
@Override
@ -123,14 +122,38 @@ public class StandardStream implements IStream
}
@Override
public void updateCloseState(boolean close)
public void updateCloseState(boolean close, boolean local)
{
if (close)
{
if (isHalfClosed())
closed = true;
switch (closeState)
{
case OPENED:
{
closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
break;
}
case LOCALLY_CLOSED:
{
if (local)
throw new IllegalStateException();
else
halfClosed = true;
closeState = CloseState.CLOSED;
break;
}
case REMOTELY_CLOSED:
{
if (local)
closeState = CloseState.CLOSED;
else
throw new IllegalStateException();
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@ -148,7 +171,7 @@ public class StandardStream implements IStream
{
openState = OpenState.REPLY_RECV;
SynReplyFrame synReply = (SynReplyFrame)frame;
updateCloseState(synReply.isClose());
updateCloseState(synReply.isClose(), false);
ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
notifyOnReply(replyInfo);
break;
@ -156,7 +179,7 @@ public class StandardStream implements IStream
case HEADERS:
{
HeadersFrame headers = (HeadersFrame)frame;
updateCloseState(headers.isClose());
updateCloseState(headers.isClose(), false);
HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
notifyOnHeaders(headersInfo);
break;
@ -189,7 +212,7 @@ public class StandardStream implements IStream
return;
}
updateCloseState(frame.isClose());
updateCloseState(frame.isClose(), false);
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress())
{
@ -287,7 +310,7 @@ public class StandardStream implements IStream
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
openState = OpenState.REPLY_SENT;
updateCloseState(replyInfo.isClose());
updateCloseState(replyInfo.isClose(), true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@ -306,7 +329,12 @@ public class StandardStream implements IStream
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Cannot send DATA frames before a SYN_REPLY frame");
throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
}
// Cannot update the close state here, because the data that we send may
@ -328,10 +356,15 @@ public class StandardStream implements IStream
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Cannot send a HEADERS frame before a SYN_REPLY frame");
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
}
updateCloseState(headersInfo.isClose());
updateCloseState(headersInfo.isClose(), true);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@ -339,13 +372,19 @@ public class StandardStream implements IStream
@Override
public boolean isClosed()
{
return closed;
return closeState == CloseState.CLOSED;
}
private boolean isLocallyClosed()
{
CloseState closeState = this.closeState;
return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
}
@Override
public String toString()
{
return String.format("stream=%d v%d closed=%s", getId(), session.getVersion(), isClosed() ? "true" : isHalfClosed() ? "half" : "false");
return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState);
}
private boolean canSend()
@ -364,4 +403,9 @@ public class StandardStream implements IStream
{
SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
}
private enum CloseState
{
OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
}
}

View File

@ -307,7 +307,7 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS);
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
final int length = 5 * windowSize;
stream.data(new BytesDataInfo(new byte[length], true));

View File

@ -8,6 +8,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -21,11 +24,12 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.frames.ControlFrameType;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.junit.Assert;
import org.junit.Test;
public class APIUsageTest extends AbstractTest
public class ProtocolViolationsTest extends AbstractTest
{
@Test
public void testSendDataBeforeReplyIsIllegal() throws Exception
@ -95,4 +99,66 @@ public class APIUsageTest extends AbstractTest
server.close();
}
@Test(expected = IllegalStateException.class)
public void testSendDataAfterCloseIsIllegal() throws Exception
{
Session session = startClient(startServer(null), null);
Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS);
stream.data(new StringDataInfo("test", true));
}
@Test(expected = IllegalStateException.class)
public void testSendHeadersAfterCloseIsIllegal() throws Exception
{
Session session = startClient(startServer(null), null);
Stream stream = session.syn(new SynInfo(true), null).get(5, TimeUnit.SECONDS);
stream.headers(new HeadersInfo(new Headers(), true));
}
@Test
public void testDataSentAfterCloseIsDiscardedByRecipient() throws Exception
{
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 0));
Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);
final CountDownLatch dataLatch = new CountDownLatch(2);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataLatch.countDown();
}
});
SocketChannel channel = server.accept();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
channel.read(readBuffer);
readBuffer.flip();
int streamId = readBuffer.getInt(8);
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Headers()));
channel.write(writeBuffer);
byte[] bytes = new byte[1];
writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true));
channel.write(writeBuffer);
// Write again to simulate the faulty condition
writeBuffer.flip();
channel.write(writeBuffer);
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
writeBuffer = generator.control(new GoAwayFrame(SPDY.V2, 0, SessionStatus.OK.getCode()));
channel.write(writeBuffer);
channel.shutdownOutput();
channel.close();
server.close();
}
}