diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 3fe6559b3ac..963094776e0 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -114,6 +114,15 @@ public class StandardStream extends IdleTimeout implements IStream StreamFrameListener listener = this.listener; if (listener != null) listener.onFailure(this, timeout); + // The stream is now gone, we must close it to + // avoid that its idle timeout is rescheduled. + close(); + } + + private void close() + { + closeState = CloseState.CLOSED; + onClose(); } @Override @@ -189,13 +198,13 @@ public class StandardStream extends IdleTimeout implements IStream if (local) throw new IllegalStateException(); else - closeState = CloseState.CLOSED; + close(); break; } case REMOTELY_CLOSED: { if (local) - closeState = CloseState.CLOSED; + close(); else throw new IllegalStateException(); break; @@ -369,12 +378,13 @@ public class StandardStream extends IdleTimeout implements IStream notIdle(); if (isClosed() || isReset()) { + close(); promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED, "Stream: " + this + " already closed or reset!")); return; } PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo); - session.syn(pushSynInfo, null, promise); + session.syn(pushSynInfo, null, new StreamPromise(promise)); } @Override @@ -393,11 +403,14 @@ public class StandardStream extends IdleTimeout implements IStream { notIdle(); if (isUnidirectional()) + { + close(); throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams"); + } openState = OpenState.REPLY_SENT; updateCloseState(replyInfo.isClose(), true); SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders()); - session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback); + session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), new StreamCallback(callback)); } @Override @@ -417,18 +430,18 @@ public class StandardStream extends IdleTimeout implements IStream notIdle(); if (!canSend()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback()); throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame"); } if (isLocallyClosed()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback()); throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream"); } // Cannot update the close state here, because the data that we send may // be flow controlled, so we need the stream to update the window size. - session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback); + session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), new StreamCallback(callback)); } @Override @@ -448,18 +461,18 @@ public class StandardStream extends IdleTimeout implements IStream notIdle(); if (!canSend()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback()); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame"); } if (isLocallyClosed()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); + session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback()); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream"); } updateCloseState(headersInfo.isClose(), true); HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders()); - session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback); + session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), new StreamCallback(callback)); } @Override @@ -527,4 +540,55 @@ public class StandardStream extends IdleTimeout implements IStream { OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED } + + private class StreamCallback implements Callback + { + private final Callback callback; + + private StreamCallback() + { + this(new Adapter()); + } + + private StreamCallback(Callback callback) + { + this.callback = callback; + } + + @Override + public void succeeded() + { + callback.succeeded(); + } + + @Override + public void failed(Throwable x) + { + close(); + callback.failed(x); + } + } + + private class StreamPromise implements Promise + { + private final Promise promise; + + public StreamPromise(Promise promise) + { + this.promise = promise; + } + + @Override + public void succeeded(Stream result) + { + promise.succeeded(result); + } + + @Override + public void failed(Throwable x) + { + close(); + promise.failed(x); + } + } } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index fd5d580075b..ea8b661c0cd 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.spdy; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.HashSet; @@ -38,6 +28,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -46,6 +38,7 @@ import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.PushInfo; +import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; @@ -58,7 +51,6 @@ import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.frames.DataFrame; import org.eclipse.jetty.spdy.frames.SettingsFrame; import org.eclipse.jetty.spdy.frames.SynReplyFrame; -import org.eclipse.jetty.spdy.frames.SynStreamFrame; import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Fields; @@ -66,9 +58,10 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -78,6 +71,16 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class StandardSessionTest { @@ -101,7 +104,7 @@ public class StandardSessionTest public void setUp() throws Exception { threadPool = Executors.newCachedThreadPool(); - scheduler = new TimerScheduler(); + scheduler = new ScheduledExecutorScheduler(); scheduler.start(); session = new StandardSession(VERSION, bufferPool, scheduler, controller, endPoint, null, 1, null, generator, new FlowControlStrategy.None()); @@ -131,8 +134,7 @@ public class StandardSessionTest callback.succeeded(); return null; } - }) - .when(controller).write(any(Callback.class), any(ByteBuffer.class)); + }).when(controller).write(any(Callback.class), any(ByteBuffer.class)); } @Test @@ -429,14 +431,38 @@ public class StandardSessionTest @SuppressWarnings("unchecked") @Test - public void testControllerWriteFailsInEndPointFlush() throws InterruptedException + public void testControllerWriteFails() throws Exception { - setControllerWriteExpectation(true); + final AtomicInteger writes = new AtomicInteger(); + final AtomicBoolean fail = new AtomicBoolean(); + Controller controller = new Controller() + { + @Override + public void write(Callback callback, ByteBuffer... buffers) + { + writes.incrementAndGet(); + if (fail.get()) + callback.failed(new ClosedChannelException()); + else + callback.succeeded(); + } - final CountDownLatch failedCalledLatch = new CountDownLatch(2); - SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); + @Override + public void close(boolean onlyOutput) + { + + } + }; + ISession session = new StandardSession(VERSION, bufferPool, scheduler, controller, endPoint, null, 1, null, generator, null); + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); stream.updateWindowSize(8192); + + // Send a reply to comply with the API usage + stream.reply(new ReplyInfo(false), new Callback.Adapter()); + + // Make the controller fail + fail.set(true); + final CountDownLatch failedCalledLatch = new CountDownLatch(1); Callback.Adapter callback = new Callback.Adapter() { @Override @@ -445,14 +471,11 @@ public class StandardSessionTest failedCalledLatch.countDown(); } }; - - // first data frame should fail on controller.write() - stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback); - // second data frame should fail without controller.write() as the connection is expected to be broken after first controller.write() call failed. + // Data frame should fail on controller.write() stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback); - verify(controller, times(1)).write(any(Callback.class), any(ByteBuffer.class)); - assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + Assert.assertEquals(2, writes.get()); + Assert.assertTrue(failedCalledLatch.await(5, TimeUnit.SECONDS)); } @Test diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java index bda7f0e55cf..8c5d26e5fa0 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java @@ -18,6 +18,39 @@ package org.eclipse.jetty.spdy; +import java.nio.channels.ClosedChannelException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.PushInfo; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.SPDY; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StringDataInfo; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.frames.ControlFrame; +import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Fields; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -29,33 +62,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.PushInfo; -import org.eclipse.jetty.spdy.api.SPDY; -import org.eclipse.jetty.spdy.api.Stream; -import org.eclipse.jetty.spdy.api.StreamFrameListener; -import org.eclipse.jetty.spdy.api.StringDataInfo; -import org.eclipse.jetty.spdy.api.SynInfo; -import org.eclipse.jetty.spdy.frames.SynStreamFrame; -import org.eclipse.jetty.toolchain.test.annotation.Slow; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Fields; -import org.eclipse.jetty.util.Promise; -import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatcher; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - @RunWith(MockitoJUnitRunner.class) public class StandardStreamTest { @@ -71,9 +77,12 @@ public class StandardStreamTest scheduler.start(); } - /** - * Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}. - */ + @After + public void tearDown() throws Exception + { + scheduler.stop(); + } + @SuppressWarnings("unchecked") @Test public void testSyn() @@ -144,46 +153,106 @@ public class StandardStreamTest @Test @Slow - public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException + public void testIdleTimeout() throws Exception { - final CountDownLatch onFailCalledLatch = new CountDownLatch(1); IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); - stream.setIdleTimeout(500); + long idleTimeout = 500; + stream.setIdleTimeout(idleTimeout); + + final AtomicInteger failureCount = new AtomicInteger(); + final CountDownLatch failureLatch = new CountDownLatch(1); stream.setStreamFrameListener(new StreamFrameListener.Adapter() { @Override public void onFailure(Stream stream, Throwable x) { assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); - onFailCalledLatch.countDown(); + failureCount.incrementAndGet(); + failureLatch.countDown(); } }); stream.process(new StringDataInfo("string", false)); - Thread.sleep(1000); - assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + + // Wait more than (2 * idleTimeout) to be sure to trigger a failureCount > 1 + Thread.sleep(3 * idleTimeout); + + assertThat("onFailure has been called", failureLatch.await(5, TimeUnit.SECONDS), is(true)); + Assert.assertEquals(1, failureCount.get()); } @Test @Slow - public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException, - TimeoutException + public void testIdleTimeoutIsInterruptedWhenReceiving() throws Exception { - final CountDownLatch onFailCalledLatch = new CountDownLatch(1); + final CountDownLatch failureLatch = new CountDownLatch(1); IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + long idleTimeout = 1000; + stream.setIdleTimeout(idleTimeout); stream.setStreamFrameListener(new StreamFrameListener.Adapter() { @Override public void onFailure(Stream stream, Throwable x) { assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); - onFailCalledLatch.countDown(); + failureLatch.countDown(); } }); + stream.process(new SynStreamFrame(SPDY.V3, (byte)0, 1, 0, (byte)0, (short)0, null)); stream.process(new StringDataInfo("string", false)); - Thread.sleep(500); + Thread.sleep(idleTimeout / 2); stream.process(new StringDataInfo("string", false)); - Thread.sleep(500); - assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false)); + Thread.sleep(idleTimeout / 2); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(idleTimeout / 2); + stream.process(new StringDataInfo("string", true)); + stream.reply(new ReplyInfo(true), new Callback.Adapter()); + Thread.sleep(idleTimeout); + assertThat("onFailure has not been called", failureLatch.await(idleTimeout, TimeUnit.MILLISECONDS), is(false)); } + @Test + @Slow + public void testReplyFailureClosesStream() throws Exception + { + ISession session = new StandardSession(SPDY.V3, null, null, null, null, null, 1, null, null, null) + { + @Override + public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback) + { + callback.failed(new ClosedChannelException()); + } + }; + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + final AtomicInteger failureCount = new AtomicInteger(); + stream.setStreamFrameListener(new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Stream stream, Throwable x) + { + failureCount.incrementAndGet(); + } + }); + long idleTimeout = 500; + stream.setIdleTimeout(idleTimeout); + + stream.process(new SynStreamFrame(SPDY.V3, (byte)0, 1, 0, (byte)0, (short)0, null)); + + final CountDownLatch failureLatch = new CountDownLatch(1); + stream.reply(new ReplyInfo(false), new Callback.Adapter() + { + @Override + public void failed(Throwable x) + { + failureLatch.countDown(); + } + }); + + Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Make sure that the idle timeout never fires, since the failure above should have closed the stream + Thread.sleep(3 * idleTimeout); + + Assert.assertEquals(0, failureCount.get()); + Assert.assertTrue(stream.isClosed()); + } }