Reads and consumes the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.
*
- * @param output the {@link ByteBuffer} to copy to bytes into
+ * @param output the {@link ByteBuffer} to copy the bytes into
* @return the number of bytes copied
* @see #consume(int)
*/
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
index 75c27d5a02f..5e2e5e281d7 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
@@ -75,7 +75,7 @@ public interface Session
* @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler)
*/
public Future syn(SynInfo synInfo, StreamFrameListener listener);
-
+
/**
* Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.
* Callers may pass a non-null completion handler to be notified of when the
@@ -90,6 +90,7 @@ public interface Session
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler handler);
+
/**
* Sends asynchronously a RST_STREAM to abort a stream.
* Callers may use the returned future to wait for the reset to be sent.
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
index c25bd4551b0..6a138297049 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
@@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.api;
import java.nio.channels.WritePendingException;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -79,12 +80,35 @@ public interface Stream
* @return the priority of this stream
*/
public byte getPriority();
-
+
/**
* @return the session this stream is associated to
*/
public Session getSession();
+ /**
+ * Initiate a unidirectional spdy pushstream associated to this stream asynchronously
+ *
Callers may use the returned future to get the pushstream once it got created
+ *
+ * @param synInfo the metadata to send on stream creation
+ * @return a future containing the stream once it got established
+ * @see #syn(SynInfo, long, TimeUnit, Handler)
+ */
+ public Future syn(SynInfo synInfo);
+
+ /**
+ * Initiate a unidirectional spdy pushstream associated to this stream asynchronously
+ *
Callers may pass a non-null completion handler to be notified of when the
+ * pushstream has been established.
+ *
+ * @param synInfo the metadata to send on stream creation
+ * @param timeout the operation's timeout
+ * @param unit the timeout's unit
+ * @param handler the completion handler that gets notified once the pushstream is established
+ * @see #syn(SynInfo)
+ */
+ public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler handler);
+
/**
* Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.
* Callers may use the returned future to wait for the reply to be actually sent.
@@ -161,6 +185,16 @@ public interface Stream
*/
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler handler);
+ /**
+ * @return whether this stream is unidirectional or not
+ */
+ public boolean isUnidirectional();
+
+ /**
+ * @return whether this stream has been reset
+ */
+ public boolean isReset();
+
/**
* @return whether this stream has been closed by both parties
* @see #isHalfClosed()
@@ -171,7 +205,6 @@ public interface Stream
* @return whether this stream has been closed by one party only
* @see #isClosed() * @param timeout the timeout for the stream creation
* @param unit the timeout's unit
-
*/
public boolean isHalfClosed();
@@ -196,4 +229,15 @@ public interface Stream
* @see #setAttribute(String, Object)
*/
public Object removeAttribute(String key);
+
+ /**
+ * @return the associated parent stream or null if this is not an associated stream
+ */
+ public Stream getAssociatedStream();
+
+ /**
+ * @return associated child streams or an empty set if no associated streams exist
+ */
+ public Set getPushedStreams();
+
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
index 08c18e70bf1..c51a0016dd0 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
@@ -28,11 +28,8 @@ public class SynInfo
* @see #getFlags()
*/
public static final byte FLAG_CLOSE = 1;
- public static final byte FLAG_UNIDIRECTIONAL = 2;
private final boolean close;
- private final boolean unidirectional;
- private final int associatedStreamId;
private final byte priority;
private final Headers headers;
@@ -56,28 +53,28 @@ public class SynInfo
*/
public SynInfo(Headers headers, boolean close)
{
- this(headers, close, false, 0, (byte)0);
+ this(headers, close, (byte)0);
}
/**
- * Creates a {@link ReplyInfo} instance with the given headers and the given close flag,
- * the given unidirectional flag, the given associated stream, and with the given priority.
- *
- * @param headers the {@link Headers}
- * @param close the value of the close flag
- * @param unidirectional the value of the unidirectional flag
- * @param associatedStreamId the associated stream id
- * @param priority the priority
+ *
+ * Creates a {@link ReplyInfo} instance with the given headers, the given close flag and with the given priority.
+ *
+ *
+ * @param headers
+ * the {@link Headers}
+ * @param close
+ * the value of the close flag
+ * @param priority
+ * the priority
*/
- public SynInfo(Headers headers, boolean close, boolean unidirectional, int associatedStreamId, byte priority)
+ public SynInfo(Headers headers, boolean close, byte priority)
{
this.close = close;
- this.unidirectional = unidirectional;
- this.associatedStreamId = associatedStreamId;
this.priority = priority;
this.headers = headers;
}
-
+
/**
* @return the value of the close flag
*/
@@ -86,22 +83,6 @@ public class SynInfo
return close;
}
- /**
- * @return the value of the unidirectional flag
- */
- public boolean isUnidirectional()
- {
- return unidirectional;
- }
-
- /**
- * @return the associated stream id
- */
- public int getAssociatedStreamId()
- {
- return associatedStreamId;
- }
-
/**
* @return the priority
*/
@@ -117,17 +98,14 @@ public class SynInfo
{
return headers;
}
-
+
/**
- * @return the close and unidirectional flags as integer
+ * @return the close flag as integer
* @see #FLAG_CLOSE
- * @see #FLAG_UNIDIRECTIONAL
*/
public byte getFlags()
{
- byte flags = isClose() ? FLAG_CLOSE : 0;
- flags += isUnidirectional() ? FLAG_UNIDIRECTIONAL : 0;
- return flags;
+ return isClose() ? FLAG_CLOSE : 0;
}
@Override
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
index 60cee7ce444..334b8166ac6 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
@@ -29,17 +29,17 @@ public class RstStreamFrame extends ControlFrame
this.streamId = streamId;
this.statusCode = statusCode;
}
-
+
public int getStreamId()
{
return streamId;
}
-
+
public int getStatusCode()
{
return statusCode;
}
-
+
@Override
public String toString()
{
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
index 427cb588a95..1b4089541c3 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
@@ -16,6 +16,7 @@
package org.eclipse.jetty.spdy.frames;
+import org.eclipse.jetty.spdy.PushSynInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SynInfo;
@@ -62,7 +63,7 @@ public class SynStreamFrame extends ControlFrame
public boolean isUnidirectional()
{
- return (getFlags() & SynInfo.FLAG_UNIDIRECTIONAL) == SynInfo.FLAG_UNIDIRECTIONAL;
+ return (getFlags() & PushSynInfo.FLAG_UNIDIRECTIONAL) == PushSynInfo.FLAG_UNIDIRECTIONAL;
}
@Override
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
index 986b3751414..f18f0c3bb77 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
@@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.CompressionFactory;
+import org.eclipse.jetty.spdy.PushSynInfo;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;
@@ -131,7 +132,7 @@ public class SynStreamBodyParser extends ControlFrameBodyParser
{
byte flags = controlFrameParser.getFlags();
// TODO: can it be both FIN and UNIDIRECTIONAL ?
- if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != SynInfo.FLAG_UNIDIRECTIONAL)
+ if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != PushSynInfo.FLAG_UNIDIRECTIONAL)
throw new IllegalArgumentException("Invalid flag " + flags + " for frame " + ControlFrameType.SYN_STREAM);
SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId, priority, new Headers(headers, true));
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
new file mode 100644
index 00000000000..c999c34c59b
--- /dev/null
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -0,0 +1,457 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.HeadersInfo;
+import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StreamStatus;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.SynReplyFrame;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.spdy.generator.Generator;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StandardSessionTest
+{
+ @Mock
+ private ISession sessionMock;
+ private ByteBufferPool bufferPool;
+ private Executor threadPool;
+ private StandardSession session;
+ private Generator generator;
+ private ScheduledExecutorService scheduler;
+ private Headers headers;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ bufferPool = new StandardByteBufferPool();
+ threadPool = Executors.newCachedThreadPool();
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
+ session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator);
+ headers = new Headers();
+ }
+
+ @Test
+ public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ assertThat("stream is not reset",stream.isReset(),is(false));
+ session.rst(new RstInfo(stream.getId(),StreamStatus.STREAM_ALREADY_CLOSED));
+ assertThatStreamIsNotInSession(stream);
+ assertThatStreamIsReset(stream);
+ }
+
+ @Test
+ public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ stream.updateCloseState(true,true);
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null));
+ assertThatStreamIsClosed(stream);
+ assertThatStreamIsNotInSession(stream);
+ }
+
+ @Test
+ public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ stream.updateCloseState(true,false);
+ stream.headers(new HeadersInfo(headers,true));
+ assertThatStreamIsClosed(stream);
+ assertThatStreamIsNotInSession(stream);
+ }
+
+ @Test
+ public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true));
+ Stream pushStream = createPushStream(stream);
+ assertThat("pushStream is unidirectional",pushStream.isUnidirectional(),is(true));
+ }
+
+ @Test
+ public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ Stream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId()));
+ assertThat("streamIds need to be monotonic",pushStream.getId(),greaterThan(stream.getId()));
+ }
+
+ @Test
+ public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ Stream pushStream = createPushStream(stream);
+ assertThatStreamIsNotHalfClosed(stream);
+ assertThatStreamIsNotClosed(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsNotClosed(pushStream);
+
+ stream.updateCloseState(true,true);
+ assertThatStreamIsHalfClosed(stream);
+ assertThatStreamIsNotClosed(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsNotClosed(pushStream);
+
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null));
+ assertThatStreamIsClosed(stream);
+ assertThatPushStreamIsNotClosed(pushStream);
+ }
+
+ @Test
+ public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ stream.updateCloseState(true,true);
+ assertThatStreamIsHalfClosed(stream);
+ stream.updateCloseState(true,false);
+ assertThatStreamIsClosed(stream);
+ createPushStreamAndMakeSureItFails(stream);
+ }
+
+ private void createPushStreamAndMakeSureItFails(IStream stream) throws InterruptedException
+ {
+ final CountDownLatch failedLatch = new CountDownLatch(1);
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler()
+ {
+ @Override
+ public void completed(Stream context)
+ {
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ failedLatch.countDown();
+ }
+ });
+ assertThat("pushStream creation failed",failedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsInSession(pushStream);
+ assertThatStreamIsAssociatedWithPushStream(stream,pushStream);
+ session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ }
+
+ @Test
+ public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get();
+ assertThatPushStreamIsInSession(pushStream);
+ session.rst(new RstInfo(pushStream.getId(),StreamStatus.INVALID_STREAM));
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ assertThatStreamIsReset(pushStream);
+ }
+
+ @Test
+ public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ SynInfo synInfo = new SynInfo(headers,true,stream.getPriority());
+ IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ assertThatStreamIsNotInSession(pushStream);
+ }
+
+ @Test
+ public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException,
+ TimeoutException
+ {
+ IStream stream = createStream();
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ assertThatStreamIsAssociatedWithPushStream(stream,pushStream);
+ assertThatPushStreamIsInSession(pushStream);
+ pushStream.headers(new HeadersInfo(headers,true));
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ }
+
+ @Test
+ public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1);
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
+ IStream stream = createStream();
+ session.onDataFrame(new DataFrame(stream.getId(),SynInfo.FLAG_CLOSE,128),ByteBuffer.allocate(128));
+ stream.data(new StringDataInfo("close",true));
+ assertThat("onStreamCreated listener has been called",createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
+ IStream stream = createStream();
+ session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2);
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null);
+ assertThat("onStreamCreated listener has been called twice. Once for the stream and once for the pushStream",
+ createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ session.rst(new RstInfo(pushStream.getId(),StreamStatus.CANCEL_STREAM));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ private class TestStreamListener extends Session.StreamListener.Adapter
+ {
+ private CountDownLatch createdListenerCalledLatch;
+ private CountDownLatch closedListenerCalledLatch;
+
+ public TestStreamListener(CountDownLatch createdListenerCalledLatch, CountDownLatch closedListenerCalledLatch)
+ {
+ this.createdListenerCalledLatch = createdListenerCalledLatch;
+ this.closedListenerCalledLatch = closedListenerCalledLatch;
+ }
+
+ @Override
+ public void onStreamCreated(Stream stream)
+ {
+ if (createdListenerCalledLatch != null)
+ createdListenerCalledLatch.countDown();
+ super.onStreamCreated(stream);
+ }
+
+ @Override
+ public void onStreamClosed(Stream stream)
+ {
+ if (closedListenerCalledLatch != null)
+ closedListenerCalledLatch.countDown();
+ super.onStreamClosed(stream);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = IllegalStateException.class)
+ public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
+ IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null);
+ stream.updateCloseState(synStreamFrame.isClose(),true);
+ assertThat("stream is half closed",stream.isHalfClosed(),is(true));
+ stream.data(new StringDataInfo("data on half closed stream",true));
+ verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
+ }
+
+ @Test
+ @Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.")
+ public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException
+ {
+ IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get();
+ stream.updateCloseState(true,false);
+ assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true));
+ stream.process(new DataFrame(stream.getId(),(byte)0,256),ByteBuffer.allocate(256));
+ }
+
+ @Test
+ public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch onDataCalledLatch = new CountDownLatch(1);
+ Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ onDataCalledLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ }).get(5,TimeUnit.SECONDS);
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),headers));
+ session.onDataFrame(new DataFrame(stream.getId(),(byte)0,0),ByteBuffer.allocate(128));
+ assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynInfo synInfo = new SynInfo(headers,false,(byte)0);
+ return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS);
+ }
+
+ private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ }
+
+ private static class TestController implements Controller
+ {
+ @Override
+ public int write(ByteBuffer buffer, Handler handler, StandardSession.FrameBytes context)
+ {
+ handler.completed(context);
+ return buffer.remaining();
+ }
+
+ @Override
+ public void close(boolean onlyOutput)
+ {
+ }
+ }
+
+ private void assertThatStreamIsClosed(IStream stream)
+ {
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ }
+
+ private void assertThatStreamIsReset(IStream stream)
+ {
+ assertThat("stream is reset",stream.isReset(),is(true));
+ }
+
+ private void assertThatStreamIsNotInSession(IStream stream)
+ {
+ assertThat("stream is not in session",session.getStreams().contains(stream),not(true));
+ }
+
+ private void assertThatStreamIsInSession(IStream stream)
+ {
+ assertThat("stream is in session",session.getStreams().contains(stream),is(true));
+ }
+
+ private void assertThatStreamIsNotClosed(IStream stream)
+ {
+ assertThat("stream is not closed",stream.isClosed(),not(true));
+ }
+
+ private void assertThatStreamIsNotHalfClosed(IStream stream)
+ {
+ assertThat("stream is not halfClosed",stream.isHalfClosed(),not(true));
+ }
+
+ private void assertThatPushStreamIsNotClosed(Stream pushStream)
+ {
+ assertThat("pushStream is not closed",pushStream.isClosed(),not(true));
+ }
+
+ private void assertThatStreamIsHalfClosed(IStream stream)
+ {
+ assertThat("stream is halfClosed",stream.isHalfClosed(),is(true));
+ }
+
+ private void assertThatStreamIsNotAssociatedWithPushStream(IStream stream, IStream pushStream)
+ {
+ assertThat("pushStream is removed from parent",stream.getPushedStreams().contains(pushStream),not(true));
+ }
+
+ private void assertThatPushStreamIsNotInSession(Stream pushStream)
+ {
+ assertThat("pushStream is not in session",session.getStreams().contains(pushStream.getId()),not(true));
+ }
+
+ private void assertThatPushStreamIsInSession(Stream pushStream)
+ {
+ assertThat("pushStream is in session",session.getStreams().contains(pushStream),is(true));
+ }
+
+ private void assertThatStreamIsAssociatedWithPushStream(IStream stream, Stream pushStream)
+ {
+ assertThat("stream is associated with pushStream",stream.getPushedStreams().contains(pushStream),is(true));
+ }
+
+ private void assertThatPushStreamIsClosed(Stream pushStream)
+ {
+ assertThat("pushStream is closed",pushStream.isClosed(),is(true));
+ }
+
+ private void assertThatPushStreamIsHalfClosed(Stream pushStream)
+ {
+ assertThat("pushStream is half closed ",pushStream.isHalfClosed(),is(true));
+ }
+
+ private void assertThatOnStreamClosedListenerHasBeenCalled(final CountDownLatch closedListenerCalledLatch) throws InterruptedException
+ {
+ assertThat("onStreamClosed listener has been called",closedListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
new file mode 100644
index 00000000000..68a6a9576c6
--- /dev/null
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
@@ -0,0 +1,112 @@
+// ========================================================================
+// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+
+/* ------------------------------------------------------------ */
+/**
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class StandardStreamTest
+{
+ @Mock private ISession session;
+ @Mock private SynStreamFrame synStreamFrame;
+
+ /**
+ * Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSyn()
+ {
+ Stream stream = new StandardStream(synStreamFrame,session,0,null);
+ Set streams = new HashSet<>();
+ streams.add(stream);
+ when(synStreamFrame.isClose()).thenReturn(false);
+ SynInfo synInfo = new SynInfo(false);
+ when(session.getStreams()).thenReturn(streams);
+ stream.syn(synInfo);
+ verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(),synInfo)),any(StreamFrameListener.class),anyLong(),any(TimeUnit.class),any(Handler.class));
+ }
+
+ private class PushSynInfoMatcher extends ArgumentMatcher{
+ int associatedStreamId;
+ SynInfo synInfo;
+
+ public PushSynInfoMatcher(int associatedStreamId, SynInfo synInfo)
+ {
+ this.associatedStreamId = associatedStreamId;
+ this.synInfo = synInfo;
+ }
+ @Override
+ public boolean matches(Object argument)
+ {
+ PushSynInfo pushSynInfo = (PushSynInfo)argument;
+ if(pushSynInfo.getAssociatedStreamId() != associatedStreamId){
+ System.out.println("streamIds do not match!");
+ return false;
+ }
+ if(pushSynInfo.isClose() != synInfo.isClose()){
+ System.out.println("isClose doesn't match");
+ return false;
+ }
+ return true;
+ }
+ }
+
+ @Test
+ public void testSynOnClosedStream(){
+ IStream stream = new StandardStream(synStreamFrame,session,0,null);
+ stream.updateCloseState(true,true);
+ stream.updateCloseState(true,false);
+ assertThat("stream expected to be closed",stream.isClosed(),is(true));
+ final CountDownLatch failedLatch = new CountDownLatch(1);
+ stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ failedLatch.countDown();
+ }
+ });
+ assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L));
+ }
+
+}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
index f7b3c57d01d..a8bff06d67b 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
@@ -99,7 +99,7 @@ public class ServerUsageTest
Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener
- session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter()
+ session.syn(new SynInfo(new Headers(), false, (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter()
{
@Override
public void completed(Stream pushStream)
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
index ea3e6d74043..378a3faf917 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
@@ -16,6 +16,12 @@
package org.eclipse.jetty.spdy.frames;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
@@ -38,7 +44,7 @@ public class RstStreamGenerateParseTest
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
- Assert.assertNotNull(buffer);
+ assertThat("buffer is not null", buffer, not(nullValue()));
TestSPDYParserListener listener = new TestSPDYParserListener();
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
@@ -46,13 +52,13 @@ public class RstStreamGenerateParseTest
parser.parse(buffer);
ControlFrame frame2 = listener.getControlFrame();
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.RST_STREAM, frame2.getType());
+ assertThat("frame2 is not null", frame2, not(nullValue()));
+ assertThat("frame2 is type RST_STREAM",ControlFrameType.RST_STREAM, equalTo(frame2.getType()));
RstStreamFrame rstStream = (RstStreamFrame)frame2;
- Assert.assertEquals(SPDY.V2, rstStream.getVersion());
- Assert.assertEquals(streamId, rstStream.getStreamId());
- Assert.assertEquals(0, rstStream.getFlags());
- Assert.assertEquals(streamStatus, rstStream.getStatusCode());
+ assertThat("rstStream version is SPDY.V2",SPDY.V2, equalTo(rstStream.getVersion()));
+ assertThat("rstStream id is equal to streamId",streamId, equalTo(rstStream.getStreamId()));
+ assertThat("rstStream flags are 0",(byte)0, equalTo(rstStream.getFlags()));
+ assertThat("stream status is equal to rstStream statuscode",streamStatus, is(rstStream.getStatusCode()));
}
@Test
diff --git a/jetty-spdy/spdy-jetty/pom.xml b/jetty-spdy/spdy-jetty/pom.xml
index 08cbaf6bce0..7476bf972ef 100644
--- a/jetty-spdy/spdy-jetty/pom.xml
+++ b/jetty-spdy/spdy-jetty/pom.xml
@@ -64,7 +64,13 @@
junit
junit
+ test
+
+ org.hamcrest
+ hamcrest-all
+ test
+
org.slf4j
slf4j-log4j12
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
new file mode 100644
index 00000000000..7d5d6c79f97
--- /dev/null
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.BytesDataInfo;
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.SessionStatus;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.frames.ControlFrame;
+import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.GoAwayFrame;
+import org.eclipse.jetty.spdy.frames.RstStreamFrame;
+import org.eclipse.jetty.spdy.frames.SynReplyFrame;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.spdy.generator.Generator;
+import org.eclipse.jetty.spdy.parser.Parser;
+import org.eclipse.jetty.spdy.parser.Parser.Listener;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ClosedStreamTest extends AbstractTest
+{
+ //TODO: Right now it sends a rst as the stream is unknown to the session once it's closed. But according to the spec we probably should just ignore the data?!
+ @Test
+ public void testDataSentOnClosedStreamIsIgnored() throws Exception
+ {
+ ServerSocketChannel server = ServerSocketChannel.open();
+ server.bind(new InetSocketAddress("localhost", 0));
+
+ Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);
+ final CountDownLatch dataLatch = new CountDownLatch(2);
+ session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataLatch.countDown();
+ }
+ });
+
+ SocketChannel channel = server.accept();
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024);
+ channel.read(readBuffer);
+ readBuffer.flip();
+ int streamId = readBuffer.getInt(8);
+
+ Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
+
+ ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Headers()));
+ channel.write(writeBuffer);
+
+ byte[] bytes = new byte[1];
+ writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true));
+ channel.write(writeBuffer);
+
+ // Write again to simulate the faulty condition
+ writeBuffer.flip();
+ channel.write(writeBuffer);
+
+ Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
+
+ writeBuffer = generator.control(new GoAwayFrame(SPDY.V2, 0, SessionStatus.OK.getCode()));
+ channel.write(writeBuffer);
+ channel.shutdownOutput();
+ channel.close();
+
+ server.close();
+ }
+
+ @Test
+ public void testSendDataOnHalfClosedStreamCausesExceptionOnServer() throws Exception
+ {
+ final CountDownLatch replyReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch clientReceivedDataLatch = new CountDownLatch(1);
+ final CountDownLatch exceptionWhenSendingData = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(true));
+ try
+ {
+ replyReceivedLatch.await(5,TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ stream.data(new StringDataInfo("data send after half closed",false));
+ }
+ catch (RuntimeException e)
+ {
+ // we expect an exception here, but we don't want it to be logged
+ exceptionWhenSendingData.countDown();
+ }
+
+ return null;
+ }
+ }),null);
+
+ Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ replyReceivedLatch.countDown();
+ super.onReply(stream,replyInfo);
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ clientReceivedDataLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ }).get();
+ assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is half closed from server",stream.isHalfClosed(),is(true));
+ assertThat("client has not received any data sent after stream was half closed by server",clientReceivedDataLatch.await(1,TimeUnit.SECONDS),
+ is(false));
+ assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testV2ReceiveDataOnHalfClosedStream() throws Exception
+ {
+ final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V2);
+ assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ @Test
+ @Ignore("until v3 is properly implemented")
+ public void testV3ReceiveDataOnHalfClosedStream() throws Exception
+ {
+ final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V3);
+ assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ private CountDownLatch runReceiveDataOnHalfClosedStream(short version) throws Exception, IOException, InterruptedException
+ {
+ final CountDownLatch clientResetReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch serverReplySentLatch = new CountDownLatch(1);
+ final CountDownLatch clientReplyReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch serverDataReceivedLatch = new CountDownLatch(1);
+
+ InetSocketAddress startServer = startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ serverReplySentLatch.countDown();
+ try
+ {
+ clientReplyReceivedLatch.await(5,TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ serverDataReceivedLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ };
+ }
+ });
+
+ final SocketChannel socketChannel = SocketChannel.open(startServer);
+ final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory().newCompressor());
+ ByteBuffer synData = generator.control(new SynStreamFrame(version,SynInfo.FLAG_CLOSE,1,0,(byte)0,new Headers()));
+
+ socketChannel.write(synData);
+
+ assertThat("server: syn reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true));
+
+ Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
+ parser.addListener(new Listener.Adapter()
+ {
+ @Override
+ public void onControlFrame(ControlFrame frame)
+ {
+ if (frame instanceof SynReplyFrame)
+ {
+ SynReplyFrame synReplyFrame = (SynReplyFrame)frame;
+ clientReplyReceivedLatch.countDown();
+ int streamId = synReplyFrame.getStreamId();
+ ByteBuffer data = generator.data(streamId,0,new StringDataInfo("data",false));
+ try
+ {
+ socketChannel.write(data);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ else if (frame instanceof RstStreamFrame)
+ {
+ clientResetReceivedLatch.countDown();
+ }
+ super.onControlFrame(frame);
+ }
+
+ @Override
+ public void onDataFrame(DataFrame frame, ByteBuffer data)
+ {
+ super.onDataFrame(frame,data);
+ }
+ });
+ ByteBuffer response = ByteBuffer.allocate(28);
+ socketChannel.read(response);
+ response.flip();
+ parser.parse(response);
+
+ assertThat("server didn't receive data",serverDataReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ return clientResetReceivedLatch;
+ }
+
+}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
index f1908e73bcc..db2303cea30 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
@@ -451,7 +451,7 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
- private void expectException(Class extends Exception> exception, Callable command)
+ private void expectException(Class extends Exception> exception, Callable command)
{
try
{
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
index 92e642f2b13..eb75be7e078 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
@@ -116,19 +116,20 @@ public class ProtocolViolationsTest extends AbstractTest
stream.headers(new HeadersInfo(new Headers(), true));
}
- @Test
- public void testDataSentAfterCloseIsDiscardedByRecipient() throws Exception
+ @Test //TODO: throws an ISException in StandardStream.updateCloseState(). But instead we should send a rst or something to the server probably?!
+ public void testServerClosesStreamTwice() throws Exception
{
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 0));
Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);
final CountDownLatch dataLatch = new CountDownLatch(2);
- session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
+ session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
+ System.out.println("ondata");
dataLatch.countDown();
}
});
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
new file mode 100644
index 00000000000..2dac9cd3545
--- /dev/null
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.BytesDataInfo;
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.SessionFrameListener;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.junit.Test;
+
+public class PushStreamTest extends AbstractTest
+{
+
+ @Test
+ public void testSynPushStream() throws Exception
+ {
+ final CountDownLatch pushStreamSynLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ stream.syn(new SynInfo(false));
+ return null;
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ pushStreamSynLatch.countDown();
+ stream.reply(new ReplyInfo(false));
+ return super.onSyn(stream,synInfo);
+ }
+ });
+
+ clientSession.syn(new SynInfo(false),null).get();
+ assertThat("onSyn has been called",pushStreamSynLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testSendDataOnPushStreamAfterAssociatedStreamIsClosed() throws Exception
+ {
+ final Exchanger streamExchanger = new Exchanger<>();
+ final CountDownLatch pushStreamSynLatch = new CountDownLatch(1);
+ final CyclicBarrier replyBarrier = new CyclicBarrier(3);
+ final CyclicBarrier closeBarrier = new CyclicBarrier(3);
+ final CountDownLatch streamDataSent = new CountDownLatch(2);
+ final CountDownLatch pushStreamDataReceived = new CountDownLatch(2);
+ final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ try
+ {
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ try
+ {
+ if (dataInfo.isClose())
+ {
+ stream.data(new StringDataInfo("close stream",true));
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ }
+ streamDataSent.countDown();
+ if (pushStreamDataReceived.getCount() == 2)
+ {
+ Stream pushStream = stream.syn(new SynInfo(false)).get();
+ streamExchanger.exchange(pushStream,5,TimeUnit.SECONDS);
+ }
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ };
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ throw new IllegalStateException(e);
+ }
+ }
+
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ pushStreamSynLatch.countDown();
+ stream.reply(new ReplyInfo(false));
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ pushStreamDataReceived.countDown();
+ super.onData(stream,dataInfo);
+ }
+ };
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ try
+ {
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ try
+ {
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ }).get();
+
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ stream.data(new StringDataInfo("client data",false));
+ Stream pushStream = streamExchanger.exchange(null,5,TimeUnit.SECONDS);
+ pushStream.data(new StringDataInfo("first push data frame",false));
+ // nasty, but less complex than using another cyclicBarrier for example
+ while (pushStreamDataReceived.getCount() != 1)
+ Thread.sleep(1);
+ stream.data(new StringDataInfo("client close",true));
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ pushStream.data(new StringDataInfo("second push data frame while associated stream has been closed already",false));
+ assertThat("2 pushStream data frames have been received.",pushStreamDataReceived.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("2 data frames have been sent",streamDataSent.await(5,TimeUnit.SECONDS),is(true));
+ assertThatNoExceptionOccured(exceptionCountDownLatch);
+ }
+
+ @Test
+ public void testSynPushStreamOnClosedStream() throws Exception
+ {
+ final CountDownLatch pushStreamFailedLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(true));
+ stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ pushStreamFailedLatch.countDown();
+ }
+ });
+ return super.onSyn(stream,synInfo);
+ }
+ }),new SessionFrameListener.Adapter());
+
+ clientSession.syn(new SynInfo(true),null);
+ assertThat("pushStream syn has failed",pushStreamFailedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testSendBigDataOnPushStreamWhenAssociatedStreamIsClosed() throws Exception
+ {
+ final CountDownLatch streamClosedLatch = new CountDownLatch(1);
+ final CountDownLatch allDataReceived = new CountDownLatch(1);
+ final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1);
+ final Exchanger exchanger = new Exchanger<>();
+ final int dataSizeInBytes = 1024 * 1024 * 1;
+ final byte[] transferBytes = createHugeByteArray(dataSizeInBytes);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ try
+ {
+ Stream pushStream = stream.syn(new SynInfo(false)).get();
+ stream.reply(new ReplyInfo(true));
+ // wait until stream is closed
+ streamClosedLatch.await(5,TimeUnit.SECONDS);
+ pushStream.data(new BytesDataInfo(transferBytes,true));
+ return null;
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ throw new IllegalStateException(e);
+ }
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ return new StreamFrameListener.Adapter()
+ {
+ ByteBuffer receivedBytes = ByteBuffer.allocate(dataSizeInBytes);
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consumeInto(receivedBytes);
+ if (dataInfo.isClose())
+ {
+ allDataReceived.countDown();
+ try
+ {
+ receivedBytes.flip();
+ exchanger.exchange(receivedBytes.slice(),5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ }
+ };
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(true),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ streamClosedLatch.countDown();
+ super.onReply(stream,replyInfo);
+ }
+ }).get();
+
+ ByteBuffer receivedBytes = exchanger.exchange(null,5,TimeUnit.SECONDS);
+
+ assertThat("received byte array is the same as transferred byte array",Arrays.equals(transferBytes,receivedBytes.array()),is(true));
+ assertThat("onReply has been called to close the stream",streamClosedLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ assertThat("all data has been received",allDataReceived.await(20,TimeUnit.SECONDS),is(true));
+ assertThatNoExceptionOccured(exceptionCountDownLatch);
+ }
+
+ private byte[] createHugeByteArray(int sizeInBytes)
+ {
+ byte[] bytes = new byte[sizeInBytes];
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+
+ @Test
+ public void testOddEvenStreamIds() throws Exception
+ {
+ final CountDownLatch pushStreamIdIsEvenLatch = new CountDownLatch(3);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.syn(new SynInfo(false));
+ return null;
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ assertStreamIdIsEven(stream);
+ pushStreamIdIsEvenLatch.countDown();
+ return super.onSyn(stream,synInfo);
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(false),null).get();
+ Stream stream2 = clientSession.syn(new SynInfo(false),null).get();
+ Stream stream3 = clientSession.syn(new SynInfo(false),null).get();
+ assertStreamIdIsOdd(stream);
+ assertStreamIdIsOdd(stream2);
+ assertStreamIdIsOdd(stream3);
+
+ assertThat("all pushStreams had even ids",pushStreamIdIsEvenLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ private void assertStreamIdIsEven(Stream stream)
+ {
+ assertThat("streamId is odd",stream.getId() % 2,is(0));
+ }
+
+ private void assertStreamIdIsOdd(Stream stream)
+ {
+ assertThat("streamId is odd",stream.getId() % 2,is(1));
+ }
+
+ private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException
+ {
+ assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
+ }
+}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
index c6c0a2b4449..654bb5fddda 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
@@ -1,5 +1,11 @@
package org.eclipse.jetty.spdy;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -15,7 +21,6 @@ import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
-import org.junit.Assert;
import org.junit.Test;
public class ResetStreamTest extends AbstractTest
@@ -23,12 +28,12 @@ public class ResetStreamTest extends AbstractTest
@Test
public void testResetStreamIsRemoved() throws Exception
{
- Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()), null);
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null);
- Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
- session.rst(new RstInfo(stream.getId(), StreamStatus.CANCEL_STREAM)).get(5, TimeUnit.SECONDS);
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS);
- Assert.assertEquals(0, session.getStreams().size());
+ assertEquals("session expected to contain 0 streams",0,session.getStreams().size());
}
@Test
@@ -44,11 +49,11 @@ public class ResetStreamTest extends AbstractTest
{
Session serverSession = stream.getSession();
serverSessionRef.set(serverSession);
- serverSession.rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+ serverSession.rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
synLatch.countDown();
return null;
}
- }), new SessionFrameListener.Adapter()
+ }),new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
@@ -57,16 +62,17 @@ public class ResetStreamTest extends AbstractTest
}
});
- clientSession.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
+ Stream stream = clientSession.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
- Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("syncLatch didn't count down",synLatch.await(5,TimeUnit.SECONDS));
Session serverSession = serverSessionRef.get();
- Assert.assertEquals(0, serverSession.getStreams().size());
+ assertEquals("serverSession expected to contain 0 streams",0,serverSession.getStreams().size());
- Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS));
// Need to sleep a while to give the chance to the implementation to remove the stream
TimeUnit.SECONDS.sleep(1);
- Assert.assertEquals(0, clientSession.getStreams().size());
+ assertTrue("stream is expected to be reset",stream.isReset());
+ assertEquals("clientSession expected to contain 0 streams",0,clientSession.getStreams().size());
}
@Test
@@ -83,8 +89,8 @@ public class ResetStreamTest extends AbstractTest
try
{
// Refuse the stream, we must ignore data frames
- Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
- stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+ assertTrue(synLatch.await(5,TimeUnit.SECONDS));
+ stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
return new StreamFrameListener.Adapter()
{
@Override
@@ -100,7 +106,7 @@ public class ResetStreamTest extends AbstractTest
return null;
}
}
- }), new SessionFrameListener.Adapter()
+ }),new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
@@ -109,8 +115,8 @@ public class ResetStreamTest extends AbstractTest
}
});
- Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
- stream.data(new StringDataInfo("data", true), 5, TimeUnit.SECONDS, new Handler.Adapter()
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Handler.Adapter()
{
@Override
public void completed(Void context)
@@ -119,7 +125,60 @@ public class ResetStreamTest extends AbstractTest
}
});
- Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
- Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
+ assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS));
+ assertTrue("stream is expected to be reset",stream.isReset());
+ assertFalse("dataLatch shouln't be count down",dataLatch.await(1,TimeUnit.SECONDS));
}
+
+ @Test
+ public void testResetAfterServerReceivedFirstDataFrameAndSecondDataFrameFails() throws Exception
+ {
+ final CountDownLatch synLatch = new CountDownLatch(1);
+ final CountDownLatch dataLatch = new CountDownLatch(1);
+ final CountDownLatch rstLatch = new CountDownLatch(1);
+ final CountDownLatch failLatch = new CountDownLatch(1);
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ synLatch.countDown();
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataLatch.countDown();
+ stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
+ }
+ };
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public void onRst(Session session, RstInfo rstInfo)
+ {
+ rstLatch.countDown();
+ }
+ });
+
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ assertThat("syn is received by server", synLatch.await(5,TimeUnit.SECONDS),is(true));
+ stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null);
+ assertThat("stream is reset",rstLatch.await(5,TimeUnit.SECONDS),is(true));
+ stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ failLatch.countDown();
+ }
+ });
+
+ assertThat("2nd data call failed",failLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is reset",stream.isReset(),is(true));
+ }
+
+ // TODO: If server already received 2nd dataframe after it rst, it should ignore it. Not easy to do.
+
}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
index cdb9a7ef6e2..2e5855118cd 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
@@ -36,6 +36,7 @@ public class SSLEngineLeakTest extends AbstractTest
Field field = NextProtoNego.class.getDeclaredField("objects");
field.setAccessible(true);
+ @SuppressWarnings("unchecked")
Map objects = (Map)field.get(null);
int initialSize = objects.size();
diff --git a/pom.xml b/pom.xml
index be0f93a52f7..81543887f9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -497,6 +497,11 @@
junit
4.8.1
+
+ org.hamcrest
+ hamcrest-all
+ 1.1
+
org.mockito
mockito-core