Implemented idle timeout.

When the idle timeout expires, SPDY will send a GO_AWAY and close the connection.
This commit is contained in:
Simone Bordet 2012-02-09 18:50:27 +01:00
parent 5b4eec8a3d
commit 7b30b760a3
4 changed files with 306 additions and 27 deletions

View File

@ -23,14 +23,15 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.DirectNIOBuffer; import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer; import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer; import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.spdy.ISession; import org.eclipse.jetty.spdy.ISession;
import org.eclipse.jetty.spdy.ISession.Controller; import org.eclipse.jetty.spdy.ISession.Controller;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException; import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.parser.Parser; import org.eclipse.jetty.spdy.parser.Parser;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,38 +40,48 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
{ {
private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class); private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class);
private final Parser parser; private final Parser parser;
private volatile Session session;
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
private ByteBuffer writeBuffer; private ByteBuffer writeBuffer;
private Handler writeHandler; private Handler writeHandler;
private volatile boolean writePending; private volatile boolean writePending;
public SPDYAsyncConnection(EndPoint endp, Parser parser) public SPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser)
{ {
super(endp); super(endPoint);
this.parser = parser; this.parser = parser;
endPoint.setCheckForIdle(true);
} }
@Override @Override
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
AsyncEndPoint endPoint = getEndPoint(); AsyncEndPoint endPoint = getEndPoint();
boolean progress = true; try
while (endPoint.isOpen() && progress)
{ {
int filled = fill(); endPoint.setCheckForIdle(false);
progress = filled > 0; boolean progress = true;
while (endPoint.isOpen() && progress)
{
int filled = fill();
progress = filled > 0;
int flushed = flush(); int flushed = flush();
progress |= flushed > 0; progress |= flushed > 0;
endPoint.flush(); endPoint.flush();
progress |= endPoint.hasProgressed(); progress |= endPoint.hasProgressed();
if (!progress && filled < 0) if (!progress && filled < 0)
close(false); close(false);
}
return this;
}
finally
{
endPoint.setCheckForIdle(true);
} }
return this;
} }
public int fill() throws IOException public int fill() throws IOException
@ -215,4 +226,20 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
{ {
// TODO // TODO
} }
@Override
public void onIdleExpired(long idleForMs)
{
session.goAway(SPDY.V2);
}
protected Session getSession()
{
return session;
}
protected void setSession(Session session)
{
this.session = session;
}
} }

View File

@ -428,6 +428,7 @@ public class SPDYClient
StandardSession session = new StandardSession(connection, 1, sessionFuture.listener, generator); StandardSession session = new StandardSession(connection, 1, sessionFuture.listener, generator);
parser.addListener(session); parser.addListener(session);
sessionFuture.connected(session); sessionFuture.connected(session);
connection.setSession(session);
return connection; return connection;
} }

View File

@ -21,12 +21,10 @@ import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.spdy.CompressionFactory; import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.StandardCompressionFactory; import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.StandardSession; import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser; import org.eclipse.jetty.spdy.parser.Parser;
@ -74,9 +72,9 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection
{ {
private final ServerSessionFrameListener listener; private final ServerSessionFrameListener listener;
private volatile Session session; private volatile boolean connected;
private ServerSPDYAsyncConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener) private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener)
{ {
super(endPoint, parser); super(endPoint, parser);
this.listener = listener; this.listener = listener;
@ -85,20 +83,14 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
@Override @Override
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
final Session session = this.session; if (!connected)
if (session != null)
{ {
// NPE guard to support tests // NPE guard to support tests
if (listener != null) if (listener != null)
listener.onConnect(session); listener.onConnect(getSession());
this.session = null; connected = true;
} }
return super.handle(); return super.handle();
} }
private void setSession(Session session)
{
this.session = session;
}
} }
} }

View File

@ -0,0 +1,259 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.nio;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.AbstractTest;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Test;
public class IdleTimeoutTest extends AbstractTest
{
@Test
public void testServerEnforcingIdleTimeout() throws Exception
{
server = new Server();
connector = newSPDYServerConnector(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
return null;
}
});
server.addConnector(connector);
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.FrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
session.syn(SPDY.V2, new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
}
@Test
public void testServerEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
server = new Server();
connector = newSPDYServerConnector(null);
server.addConnector(connector);
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.FrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
// The SYN is not replied, and the server should idle timeout
session.syn(SPDY.V2, new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
}
@Test
public void testServerNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final int maxIdleTime = 1000;
server = new Server();
connector = newSPDYServerConnector(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
try
{
Thread.sleep(2 * maxIdleTime);
stream.reply(new ReplyInfo(true));
return null;
}
catch (InterruptedException x)
{
Assert.fail();
return null;
}
}
});
server.addConnector(connector);
connector.setMaxIdleTime(maxIdleTime);
server.start();
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.FrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
final CountDownLatch replyLatch = new CountDownLatch(1);
session.syn(SPDY.V2, new SynInfo(true), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(3 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
}
@Test
public void testClientEnforcingIdleTimeout() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
return null;
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient();
long maxIdleTime = 1000;
client.setMaxIdleTime(maxIdleTime);
Session session = client.connect(address, null).get();
session.syn(SPDY.V2, new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
}
@Test
public void testClientEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient();
long maxIdleTime = 1000;
client.setMaxIdleTime(maxIdleTime);
Session session = client.connect(address, null).get();
session.syn(SPDY.V2, new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
}
@Test
public void testClientNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final long maxIdleTime = 1000;
final CountDownLatch latch = new CountDownLatch(1);
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
return null;
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient();
client.setMaxIdleTime(maxIdleTime);
Session session = client.connect(address, null).get();
final CountDownLatch replyLatch = new CountDownLatch(1);
session.syn(SPDY.V2, new SynInfo(true), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
try
{
Thread.sleep(2 * maxIdleTime);
replyLatch.countDown();
}
catch (InterruptedException e)
{
Assert.fail();
}
}
});
Assert.assertFalse(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * maxIdleTime, TimeUnit.MILLISECONDS));
}
}