415656 SPDY - add IdleTimeout per Stream functionality

This commit is contained in:
Thomas Becker 2013-08-22 11:31:38 +02:00
parent c0ed8375d3
commit 525b268d41
18 changed files with 359 additions and 70 deletions

View File

@ -541,7 +541,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
scheduler, promise);
stream.setIdleTimeout(endPoint.getIdleTimeout());
flowControlStrategy.onNewStream(this, stream);
stream.updateCloseState(frame.isClose(), local);

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
@ -43,8 +44,9 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class StandardStream implements IStream
public class StandardStream extends IdleTimeout implements IStream
{
private static final Logger LOG = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@ -60,8 +62,9 @@ public class StandardStream implements IStream
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
{
super(scheduler);
this.id = id;
this.priority = priority;
this.session = session;
@ -105,6 +108,18 @@ public class StandardStream implements IStream
return priority;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
listener.onFailure(timeout);
}
@Override
public boolean isOpen()
{
return !isClosed();
}
@Override
public int getWindowSize()
{
@ -194,6 +209,7 @@ public class StandardStream implements IStream
@Override
public void process(ControlFrame frame)
{
notIdle();
switch (frame.getType())
{
case SYN_STREAM:
@ -234,6 +250,7 @@ public class StandardStream implements IStream
@Override
public void process(DataInfo dataInfo)
{
notIdle();
// TODO: in v3 we need to send a rst instead of just ignoring
// ignore data frame if this stream is remotelyClosed already
if (isRemotelyClosed())
@ -349,6 +366,7 @@ public class StandardStream implements IStream
@Override
public void push(PushInfo pushInfo, Promise<Stream> promise)
{
notIdle();
if (isClosed() || isReset())
{
promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
@ -373,6 +391,7 @@ public class StandardStream implements IStream
@Override
public void reply(ReplyInfo replyInfo, Callback callback)
{
notIdle();
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
@ -395,6 +414,7 @@ public class StandardStream implements IStream
@Override
public void data(DataInfo dataInfo, Callback callback)
{
notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
@ -425,6 +445,7 @@ public class StandardStream implements IStream
@Override
public void headers(HeadersInfo headersInfo, Callback callback)
{
notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());

View File

@ -225,4 +225,16 @@ public interface Stream
*/
public Set<Stream> getPushedStreams();
/**
* Get the idle timeout set for this particular stream
* @return the idle timeout
*/
public long getIdleTimeout();
/**
* Set an idle timeout for this stream
* @param timeout
*/
public void setIdleTimeout(long timeout);
}

View File

@ -69,6 +69,12 @@ public interface StreamFrameListener extends EventListener
*/
public void onData(Stream stream, DataInfo dataInfo);
/**
* <p>Callback invoked on errors.</p>
* @param x
*/
public void onFailure(Throwable x);
/**
* <p>Empty implementation of {@link StreamFrameListener}</p>
*/
@ -94,5 +100,10 @@ public interface StreamFrameListener extends EventListener
public void onData(Stream stream, DataInfo dataInfo)
{
}
@Override
public void onFailure(Throwable x)
{
}
}
}

View File

@ -24,7 +24,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
@ -47,6 +49,8 @@ import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class AsyncTimeoutTest
{
EndPoint endPoint = new ByteArrayEndPoint();
@Slow
@Test
public void testAsyncTimeoutInControlFrames() throws Exception
@ -60,7 +64,7 @@ public class AsyncTimeoutTest
scheduler.start(); // TODO need to use jetty lifecycles better here
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
null, null, 1, null, generator, new FlowControlStrategy.None())
endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
public void flush()
@ -103,7 +107,7 @@ public class AsyncTimeoutTest
scheduler.start();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
null, null, 1, null, generator, new FlowControlStrategy.None())
endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
protected void write(ByteBuffer buffer, Callback callback)

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
@ -75,6 +76,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
@ -84,6 +86,10 @@ public class StandardSessionTest
@Mock
private Controller controller;
@Mock
private EndPoint endPoint;
private ExecutorService threadPool;
private StandardSession session;
private Scheduler scheduler;
@ -97,8 +103,9 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = new TimerScheduler();
scheduler.start();
session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, null, null, 1, null,
session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, endPoint, null, 1, null,
generator, new FlowControlStrategy.None());
when(endPoint.getIdleTimeout()).thenReturn(30000L);
headers = new Fields();
}
@ -428,7 +435,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
stream.updateWindowSize(8192);
Callback.Adapter callback = new Callback.Adapter()
{
@ -502,7 +509,7 @@ public class StandardSessionTest
private void testHeaderFramesAreSentInOrder(final byte priority0, final byte priority1, final byte priority2) throws InterruptedException, ExecutionException
{
final StandardSession testLocalSession = new StandardSession(VERSION, bufferPool, threadPool, scheduler,
new ControllerMock(), null, null, 1, null, generator, new FlowControlStrategy.None());
new ControllerMock(), endPoint, null, 1, null, generator, new FlowControlStrategy.None());
HashSet<Future> tasks = new HashSet<>();
int numberOfTasksToRun = 128;

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -43,23 +33,44 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest
{
private final ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler();
@Mock
private ISession session;
@Mock
private SynStreamFrame synStreamFrame;
@Before
public void setUp() throws Exception
{
scheduler.start();
}
/**
* Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}.
*/
@ -67,7 +78,7 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
@ -100,7 +111,8 @@ public class StandardStreamTest
@Test
public void testSynOnClosedStream()
{
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
null, null , null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
@ -121,11 +133,57 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
null, scheduler, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));
stream.data(new StringDataInfo("data on half closed stream", true));
verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class));
}
@Test
@Slow
public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException
{
final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.setIdleTimeout(500);
stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown();
}
});
stream.process(new StringDataInfo("string", false));
Thread.sleep(1000);
assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
@Slow
public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException,
TimeoutException
{
final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown();
}
});
stream.process(new StringDataInfo("string", false));
Thread.sleep(500);
stream.process(new StringDataInfo("string", false));
Thread.sleep(500);
assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false));
}
}

