From b92e7b01a972deb50e8305eb159d10b38b685fbd Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Fri, 25 May 2012 20:21:10 +0200 Subject: [PATCH] spdy: improve errorHandling, additional tests for sending big data with/without flow control, test that no more frames are sent on reset pushstreams, test for failing controller.writer(), small improvements Change-Id: Ide714e01b0ed3753b3e37103de2665158f41f35d --- .../java/org/eclipse/jetty/spdy/Promise.java | 3 +- .../eclipse/jetty/spdy/StandardSession.java | 108 +++++--- .../eclipse/jetty/spdy/StandardStream.java | 2 +- .../org/eclipse/jetty/spdy/api/Handler.java | 9 +- .../eclipse/jetty/spdy/AsyncTimeoutTest.java | 4 +- .../jetty/spdy/StandardSessionTest.java | 137 +++++++--- .../jetty/spdy/StandardStreamTest.java | 20 +- .../spdy/http/HTTPSPDYServerConnector.java | 1 + .../jetty/spdy/SPDYAsyncConnection.java | 3 +- .../org/eclipse/jetty/spdy/SPDYClient.java | 2 +- .../jetty/spdy/SPDYServerConnector.java | 11 + .../ServerSPDYAsyncConnectionFactory.java | 1 + .../org/eclipse/jetty/spdy/AbstractTest.java | 6 + .../eclipse/jetty/spdy/ClosedStreamTest.java | 11 - .../eclipse/jetty/spdy/FlowControlTest.java | 129 +++++---- .../eclipse/jetty/spdy/PushStreamTest.java | 249 +++++++++++++++--- .../eclipse/jetty/spdy/ResetStreamTest.java | 4 +- 17 files changed, 516 insertions(+), 184 deletions(-) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java index d44da62dee8..6f107874d54 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java @@ -44,7 +44,8 @@ public class Promise implements Handler, Future latch.countDown(); } - public void failed(Throwable x) + @Override + public void failed(T context, Throwable x) { this.failure = x; latch.countDown(); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index b77e9eb6afd..7988e5b6dcb 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy; import java.nio.ByteBuffer; import java.nio.channels.InterruptedByTimeoutException; +import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -93,6 +94,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo); if (timeout > 0) - { frameBytes.task = scheduler.schedule(frameBytes,timeout,unit); - } append(frameBytes); flush(); } @@ -822,9 +819,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler find a better solution - if (stream != null && !streams.containsValue(stream) && !stream.isUnidirectional()) + if (stream != null && stream.isReset()) + { frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM)); + return; + } break; } @@ -847,34 +846,50 @@ public class StandardSession implements ISession, Parser.Listener, Handler 0) + fail = failed; + if (!fail) { - FrameBytes element = queue.get(index - 1); - if (element.compareTo(frameBytes) >= 0) - break; - --index; + int index = queue.size(); + while (index > 0) + { + FrameBytes element = queue.get(index - 1); + if (element.compareTo(frameBytes) >= 0) + break; + --index; + } + queue.add(index,frameBytes); } - queue.add(index,frameBytes); } + + if (fail) + frameBytes.fail(new SPDYException("Session failed")); } private void prepend(FrameBytes frameBytes) { + boolean fail; synchronized (queue) { - int index = 0; - while (index < queue.size()) + fail = failed; + if (!fail) { - FrameBytes element = queue.get(index); - if (element.compareTo(frameBytes) <= 0) - break; - ++index; + int index = 0; + while (index < queue.size()) + { + FrameBytes element = queue.get(index); + if (element.compareTo(frameBytes) <= 0) + break; + ++index; + } + queue.add(index,frameBytes); } - queue.add(index,frameBytes); } + + if (fail) + frameBytes.fail(new SPDYException("Session failed")); } @Override @@ -889,9 +904,23 @@ public class StandardSession implements ISession, Parser.Listener, Handler frameBytesToFail = new ArrayList<>(); + frameBytesToFail.add(frameBytes); + + synchronized (queue) + { + failed = true; + String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size()); + logger.debug(logMessage,x); + frameBytesToFail.addAll(queue); + queue.clear(); + flushing = false; + } + + for (FrameBytes fb : frameBytesToFail) + fb.fail(x); } protected void write(ByteBuffer buffer, Handler handler, FrameBytes frameBytes) @@ -951,12 +980,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler void notifyHandlerFailed(Handler handler, Throwable x) + private void notifyHandlerFailed(Handler handler, C context, Throwable x) { try { if (handler != null) - handler.failed(x); + handler.failed(context, x); } catch (Exception xx) { @@ -1013,7 +1042,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler 0) { // We have written a frame out of this DataInfo, but there is more to write. // We need to keep the correct ordering of frames, to avoid that another // DataInfo for the same stream is written before this one is finished. prepend(this); + if (!flowControlEnabled) + flush(); } else { @@ -1136,4 +1172,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler *

Callback invoked when the operation completes.

* * @param context the context - * @see #failed(Throwable) + * @see #failed(Object, Throwable) */ public abstract void completed(C context); /** *

Callback invoked when the operation fails.

- * + * @param context the context * @param x the reason for the operation failure */ - public void failed(Throwable x); + public void failed(C context, Throwable x); /** *

Empty implementation of {@link Handler}

@@ -52,9 +52,8 @@ public interface Handler } @Override - public void failed(Throwable x) + public void failed(C context, Throwable x) { - throw new SPDYException(x); } } } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java index 85cea3452d7..d003ed40534 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java @@ -72,7 +72,7 @@ public class AsyncTimeoutTest } @Override - public void failed(Throwable x) + public void failed(Stream stream, Throwable x) { failedLatch.countDown(); } @@ -120,7 +120,7 @@ public class AsyncTimeoutTest } @Override - public void failed(Throwable x) + public void failed(Void context, Throwable x) { failedLatch.countDown(); } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index c999c34c59b..fed81806b9d 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -19,13 +19,10 @@ package org.eclipse.jetty.spdy; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.*; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -34,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.spdy.StandardSession.FrameBytes; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Headers; @@ -55,13 +53,16 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) public class StandardSessionTest { @Mock - private ISession sessionMock; + private Controller controller; + private ByteBufferPool bufferPool; private Executor threadPool; private StandardSession session; @@ -76,13 +77,36 @@ public class StandardSessionTest threadPool = Executors.newCachedThreadPool(); scheduler = Executors.newSingleThreadScheduledExecutor(); generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor()); - session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator); + session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator); headers = new Headers(); } + @SuppressWarnings("unchecked") + private void setControllerWriteExpectationToFail(final boolean fail) + { + when(controller.write(any(ByteBuffer.class),any(Handler.class),any(StandardSession.FrameBytes.class))).thenAnswer(new Answer() + { + public Integer answer(InvocationOnMock invocation) + { + Object[] args = invocation.getArguments(); + + Handler handler = (Handler)args[1]; + FrameBytes context = (FrameBytes)args[2]; + + if (fail) + handler.failed(context,new ClosedChannelException()); + else + handler.completed(context); + return 0; + } + }); + } + @Test public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); assertThatStreamIsInSession(stream); assertThat("stream is not reset",stream.isReset(),is(false)); @@ -94,6 +118,8 @@ public class StandardSessionTest @Test public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); assertThatStreamIsInSession(stream); stream.updateCloseState(true,true); @@ -105,6 +131,8 @@ public class StandardSessionTest @Test public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); assertThatStreamIsInSession(stream); stream.updateCloseState(true,false); @@ -116,6 +144,8 @@ public class StandardSessionTest @Test public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true)); Stream pushStream = createPushStream(stream); @@ -125,6 +155,8 @@ public class StandardSessionTest @Test public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + Stream stream = createStream(); IStream pushStream = createPushStream(stream); assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId())); @@ -134,6 +166,8 @@ public class StandardSessionTest @Test public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); Stream pushStream = createPushStream(stream); assertThatStreamIsNotHalfClosed(stream); @@ -155,6 +189,8 @@ public class StandardSessionTest @Test public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); stream.updateCloseState(true,true); assertThatStreamIsHalfClosed(stream); @@ -167,15 +203,10 @@ public class StandardSessionTest { final CountDownLatch failedLatch = new CountDownLatch(1); SynInfo synInfo = new SynInfo(headers,false,stream.getPriority()); - stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler() + stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler.Adapter() { @Override - public void completed(Stream context) - { - } - - @Override - public void failed(Throwable x) + public void failed(Stream stream, Throwable x) { failedLatch.countDown(); } @@ -186,6 +217,8 @@ public class StandardSessionTest @Test public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); IStream pushStream = createPushStream(stream); assertThatPushStreamIsHalfClosed(pushStream); @@ -200,6 +233,8 @@ public class StandardSessionTest @Test public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get(); assertThatPushStreamIsInSession(pushStream); @@ -212,6 +247,8 @@ public class StandardSessionTest @Test public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); SynInfo synInfo = new SynInfo(headers,true,stream.getPriority()); IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); @@ -225,6 +262,8 @@ public class StandardSessionTest public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + IStream stream = createStream(); SynInfo synInfo = new SynInfo(headers,false,stream.getPriority()); IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); @@ -240,6 +279,8 @@ public class StandardSessionTest @Test public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch)); @@ -253,6 +294,8 @@ public class StandardSessionTest @Test public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); session.addListener(new TestStreamListener(null,closedListenerCalledLatch)); IStream stream = createStream(); @@ -263,6 +306,8 @@ public class StandardSessionTest @Test public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2); final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch)); @@ -277,6 +322,8 @@ public class StandardSessionTest @Test public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); session.addListener(new TestStreamListener(null,closedListenerCalledLatch)); IStream stream = createStream(); @@ -313,22 +360,12 @@ public class StandardSessionTest } } - @SuppressWarnings("unchecked") - @Test(expected = IllegalStateException.class) - public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException - { - SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null); - IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null); - stream.updateCloseState(synStreamFrame.isClose(),true); - assertThat("stream is half closed",stream.isHalfClosed(),is(true)); - stream.data(new StringDataInfo("data on half closed stream",true)); - verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class)); - } - @Test @Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.") public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException { + setControllerWriteExpectationToFail(false); + IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get(); stream.updateCloseState(true,false); assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true)); @@ -338,6 +375,8 @@ public class StandardSessionTest @Test public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException { + setControllerWriteExpectationToFail(false); + final CountDownLatch onDataCalledLatch = new CountDownLatch(1); Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter() { @@ -353,10 +392,39 @@ public class StandardSessionTest assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true)); } + @SuppressWarnings("unchecked") + @Test + public void testControllerWriteFailsInEndPointFlush() throws InterruptedException + { + setControllerWriteExpectationToFail(true); + + final CountDownLatch failedCalledLatch = new CountDownLatch(2); + SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null); + IStream stream = new StandardStream(synStreamFrame,session,8192,null); + + Handler.Adapter handler = new Handler.Adapter() + { + @Override + public void failed(Void context, Throwable x) + { + failedCalledLatch.countDown(); + } + }; + + // first data frame should fail on controller.write() + stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler); + // second data frame should fail without controller.writer() as the connection is expected to be broken after first controller.write() call failed. + stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler); + + verify(controller,times(1)).write(any(ByteBuffer.class),any(Handler.class),any(FrameBytes.class)); + assertThat("Handler.failed has been called twice",failedCalledLatch.await(5,TimeUnit.SECONDS),is(true)); + + } + private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException { SynInfo synInfo = new SynInfo(headers,false,(byte)0); - return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS); + return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(50,TimeUnit.SECONDS); } private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException @@ -365,21 +433,6 @@ public class StandardSessionTest return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); } - private static class TestController implements Controller - { - @Override - public int write(ByteBuffer buffer, Handler handler, StandardSession.FrameBytes context) - { - handler.completed(context); - return buffer.remaining(); - } - - @Override - public void close(boolean onlyOutput) - { - } - } - private void assertThatStreamIsClosed(IStream stream) { assertThat("stream is closed",stream.isClosed(),is(true)); diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java index 68a6a9576c6..ba5f84d1831 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java @@ -18,19 +18,26 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Handler; +import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.frames.SynStreamFrame; import org.junit.Test; @@ -101,7 +108,7 @@ public class StandardStreamTest stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter() { @Override - public void failed(Throwable x) + public void failed(Stream stream, Throwable x) { failedLatch.countDown(); } @@ -109,4 +116,15 @@ public class StandardStreamTest assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L)); } + @SuppressWarnings("unchecked") + @Test(expected = IllegalStateException.class) + public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException + { + SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null); + IStream stream = new StandardStream(synStreamFrame,session,8192,null); + stream.updateCloseState(synStreamFrame.isClose(),true); + assertThat("stream is half closed",stream.isHalfClosed(),is(true)); + stream.data(new StringDataInfo("data on half closed stream",true)); + verify(session,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class)); + } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java index 63544a0356b..8a01d0c5f4d 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java @@ -41,6 +41,7 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector super(null, sslContextFactory); // Override the default connection factory for non-SSL connections defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this); + setFlowControlEnabled(false); } @Override diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java index 40dfecde36b..faa51a3c18e 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java @@ -122,7 +122,8 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn catch (Exception x) { close(false); - handler.failed(x); + handler.failed(context, x); + return -1; } finally { diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 438e2e3cf03..6f04c5bb3c1 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -398,7 +398,7 @@ public class SPDYClient } catch (RuntimeException x) { - sessionPromise.failed(x); + sessionPromise.failed(null,x); throw x; } } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java index 41412b3f0c5..1e431e2eebf 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java @@ -58,6 +58,7 @@ public class SPDYServerConnector extends SelectChannelConnector private final ServerSessionFrameListener listener; private final SslContextFactory sslContextFactory; private AsyncConnectionFactory defaultConnectionFactory; + private volatile boolean flowControlEnabled = true; public SPDYServerConnector(ServerSessionFrameListener listener) { @@ -287,4 +288,14 @@ public class SPDYServerConnector extends SelectChannelConnector { return Collections.unmodifiableCollection(sessions); } + + public boolean isFlowControlEnabled() + { + return flowControlEnabled; + } + + public void setFlowControlEnabled(boolean flowControl) + { + this.flowControlEnabled = flowControl; + } } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index 79ac28d1a0c..4b187d7dab2 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -67,6 +67,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory endPoint.setConnection(connection); final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator); + session.setFlowControlEnabled(connector.isFlowControlEnabled()); parser.addListener(session); connection.setSession(session); diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java index b30db27724d..262ee2d7ba0 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java @@ -52,10 +52,16 @@ public abstract class AbstractTest protected SPDYServerConnector connector; protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception + { + return startServer(listener,true); + } + + protected InetSocketAddress startServer(ServerSessionFrameListener listener, boolean flowControl) throws Exception { if (connector == null) connector = newSPDYServerConnector(listener); connector.setPort(0); + connector.setFlowControlEnabled(flowControl); server = new Server(); server.addConnector(connector); server.start(); diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java index 7b399f35f1e..e58209c8b27 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java @@ -38,7 +38,6 @@ import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.frames.ControlFrame; -import org.eclipse.jetty.spdy.frames.DataFrame; import org.eclipse.jetty.spdy.frames.GoAwayFrame; import org.eclipse.jetty.spdy.frames.RstStreamFrame; import org.eclipse.jetty.spdy.frames.SynReplyFrame; @@ -145,14 +144,12 @@ public class ClosedStreamTest extends AbstractTest public void onReply(Stream stream, ReplyInfo replyInfo) { replyReceivedLatch.countDown(); - super.onReply(stream,replyInfo); } @Override public void onData(Stream stream, DataInfo dataInfo) { clientReceivedDataLatch.countDown(); - super.onData(stream,dataInfo); } }).get(); assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true)); @@ -204,7 +201,6 @@ public class ClosedStreamTest extends AbstractTest public void onData(Stream stream, DataInfo dataInfo) { serverDataReceivedLatch.countDown(); - super.onData(stream,dataInfo); } }; } @@ -250,13 +246,6 @@ public class ClosedStreamTest extends AbstractTest { clientResetReceivedLatch.countDown(); } - super.onControlFrame(frame); - } - - @Override - public void onDataFrame(DataFrame frame, ByteBuffer data) - { - super.onDataFrame(frame,data); } }); ByteBuffer response = ByteBuffer.allocate(28); diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java index db2303cea30..89792d3bdd0 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java @@ -15,6 +15,8 @@ */ package org.eclipse.jetty.spdy; +import static org.junit.Assert.*; +import static org.hamcrest.Matchers.*; import java.nio.ByteBuffer; import java.util.concurrent.Callable; @@ -25,6 +27,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.BytesDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; @@ -183,43 +186,22 @@ public class FlowControlTest extends AbstractTest }); DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(windowSize, dataInfo.available()); Assert.assertEquals(0, dataInfo.consumed()); dataInfo.asByteBuffer(true); dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(0, dataInfo.available()); Assert.assertEquals(0, dataInfo.consumed()); dataInfo.consume(dataInfo.length()); dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed()); dataInfo.asByteBuffer(true); @@ -312,43 +294,22 @@ public class FlowControlTest extends AbstractTest stream.data(new BytesDataInfo(new byte[length], true)); DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(windowSize, dataInfo.available()); Assert.assertEquals(0, dataInfo.consumed()); dataInfo.asByteBuffer(true); dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(0, dataInfo.available()); Assert.assertEquals(0, dataInfo.consumed()); dataInfo.consume(dataInfo.length()); dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS); - // Check that we are flow control stalled - expectException(TimeoutException.class, new Callable() - { - @Override - public DataInfo call() throws Exception - { - return exchanger.exchange(null, 1, TimeUnit.SECONDS); - } - }); + checkThatWeAreFlowControlStalled(exchanger); + Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed()); dataInfo.asByteBuffer(true); @@ -451,6 +412,66 @@ public class FlowControlTest extends AbstractTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testSendBigFileWithoutFlowControl() throws Exception + { + boolean flowControlEnabled = false; + testSendBigFile(flowControlEnabled); + } + + @Test + public void testSendBigFileWithFlowControl() throws Exception + { + boolean flowControlEnabled = true; + testSendBigFile(flowControlEnabled); + } + + private void testSendBigFile(boolean flowControlEnabled) throws Exception, InterruptedException + { + final int dataSize = 1024 * 1024; + final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false); + final CountDownLatch allDataReceivedLatch = new CountDownLatch(1); + + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false)); + stream.data(bigByteBufferDataInfo); + return null; + } + },flowControlEnabled),new SessionFrameListener.Adapter()); + + session.syn(new SynInfo(false),new StreamFrameListener.Adapter() + { + private int dataBytesReceived; + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataBytesReceived = dataBytesReceived + dataInfo.length(); + dataInfo.consume(dataInfo.length()); + if (dataBytesReceived == dataSize) + allDataReceivedLatch.countDown(); + } + }); + + assertThat("all data bytes have been received by the client",allDataReceivedLatch.await(5,TimeUnit.SECONDS),is(true)); + } + + private void checkThatWeAreFlowControlStalled(final Exchanger exchanger) + { + expectException(TimeoutException.class, new Callable() + { + @Override + public DataInfo call() throws Exception + { + return exchanger.exchange(null, 1, TimeUnit.SECONDS); + } + }); + } + private void expectException(Class exception, Callable command) { try diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java index 8843dc12a80..265aea00803 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java @@ -16,34 +16,55 @@ package org.eclipse.jetty.spdy; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Exchanger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.jetty.spdy.api.BytesDataInfo; -import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.Handler; -import org.eclipse.jetty.spdy.api.ReplyInfo; -import org.eclipse.jetty.spdy.api.Session; -import org.eclipse.jetty.spdy.api.SessionFrameListener; -import org.eclipse.jetty.spdy.api.Stream; -import org.eclipse.jetty.spdy.api.StreamFrameListener; -import org.eclipse.jetty.spdy.api.StringDataInfo; -import org.eclipse.jetty.spdy.api.SynInfo; -import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; -import org.junit.Test; - import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.spdy.api.BytesDataInfo; +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.GoAwayInfo; +import org.eclipse.jetty.spdy.api.Handler; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.RstInfo; +import org.eclipse.jetty.spdy.api.SPDY; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.SessionFrameListener; +import org.eclipse.jetty.spdy.api.SessionStatus; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StreamStatus; +import org.eclipse.jetty.spdy.api.StringDataInfo; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.eclipse.jetty.spdy.frames.ControlFrame; +import org.eclipse.jetty.spdy.frames.DataFrame; +import org.eclipse.jetty.spdy.frames.GoAwayFrame; +import org.eclipse.jetty.spdy.frames.RstStreamFrame; +import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.spdy.frames.WindowUpdateFrame; +import org.eclipse.jetty.spdy.generator.Generator; +import org.eclipse.jetty.spdy.parser.Parser; +import org.eclipse.jetty.spdy.parser.Parser.Listener; +import org.junit.Assert; +import org.junit.Test; + public class PushStreamTest extends AbstractTest { @Test @@ -66,10 +87,10 @@ public class PushStreamTest extends AbstractTest @Override public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) { - assertThat("streamId is even", stream.getId() % 2, is(0)); - assertThat("stream is unidirectional", stream.isUnidirectional(), is(true)); - assertThat("stream is closed", stream.isClosed(), is(true)); - assertThat("stream has associated stream", stream.getAssociatedStream(), notNullValue()); + assertThat("streamId is even",stream.getId() % 2,is(0)); + assertThat("stream is unidirectional",stream.isUnidirectional(),is(true)); + assertThat("stream is closed",stream.isClosed(),is(true)); + assertThat("stream has associated stream",stream.getAssociatedStream(),notNullValue()); try { stream.reply(new ReplyInfo(false)); @@ -85,10 +106,10 @@ public class PushStreamTest extends AbstractTest } }); - Stream stream = clientSession.syn(new SynInfo(true), null).get(); - assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true)); + Stream stream = clientSession.syn(new SynInfo(true),null).get(); + assertThat("onSyn has been called",pushStreamLatch.await(5,TimeUnit.SECONDS),is(true)); Stream pushStream = pushStreamRef.get(); - assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream())); + assertThat("main stream and associated stream are the same",stream,sameInstance(pushStream.getAssociatedStream())); } @Test @@ -221,7 +242,7 @@ public class PushStreamTest extends AbstractTest stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter() { @Override - public void failed(Throwable x) + public void failed(Stream stream, Throwable x) { pushStreamFailedLatch.countDown(); } @@ -321,6 +342,170 @@ public class PushStreamTest extends AbstractTest return bytes; } + + @Test + public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithFlowControl() throws Exception + { + final boolean flowControl = true; + testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl); + } + + @Test + public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithoutFlowControl() throws Exception + { + final boolean flowControl = false; + testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl); + } + + private volatile boolean read = true; + private void testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(final boolean flowControl) throws Exception, IOException, InterruptedException + { + final short version = SPDY.V3; + final AtomicBoolean unexpectedExceptionOccured = new AtomicBoolean(false); + final CountDownLatch resetReceivedLatch = new CountDownLatch(1); + final CountDownLatch allDataFramesReceivedLatch = new CountDownLatch(1); + final CountDownLatch goAwayReceivedLatch = new CountDownLatch(1); + final int dataSizeInBytes = 1024 * 256; + final byte[] transferBytes = createHugeByteArray(dataSizeInBytes); + + InetSocketAddress serverAddress = startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(final Stream stream, SynInfo synInfo) + { + new Thread(new Runnable() + { + + @Override + public void run() + { + Stream pushStream=null; + try + { + stream.reply(new ReplyInfo(false)); + pushStream = stream.syn(new SynInfo(false)).get(); + resetReceivedLatch.await(5,TimeUnit.SECONDS); + } + catch (InterruptedException | ExecutionException e) + { + e.printStackTrace(); + unexpectedExceptionOccured.set(true); + } + pushStream.data(new BytesDataInfo(transferBytes,true)); + stream.data(new StringDataInfo("close",true)); + } + }).start(); + return null; + } + + @Override + public void onRst(Session session, RstInfo rstInfo) + { + resetReceivedLatch.countDown(); + } + + @Override + public void onGoAway(Session session, GoAwayInfo goAwayInfo) + { + goAwayReceivedLatch.countDown(); + } + }, flowControl); + + final SocketChannel channel = SocketChannel.open(serverAddress); + final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor()); + int streamId = 1; + ByteBuffer writeBuffer = generator.control(new SynStreamFrame(version,(byte)0,streamId,0,(byte)0,new Headers())); + channel.write(writeBuffer); + assertThat("writeBuffer is fully written",writeBuffer.hasRemaining(), is(false)); + + final Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor()); + parser.addListener(new Listener.Adapter() + { + int bytesRead = 0; + + @Override + public void onControlFrame(ControlFrame frame) + { + if(frame instanceof SynStreamFrame){ + int pushStreamId = ((SynStreamFrame)frame).getStreamId(); + ByteBuffer writeBuffer = generator.control(new RstStreamFrame(version,pushStreamId,StreamStatus.CANCEL_STREAM.getCode(version))); + try + { + channel.write(writeBuffer); + } + catch (IOException e) + { + e.printStackTrace(); + unexpectedExceptionOccured.set(true); + } + } + } + + @Override + public void onDataFrame(DataFrame frame, ByteBuffer data) + { + if(frame.getStreamId() == 2) + bytesRead = bytesRead + frame.getLength(); + if(bytesRead == dataSizeInBytes){ + allDataFramesReceivedLatch.countDown(); + return; + } + if (flowControl) + { + ByteBuffer writeBuffer = generator.control(new WindowUpdateFrame(version,frame.getStreamId(),frame.getLength())); + try + { + channel.write(writeBuffer); + } + catch (IOException e) + { + e.printStackTrace(); + unexpectedExceptionOccured.set(true); + } + } + } + }); + + Thread reader = new Thread(new Runnable() + { + @Override + public void run() + { + ByteBuffer readBuffer = ByteBuffer.allocate(dataSizeInBytes*2); + while (read) + { + try + { + channel.read(readBuffer); + } + catch (IOException e) + { + e.printStackTrace(); + unexpectedExceptionOccured.set(true); + } + readBuffer.flip(); + parser.parse(readBuffer); + readBuffer.clear(); + } + + } + }); + reader.start(); + read = false; + + assertThat("no unexpected exceptions occured", unexpectedExceptionOccured.get(), is(false)); + assertThat("not all dataframes have been received as the pushstream has been reset by the client.",allDataFramesReceivedLatch.await(streamId,TimeUnit.SECONDS),is(false)); + + + ByteBuffer buffer = generator.control(new GoAwayFrame(version, streamId, SessionStatus.OK.getCode())); + channel.write(buffer); + Assert.assertThat(buffer.hasRemaining(), is(false)); + + assertThat("GoAway frame is received by server", goAwayReceivedLatch.await(5,TimeUnit.SECONDS), is(true)); + channel.shutdownOutput(); + channel.close(); + } + @Test public void testOddEvenStreamIds() throws Exception { @@ -334,7 +519,7 @@ public class PushStreamTest extends AbstractTest stream.syn(new SynInfo(false)); return null; } - }),new SessionFrameListener.Adapter() + }, true),new SessionFrameListener.Adapter() { @Override public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) @@ -367,6 +552,6 @@ public class PushStreamTest extends AbstractTest private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException { - assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false)); + assertThat("No exception occured",exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false)); } } diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java index 654bb5fddda..7a9fe5897cc 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java @@ -28,7 +28,7 @@ public class ResetStreamTest extends AbstractTest @Test public void testResetStreamIsRemoved() throws Exception { - Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null); + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter(), true),null); Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS); session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS); @@ -169,7 +169,7 @@ public class ResetStreamTest extends AbstractTest stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter() { @Override - public void failed(Throwable x) + public void failed(Void context, Throwable x) { failLatch.countDown(); }