From 525b268d41cc73cb76526a0353903032e98dbb9b Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Thu, 22 Aug 2013 11:31:38 +0200 Subject: [PATCH 1/2] 415656 SPDY - add IdleTimeout per Stream functionality --- .../eclipse/jetty/spdy/StandardSession.java | 4 +- .../eclipse/jetty/spdy/StandardStream.java | 25 ++- .../org/eclipse/jetty/spdy/api/Stream.java | 12 ++ .../jetty/spdy/api/StreamFrameListener.java | 11 + .../eclipse/jetty/spdy/AsyncTimeoutTest.java | 8 +- .../jetty/spdy/StandardSessionTest.java | 13 +- .../jetty/spdy/StandardStreamTest.java | 84 ++++++-- .../client/http/HttpReceiverOverSPDY.java | 9 + .../spdy/client/http/HttpSenderOverSPDY.java | 3 +- .../spdy/client/http/HttpClientTest.java | 2 - .../http/HTTPSPDYServerConnectionFactory.java | 14 +- .../server/proxy/ProxyHTTPSPDYConnection.java | 4 +- .../spdy/server/proxy/SPDYProxyEngine.java | 12 ++ .../server/http/AbstractHTTPSPDYTest.java | 6 +- .../server/http/ConcurrentStreamsTest.java | 2 +- .../http/PushStrategyBenchmarkTest.java | 2 +- .../server/http/ReferrerPushStrategyTest.java | 14 +- .../spdy/server/http/ServerHTTPSPDYTest.java | 204 +++++++++++++++--- 18 files changed, 359 insertions(+), 70 deletions(-) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index e50f98e1abc..5d1d5b954f2 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -541,7 +541,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise promise) { IStream associatedStream = streams.get(frame.getAssociatedStreamId()); - IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise); + IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, + scheduler, promise); + stream.setIdleTimeout(endPoint.getIdleTimeout()); flowControlStrategy.onNewStream(this, stream); stream.updateCloseState(frame.isClose(), local); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index e0b65c7b793..71457904f39 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.PushInfo; @@ -43,8 +44,9 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; -public class StandardStream implements IStream +public class StandardStream extends IdleTimeout implements IStream { private static final Logger LOG = Log.getLogger(Stream.class); private final Map attributes = new ConcurrentHashMap<>(); @@ -60,8 +62,9 @@ public class StandardStream implements IStream private volatile CloseState closeState = CloseState.OPENED; private volatile boolean reset = false; - public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise promise) + public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise promise) { + super(scheduler); this.id = id; this.priority = priority; this.session = session; @@ -105,6 +108,18 @@ public class StandardStream implements IStream return priority; } + @Override + protected void onIdleExpired(TimeoutException timeout) + { + listener.onFailure(timeout); + } + + @Override + public boolean isOpen() + { + return !isClosed(); + } + @Override public int getWindowSize() { @@ -194,6 +209,7 @@ public class StandardStream implements IStream @Override public void process(ControlFrame frame) { + notIdle(); switch (frame.getType()) { case SYN_STREAM: @@ -234,6 +250,7 @@ public class StandardStream implements IStream @Override public void process(DataInfo dataInfo) { + notIdle(); // TODO: in v3 we need to send a rst instead of just ignoring // ignore data frame if this stream is remotelyClosed already if (isRemotelyClosed()) @@ -349,6 +366,7 @@ public class StandardStream implements IStream @Override public void push(PushInfo pushInfo, Promise promise) { + notIdle(); if (isClosed() || isReset()) { promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED, @@ -373,6 +391,7 @@ public class StandardStream implements IStream @Override public void reply(ReplyInfo replyInfo, Callback callback) { + notIdle(); if (isUnidirectional()) throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams"); openState = OpenState.REPLY_SENT; @@ -395,6 +414,7 @@ public class StandardStream implements IStream @Override public void data(DataInfo dataInfo, Callback callback) { + notIdle(); if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); @@ -425,6 +445,7 @@ public class StandardStream implements IStream @Override public void headers(HeadersInfo headersInfo, Callback callback) { + notIdle(); if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java index abc1b888473..4b511314c99 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java @@ -225,4 +225,16 @@ public interface Stream */ public Set getPushedStreams(); + /** + * Get the idle timeout set for this particular stream + * @return the idle timeout + */ + public long getIdleTimeout(); + + /** + * Set an idle timeout for this stream + * @param timeout + */ + public void setIdleTimeout(long timeout); + } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java index 0a4248a1f97..c8139e340be 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java @@ -69,6 +69,12 @@ public interface StreamFrameListener extends EventListener */ public void onData(Stream stream, DataInfo dataInfo); + /** + *

Callback invoked on errors.

+ * @param x + */ + public void onFailure(Throwable x); + /** *

Empty implementation of {@link StreamFrameListener}

*/ @@ -94,5 +100,10 @@ public interface StreamFrameListener extends EventListener public void onData(Stream stream, DataInfo dataInfo) { } + + @Override + public void onFailure(Throwable x) + { + } } } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java index 1d198a802ae..dc8c650bf4c 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java @@ -24,7 +24,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDYException; @@ -47,6 +49,8 @@ import org.junit.runner.RunWith; @RunWith(AdvancedRunner.class) public class AsyncTimeoutTest { + EndPoint endPoint = new ByteArrayEndPoint(); + @Slow @Test public void testAsyncTimeoutInControlFrames() throws Exception @@ -60,7 +64,7 @@ public class AsyncTimeoutTest scheduler.start(); // TODO need to use jetty lifecycles better here Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), - null, null, 1, null, generator, new FlowControlStrategy.None()) + endPoint, null, 1, null, generator, new FlowControlStrategy.None()) { @Override public void flush() @@ -103,7 +107,7 @@ public class AsyncTimeoutTest scheduler.start(); Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), - null, null, 1, null, generator, new FlowControlStrategy.None()) + endPoint, null, 1, null, generator, new FlowControlStrategy.None()) { @Override protected void write(ByteBuffer buffer, Callback callback) diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index 813fb0ceb57..45e000aa274 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; @@ -75,6 +76,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class StandardSessionTest @@ -84,6 +86,10 @@ public class StandardSessionTest @Mock private Controller controller; + + @Mock + private EndPoint endPoint; + private ExecutorService threadPool; private StandardSession session; private Scheduler scheduler; @@ -97,8 +103,9 @@ public class StandardSessionTest threadPool = Executors.newCachedThreadPool(); scheduler = new TimerScheduler(); scheduler.start(); - session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, null, null, 1, null, + session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, endPoint, null, 1, null, generator, new FlowControlStrategy.None()); + when(endPoint.getIdleTimeout()).thenReturn(30000L); headers = new Fields(); } @@ -428,7 +435,7 @@ public class StandardSessionTest final CountDownLatch failedCalledLatch = new CountDownLatch(2); SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); stream.updateWindowSize(8192); Callback.Adapter callback = new Callback.Adapter() { @@ -502,7 +509,7 @@ public class StandardSessionTest private void testHeaderFramesAreSentInOrder(final byte priority0, final byte priority1, final byte priority2) throws InterruptedException, ExecutionException { final StandardSession testLocalSession = new StandardSession(VERSION, bufferPool, threadPool, scheduler, - new ControllerMock(), null, null, 1, null, generator, new FlowControlStrategy.None()); + new ControllerMock(), endPoint, null, 1, null, generator, new FlowControlStrategy.None()); HashSet tasks = new HashSet<>(); int numberOfTasksToRun = 128; diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java index 57bdd988054..aadb2ddf53c 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.spdy; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -43,23 +33,44 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class StandardStreamTest { + private final ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler(); @Mock private ISession session; @Mock private SynStreamFrame synStreamFrame; + @Before + public void setUp() throws Exception + { + scheduler.start(); + } + /** * Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}. */ @@ -67,7 +78,7 @@ public class StandardStreamTest @Test public void testSyn() { - Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); Set streams = new HashSet<>(); streams.add(stream); when(synStreamFrame.isClose()).thenReturn(false); @@ -100,7 +111,8 @@ public class StandardStreamTest @Test public void testSynOnClosedStream() { - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, + null, null , null); stream.updateCloseState(true, true); stream.updateCloseState(true, false); assertThat("stream expected to be closed", stream.isClosed(), is(true)); @@ -121,11 +133,57 @@ public class StandardStreamTest public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException { SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, + null, scheduler, null); stream.updateWindowSize(8192); stream.updateCloseState(synStreamFrame.isClose(), true); assertThat("stream is half closed", stream.isHalfClosed(), is(true)); stream.data(new StringDataInfo("data on half closed stream", true)); verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class)); } + + @Test + @Slow + public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch onFailCalledLatch = new CountDownLatch(1); + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + stream.setIdleTimeout(500); + stream.setStreamFrameListener(new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); + onFailCalledLatch.countDown(); + } + }); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(1000); + assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + } + + @Test + @Slow + public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException, + TimeoutException + { + final CountDownLatch onFailCalledLatch = new CountDownLatch(1); + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + stream.setStreamFrameListener(new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); + onFailCalledLatch.countDown(); + } + }); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(500); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(500); + assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false)); + } + } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java index a870b5e7d7c..e693c9a8db7 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpReceiverOverSPDY.java @@ -139,4 +139,13 @@ public class HttpReceiverOverSPDY extends HttpReceiver implements StreamFrameLis responseFailure(x); } } + + @Override + public void onFailure(Throwable x) + { + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; + exchange.getRequest().abort(x); + } } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpSenderOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpSenderOverSPDY.java index 3a48371c2b5..9dcf17f8f12 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpSenderOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpSenderOverSPDY.java @@ -50,7 +50,7 @@ public class HttpSenderOverSPDY extends HttpSender protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback) { final Request request = exchange.getRequest(); - + final long idleTimeout = request.getIdleTimeout(); short spdyVersion = getHttpChannel().getSession().getVersion(); Fields fields = new Fields(); HttpField hostHeader = null; @@ -81,6 +81,7 @@ public class HttpSenderOverSPDY extends HttpSender @Override public void succeeded(Stream stream) { + stream.setIdleTimeout(idleTimeout); if (content.hasContent()) HttpSenderOverSPDY.this.stream = stream; callback.succeeded(); diff --git a/jetty-spdy/spdy-http-client-transport/src/test/java/org/eclipse/jetty/spdy/client/http/HttpClientTest.java b/jetty-spdy/spdy-http-client-transport/src/test/java/org/eclipse/jetty/spdy/client/http/HttpClientTest.java index c8c1e3b9dc2..76e29eeab0b 100644 --- a/jetty-spdy/spdy-http-client-transport/src/test/java/org/eclipse/jetty/spdy/client/http/HttpClientTest.java +++ b/jetty-spdy/spdy-http-client-transport/src/test/java/org/eclipse/jetty/spdy/client/http/HttpClientTest.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.junit.Assert; import org.junit.Test; -import org.junit.Ignore; public class HttpClientTest extends AbstractHttpClientServerTest { @@ -324,7 +323,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest @Slow @Test - @Ignore public void test_Request_IdleTimeout() throws Exception { final long idleTimeout = 1000; diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java index 45d867a28f5..40f405d4a28 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/http/HTTPSPDYServerConnectionFactory.java @@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger; public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory implements HttpConfiguration.ConnectionFactory { private static final String CHANNEL_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.HTTPChannelOverSPDY"; - private static final Logger logger = Log.getLogger(HTTPSPDYServerConnectionFactory.class); + private static final Logger LOG = Log.getLogger(HTTPSPDYServerConnectionFactory.class); private final PushStrategy pushStrategy; private final HttpConfiguration httpConfiguration; @@ -94,7 +94,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory // can arrive on the same connection, so we need to create an // HttpChannel for each SYN in order to run concurrently. - logger.debug("Received {} on {}", synInfo, stream); + LOG.debug("Received {} on {}", synInfo, stream); Fields headers = synInfo.getHeaders(); // According to SPDY/3 spec section 3.2.1 user-agents MUST support gzip compression. Firefox omits the @@ -136,7 +136,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory @Override public void onHeaders(Stream stream, HeadersInfo headersInfo) { - logger.debug("Received {} on {}", headersInfo, stream); + LOG.debug("Received {} on {}", headersInfo, stream); HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE); channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose()); } @@ -150,9 +150,15 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory @Override public void onData(Stream stream, final DataInfo dataInfo) { - logger.debug("Received {} on {}", dataInfo, stream); + LOG.debug("Received {} on {}", dataInfo, stream); HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE); channel.requestContent(dataInfo, dataInfo.isClose()); } + + @Override + public void onFailure(Throwable x) + { + LOG.debug(x); + } } } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java index fda3dcb58c3..8b0d92a61a5 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java @@ -213,7 +213,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse private HTTPStream(int id, byte priority, ISession session, IStream associatedStream) { - super(id, priority, session, associatedStream, null); + super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null); } @Override @@ -318,7 +318,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse { private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream) { - super(id, priority, session, associatedStream, null); + super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null); } @Override diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java index 2f1401f0bfb..fce2585348c 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java @@ -170,6 +170,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener streamPromise.data(serverDataInfo); } + @Override + public void onFailure(Throwable x) + { + LOG.debug(x); + } + private Session produceSession(String host, short version, InetSocketAddress address) { try @@ -267,6 +273,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener pushStreamPromise.data(clientDataInfo); } + + @Override + public void onFailure(Throwable x) + { + LOG.debug(x); + } } private class ProxyStreamFrameListener extends StreamFrameListener.Adapter diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/AbstractHTTPSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/AbstractHTTPSPDYTest.java index 66b140ae1b4..fe532865ece 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/AbstractHTTPSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/AbstractHTTPSPDYTest.java @@ -75,17 +75,17 @@ public abstract class AbstractHTTPSPDYTest protected InetSocketAddress startHTTPServer(Handler handler) throws Exception { - return startHTTPServer(SPDY.V2, handler); + return startHTTPServer(SPDY.V2, handler, 30000); } - protected InetSocketAddress startHTTPServer(short version, Handler handler) throws Exception + protected InetSocketAddress startHTTPServer(short version, Handler handler, long idleTimeout) throws Exception { QueuedThreadPool threadPool = new QueuedThreadPool(256); threadPool.setName("serverQTP"); server = new Server(threadPool); connector = newHTTPSPDYServerConnector(version); connector.setPort(0); - connector.setIdleTimeout(30000); + connector.setIdleTimeout(idleTimeout); server.addConnector(connector); server.setHandler(handler); server.start(); diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ConcurrentStreamsTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ConcurrentStreamsTest.java index 1f35b35117f..99e9e5fcb8b 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ConcurrentStreamsTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ConcurrentStreamsTest.java @@ -79,7 +79,7 @@ public class ConcurrentStreamsTest extends AbstractHTTPSPDYTest throw new ServletException(x); } } - }), null); + }, 30000), null); // Perform slow request. This will wait on server side until the fast request wakes it up Fields headers = createHeaders(slowPath); diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/PushStrategyBenchmarkTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/PushStrategyBenchmarkTest.java index 25b5b871e65..004b05f81f2 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/PushStrategyBenchmarkTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/PushStrategyBenchmarkTest.java @@ -82,7 +82,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest @Test public void benchmarkPushStrategy() throws Exception { - InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler()); + InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler(), 30000); // Plain HTTP ConnectionFactory factory = new HttpConnectionFactory(new HttpConfiguration()); diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java index 362027c6a7a..7cdeb25d19d 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java @@ -357,7 +357,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest outputStream.write(bytes); baseRequest.setHandled(true); } - }); + }, 30000); Session pushCacheBuildSession = startClient(version, bigResponseServerAddress, null); Fields mainResourceHeaders = createHeadersWithoutReferrer(mainResource); @@ -443,7 +443,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest baseRequest.setHandled(true); } }); - return startHTTPServer(version, gzipHandler); + return startHTTPServer(version, gzipHandler, 30000); } private Session sendMainRequestAndCSSRequest(SessionFrameListener sessionFrameListener, boolean awaitPush) throws Exception @@ -597,7 +597,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest output.print("body { background: #FFF; }"); baseRequest.setHandled(true); } - }); + }, 30000); Session session1 = startClient(version, address, null); final CountDownLatch mainResourceLatch = new CountDownLatch(1); @@ -688,7 +688,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest } baseRequest.setHandled(true); } - }); + }, 30000); Session session1 = startClient(version, address, null); final CountDownLatch mainResourceLatch = new CountDownLatch(1); @@ -799,7 +799,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest output.print("\u0000"); baseRequest.setHandled(true); } - }); + }, 30000); Session session1 = startClient(version, address, null); final CountDownLatch mainResourceLatch = new CountDownLatch(1); @@ -919,7 +919,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest output.print("HELLO"); baseRequest.setHandled(true); } - }); + }, 30000); Session session1 = startClient(version, address, null); final CountDownLatch mainResourceLatch = new CountDownLatch(1); @@ -1004,7 +1004,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest output.print("body { background: #FFF; }"); baseRequest.setHandled(true); } - }); + }, 30000); Session session1 = startClient(version, address, null); final CountDownLatch mainResourceLatch = new CountDownLatch(1); diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java index 791af667804..ac524194891 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ServerHTTPSPDYTest.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; @@ -50,6 +52,8 @@ import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; import org.eclipse.jetty.util.Fields; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StdErrLog; import org.junit.Assert; import org.junit.Test; @@ -57,6 +61,7 @@ import org.junit.Test; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -65,6 +70,8 @@ import static org.junit.Assert.fail; public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest { + private static final Logger LOG = Log.getLogger(ServerHTTPSPDYTest.class); + public ServerHTTPSPDYTest(short version) { super(version); @@ -90,7 +97,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertThat(httpRequest.getHeader("host"), is("localhost:" + connector.getLocalPort())); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getLocalPort(), version, "GET", path); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -131,7 +138,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertEquals(query, httpRequest.getQueryString()); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -174,7 +181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertThat("requestUri is /foo", httpRequest.getRequestURI(), is(path)); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -216,7 +223,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest httpResponse.getWriter().write("body that shouldn't be sent on a HEAD request"); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "HEAD", path); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -256,7 +263,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest request.setHandled(true); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path); headers.put("content-type", "application/x-www-form-urlencoded"); @@ -305,7 +312,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertNotNull(httpRequest.getServerName()); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path); headers.put("content-type", "application/x-www-form-urlencoded"); @@ -347,7 +354,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertEquals("2", httpRequest.getParameter("b")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path); headers.put("content-type", "application/x-www-form-urlencoded"); @@ -392,7 +399,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertEquals("2", httpRequest.getParameter("b")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path); headers.put("content-type", "application/x-www-form-urlencoded"); @@ -434,7 +441,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data.getBytes("UTF-8")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -480,7 +487,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -531,7 +538,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data2.getBytes("UTF-8")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -586,7 +593,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -642,7 +649,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest } handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -695,7 +702,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -748,7 +755,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.close(); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -806,7 +813,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data2.getBytes("UTF-8")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -859,7 +866,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest httpResponse.sendRedirect(location); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -896,7 +903,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest httpResponse.sendError(HttpServletResponse.SC_NOT_FOUND); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -941,7 +948,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest { throw new NullPointerException("thrown_explicitly_by_the_test"); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -995,7 +1002,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(pangram2.getBytes("UTF-8")); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1054,7 +1061,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest request.getResponse().getHttpOutput().sendContent(ByteBuffer.wrap(data)); handlerLatch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1110,7 +1117,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data); output.write(data); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1174,7 +1181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest } }.start(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1217,7 +1224,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest } } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1262,7 +1269,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest } } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1330,7 +1337,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest } }.start(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch replyLatch = new CountDownLatch(1); @@ -1398,7 +1405,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest output.write(data); } } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch responseLatch = new CountDownLatch(2); @@ -1439,7 +1446,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest request.setHandled(true); latch.countDown(); } - }), null); + }, 30000), null); Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo"); final CountDownLatch responseLatch = new CountDownLatch(1); @@ -1460,4 +1467,145 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testIdleTimeout() throws Exception + { + final int idleTimeout = 500; + final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1); + + Session session = startClient(version, startHTTPServer(version, new AbstractHandler() + { + @Override + public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + try + { + Thread.sleep(2 * idleTimeout); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + request.setHandled(true); + } + }, 30000), null); + + Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/"); + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class)); + timeoutReceivedLatch.countDown(); + } + }); + stream.setIdleTimeout(idleTimeout); + + assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + } + + @Test + public void testIdleTimeoutSetOnConnectionOnly() throws Exception + { + final int idleTimeout = 500; + final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1); + Session session = startClient(version, startHTTPServer(version, new AbstractHandler() + { + @Override + public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + try + { + Thread.sleep(2 * idleTimeout); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + request.setHandled(true); + } + }, idleTimeout), null); + + Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/"); + Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class)); + timeoutReceivedLatch.countDown(); + } + }); + + assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + } + + @Test + public void testSingleStreamIdleTimeout() throws Exception + { + final int idleTimeout = 500; + final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1); + final CountDownLatch replyReceivedLatch = new CountDownLatch(3); + Session session = startClient(version, startHTTPServer(version, new AbstractHandler() + { + @Override + public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + if ("true".equals(request.getHeader("slow"))) + { + try + { + Thread.sleep(2 * idleTimeout); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + request.setHandled(true); + } + }, idleTimeout), null); + + Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/"); + Fields slowHeaders = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/"); + slowHeaders.add("slow", "true"); + sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers); + session.syn(new SynInfo(5, TimeUnit.SECONDS, slowHeaders, true, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class)); + timeoutReceivedLatch.countDown(); + } + }); + Thread.sleep(idleTimeout / 2); + sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers); + Thread.sleep(idleTimeout / 2); + sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers); + assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true)); + assertThat("received replies on 3 non idle requests", replyReceivedLatch.await(5, TimeUnit.SECONDS), + is(true)); + } + + private void sendSingleRequestThatIsNotExpectedToTimeout(final CountDownLatch replyReceivedLatch, Session session, Fields headers) throws ExecutionException, InterruptedException, TimeoutException + { + session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0), + new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + replyReceivedLatch.countDown(); + } + }); + } + } From 465f1516692720c5e4c636693fe92ee0d482e260 Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Thu, 22 Aug 2013 14:00:12 +0200 Subject: [PATCH 2/2] 415641 Remove remaining calls to deprecated HttpTranspoert.send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException --- .../jetty/spdy/server/http/HttpTransportOverSPDYTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java index 451e303ee82..cb603eeef4d 100644 --- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java +++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/HttpTransportOverSPDYTest.java @@ -262,7 +262,7 @@ public class HttpTransportOverSPDYTest verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class)); assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false)); - httpTransportOverSPDY.send(HttpGenerator.RESPONSE_500_INFO, null,true); + httpTransportOverSPDY.send(HttpGenerator.RESPONSE_500_INFO, null,true, new Callback.Adapter()); verify(stream, times(1)).data(any(DataInfo.class), any(Callback.class));