View File

@ -139,4 +139,13 @@ public class HttpReceiverOverSPDY extends HttpReceiver implements StreamFrameLis
responseFailure(x);
}
}
@Override
public void onFailure(Throwable x)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
exchange.getRequest().abort(x);
}
}

View File

@ -50,7 +50,7 @@ public class HttpSenderOverSPDY extends HttpSender
protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback)
{
final Request request = exchange.getRequest();
final long idleTimeout = request.getIdleTimeout();
short spdyVersion = getHttpChannel().getSession().getVersion();
Fields fields = new Fields();
HttpField hostHeader = null;
@ -81,6 +81,7 @@ public class HttpSenderOverSPDY extends HttpSender
@Override
public void succeeded(Stream stream)
{
stream.setIdleTimeout(idleTimeout);
if (content.hasContent())
HttpSenderOverSPDY.this.stream = stream;
callback.succeeded();

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Ignore;
public class HttpClientTest extends AbstractHttpClientServerTest
{
@ -324,7 +323,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Slow
@Test
@Ignore
public void test_Request_IdleTimeout() throws Exception
{
final long idleTimeout = 1000;

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger;
public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory implements HttpConfiguration.ConnectionFactory
{
private static final String CHANNEL_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.HTTPChannelOverSPDY";
private static final Logger logger = Log.getLogger(HTTPSPDYServerConnectionFactory.class);
private static final Logger LOG = Log.getLogger(HTTPSPDYServerConnectionFactory.class);
private final PushStrategy pushStrategy;
private final HttpConfiguration httpConfiguration;
@ -94,7 +94,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
// can arrive on the same connection, so we need to create an
// HttpChannel for each SYN in order to run concurrently.
logger.debug("Received {} on {}", synInfo, stream);
LOG.debug("Received {} on {}", synInfo, stream);
Fields headers = synInfo.getHeaders();
// According to SPDY/3 spec section 3.2.1 user-agents MUST support gzip compression. Firefox omits the
@ -136,7 +136,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
logger.debug("Received {} on {}", headersInfo, stream);
LOG.debug("Received {} on {}", headersInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
}
@ -150,9 +150,15 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
@Override
public void onData(Stream stream, final DataInfo dataInfo)
{
logger.debug("Received {} on {}", dataInfo, stream);
LOG.debug("Received {} on {}", dataInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.requestContent(dataInfo, dataInfo.isClose());
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
}
}

View File

@ -213,7 +213,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream, null);
super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null);
}
@Override
@ -318,7 +318,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream, null);
super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null);
}
@Override

View File

@ -170,6 +170,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
streamPromise.data(serverDataInfo);
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
private Session produceSession(String host, short version, InetSocketAddress address)
{
try
@ -267,6 +273,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
pushStreamPromise.data(clientDataInfo);
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter

View File

@ -75,17 +75,17 @@ public abstract class AbstractHTTPSPDYTest
protected InetSocketAddress startHTTPServer(Handler handler) throws Exception
{
return startHTTPServer(SPDY.V2, handler);
return startHTTPServer(SPDY.V2, handler, 30000);
}
protected InetSocketAddress startHTTPServer(short version, Handler handler) throws Exception
protected InetSocketAddress startHTTPServer(short version, Handler handler, long idleTimeout) throws Exception
{
QueuedThreadPool threadPool = new QueuedThreadPool(256);
threadPool.setName("serverQTP");
server = new Server(threadPool);
connector = newHTTPSPDYServerConnector(version);
connector.setPort(0);
connector.setIdleTimeout(30000);
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.setHandler(handler);
server.start();

View File

@ -79,7 +79,7 @@ public class ConcurrentStreamsTest extends AbstractHTTPSPDYTest
throw new ServletException(x);
}
}
}), null);
}, 30000), null);
// Perform slow request. This will wait on server side until the fast request wakes it up
Fields headers = createHeaders(slowPath);

View File

@ -82,7 +82,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
@Test
public void benchmarkPushStrategy() throws Exception
{
InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler());
InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler(), 30000);
// Plain HTTP
ConnectionFactory factory = new HttpConnectionFactory(new HttpConfiguration());

View File

@ -357,7 +357,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
outputStream.write(bytes);
baseRequest.setHandled(true);
}
});
}, 30000);
Session pushCacheBuildSession = startClient(version, bigResponseServerAddress, null);
Fields mainResourceHeaders = createHeadersWithoutReferrer(mainResource);
@ -443,7 +443,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
baseRequest.setHandled(true);
}
});
return startHTTPServer(version, gzipHandler);
return startHTTPServer(version, gzipHandler, 30000);
}
private Session sendMainRequestAndCSSRequest(SessionFrameListener sessionFrameListener, boolean awaitPush) throws Exception
@ -597,7 +597,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("body { background: #FFF; }");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -688,7 +688,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -799,7 +799,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("\u0000");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -919,7 +919,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("<html><head/><body>HELLO</body></html>");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -1004,7 +1004,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("body { background: #FFF; }");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);

View File

@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -50,6 +52,8 @@ import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert;
import org.junit.Test;
@ -57,6 +61,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@ -65,6 +70,8 @@ import static org.junit.Assert.fail;
public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{
private static final Logger LOG = Log.getLogger(ServerHTTPSPDYTest.class);
public ServerHTTPSPDYTest(short version)
{
super(version);
@ -90,7 +97,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertThat(httpRequest.getHeader("host"), is("localhost:" + connector.getLocalPort()));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getLocalPort(), version, "GET", path);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -131,7 +138,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals(query, httpRequest.getQueryString());
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -174,7 +181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertThat("requestUri is /foo", httpRequest.getRequestURI(), is(path));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -216,7 +223,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.getWriter().write("body that shouldn't be sent on a HEAD request");
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "HEAD", path);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -256,7 +263,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.setHandled(true);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -305,7 +312,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertNotNull(httpRequest.getServerName());
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -347,7 +354,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -392,7 +399,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -434,7 +441,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -480,7 +487,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -531,7 +538,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -586,7 +593,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -642,7 +649,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -695,7 +702,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -748,7 +755,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.close();
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -806,7 +813,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -859,7 +866,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.sendRedirect(location);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -896,7 +903,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.sendError(HttpServletResponse.SC_NOT_FOUND);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -941,7 +948,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{
throw new NullPointerException("thrown_explicitly_by_the_test");
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -995,7 +1002,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(pangram2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1054,7 +1061,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.getResponse().getHttpOutput().sendContent(ByteBuffer.wrap(data));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1110,7 +1117,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
output.write(data);
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1174,7 +1181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}.start();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1217,7 +1224,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1262,7 +1269,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1330,7 +1337,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}.start();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1398,7 +1405,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch responseLatch = new CountDownLatch(2);
@ -1439,7 +1446,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.setHandled(true);
latch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch responseLatch = new CountDownLatch(1);
@ -1460,4 +1467,145 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testIdleTimeout() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
request.setHandled(true);
}
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
stream.setIdleTimeout(idleTimeout);
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
public void testIdleTimeoutSetOnConnectionOnly() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
request.setHandled(true);
}
}, idleTimeout), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
public void testSingleStreamIdleTimeout() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
final CountDownLatch replyReceivedLatch = new CountDownLatch(3);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
if ("true".equals(request.getHeader("slow")))
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
request.setHandled(true);
}
}, idleTimeout), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Fields slowHeaders = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
slowHeaders.add("slow", "true");
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
session.syn(new SynInfo(5, TimeUnit.SECONDS, slowHeaders, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
Thread.sleep(idleTimeout / 2);
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
Thread.sleep(idleTimeout / 2);
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat("received replies on 3 non idle requests", replyReceivedLatch.await(5, TimeUnit.SECONDS),
is(true));
}
private void sendSingleRequestThatIsNotExpectedToTimeout(final CountDownLatch replyReceivedLatch, Session session, Fields headers) throws ExecutionException, InterruptedException, TimeoutException
{
session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyReceivedLatch.countDown();
}
});
}
}