Implemented idle timeout functionality for both client and server.

This commit is contained in:
Simone Bordet 2014-06-18 11:18:15 +02:00
parent 690cd01933
commit 9c95e29088
9 changed files with 337 additions and 24 deletions

View File

@ -53,6 +53,7 @@ public class HTTP2Client extends ContainerLifeCycle
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final SelectorManager selector;
private final ByteBufferPool byteBufferPool;
private long idleTimeout;
public HTTP2Client()
{
@ -100,6 +101,16 @@ public class HTTP2Client extends ContainerLifeCycle
sessions.clear();
}
public long getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
}
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler)
@ -110,7 +121,7 @@ public class HTTP2Client extends ContainerLifeCycle
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), 30000);
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
}
@Override
@ -150,6 +161,15 @@ public class HTTP2Client extends ContainerLifeCycle
sessions.remove(session);
}
@Override
protected boolean onReadTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
session.close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
return false;
}
@Override
public void succeeded()
{

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
@ -42,9 +43,9 @@ import org.junit.After;
public class AbstractTest
{
private ServerConnector connector;
protected ServerConnector connector;
private String path = "/test";
private HTTP2Client client;
protected HTTP2Client client;
private Server server;
protected void startServer(HttpServlet servlet) throws Exception
@ -106,6 +107,6 @@ public class AbstractTest
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
return new MetaData.Request(HttpVersion.HTTP_2, HttpScheme.HTTP, method, authority, host, port, path, fields);
return new MetaData.Request(HttpVersion.HTTP_2, HttpScheme.HTTP, method, new HostPortHttpField(authority), path, fields);
}
}

View File

@ -105,7 +105,7 @@ public class FlowControlTest extends AbstractTest
// Two SETTINGS frames, the initial one and the one we send from the server.
final CountDownLatch settingsLatch = new CountDownLatch(2);
Session client = newClient(new Session.Listener.Adapter()
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
@ -116,7 +116,7 @@ public class FlowControlTest extends AbstractTest
MetaData.Request request = newRequest("POST", new HttpFields());
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(new HeadersFrame(0, request, null, false), promise, new Stream.Listener.Adapter());
session.newStream(new HeadersFrame(0, request, null, false), promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Send first chunk that exceeds the window.
@ -356,11 +356,11 @@ public class FlowControlTest extends AbstractTest
}
});
Session client = newClient(new Session.Listener.Adapter());
Session session = newClient(new Session.Listener.Adapter());
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
client.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
@ -369,7 +369,7 @@ public class FlowControlTest extends AbstractTest
// First request will consume half the session window.
MetaData.Request request1 = newRequest("GET", new HttpFields());
client.newStream(new HeadersFrame(0, request1, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(new HeadersFrame(0, request1, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -382,7 +382,7 @@ public class FlowControlTest extends AbstractTest
// Second request will consume the session window, which is now stalled.
// A third request will not be able to receive data.
MetaData.Request request2 = newRequest("GET", new HttpFields());
client.newStream(new HeadersFrame(0, request2, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(new HeadersFrame(0, request2, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -395,7 +395,7 @@ public class FlowControlTest extends AbstractTest
// Third request is now stalled.
final CountDownLatch latch = new CountDownLatch(1);
MetaData.Request request3 = newRequest("GET", new HttpFields());
client.newStream(new HeadersFrame(0, request3, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(new HeadersFrame(0, request3, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -438,12 +438,12 @@ public class FlowControlTest extends AbstractTest
}
});
Session client = newClient(new Session.Listener.Adapter());
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
final byte[] bytes = new byte[data.length];
final CountDownLatch latch = new CountDownLatch(1);
client.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
private int received;

View File

@ -46,13 +46,13 @@ public class HTTP2Test extends AbstractTest
{
startServer(new EmptyHttpServlet());
Session client = newClient(new Session.Listener.Adapter());
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(1);
client.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
@ -86,13 +86,13 @@ public class HTTP2Test extends AbstractTest
}
});
Session client = newClient(new Session.Listener.Adapter());
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(2);
client.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)

View File

@ -0,0 +1,253 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class IdleTimeoutTest extends AbstractTest
{
private final int idleTimeout = 1000;
@Test
public void testServerEnforcingIdleTimeout() throws Exception
{
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return null;
}
});
connector.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
latch.countDown();
}
});
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testServerEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
startServer(new ServerSessionListener.Adapter());
connector.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
latch.countDown();
}
});
// The request is not replied, and the server should idle timeout.
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testServerNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
try
{
Thread.sleep(2 * idleTimeout);
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return null;
}
catch (InterruptedException x)
{
Assert.fail();
return null;
}
}
});
connector.setIdleTimeout(idleTimeout);
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new ServerSessionListener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
}
});
final CountDownLatch replyLatch = new CountDownLatch(1);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
// Just make sure onClose() has never been called, but don't wait too much
Assert.assertFalse(closeLatch.await(idleTimeout / 2, TimeUnit.MILLISECONDS));
}
@Test
public void testClientEnforcingIdleTimeout() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return null;
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(closeLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testClientEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(closeLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testClientNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
return null;
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
Session session = newClient(new Session.Listener.Adapter());
final CountDownLatch replyLatch = new CountDownLatch(1);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
try
{
Thread.sleep(2 * idleTimeout);
replyLatch.countDown();
}
catch (InterruptedException e)
{
Assert.fail();
}
}
});
Assert.assertFalse(closeLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
}
}

View File

@ -39,7 +39,7 @@ public class PingTest extends AbstractTest
final byte[] payload = new byte[8];
new Random().nextBytes(payload);
final CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter()
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onPing(Session session, PingFrame frame)
@ -51,7 +51,7 @@ public class PingTest extends AbstractTest
});
PingFrame frame = new PingFrame(payload, false);
client.ping(frame, Callback.Adapter.INSTANCE);
session.ping(frame, Callback.Adapter.INSTANCE);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -33,6 +34,14 @@ public class HTTP2Connection extends AbstractConnection
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
protected final Callback closeCallback = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
close();
}
};
private final ByteBufferPool byteBufferPool;
private final Parser parser;
private final int bufferSize;

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.api.Session;
@ -77,6 +78,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final EndPoint endPoint;
private final Generator generator;
private final Listener listener;
@ -223,6 +225,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.close();
disconnect();
notifyClose(this, frame);
return false;
}
@ -315,11 +320,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void close(int error, String reason, Callback callback)
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
if (LOG.isDebugEnabled())
LOG.debug("Sending {}: {}", frame.getType(), reason);
control(null, frame, callback);
if (closed.compareAndSet(false, true))
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
if (LOG.isDebugEnabled())
LOG.debug("Sending {}: {}", frame.getType(), reason);
control(null, frame, callback);
}
}
@Override
@ -490,6 +498,18 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected void notifyClose(Session session, GoAwayFrame frame)
{
try
{
listener.onClose(session, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
@Override
public String toString()
{

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.ByteBufferPool;
@ -111,6 +112,15 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
notifyConnect(session);
}
@Override
protected boolean onReadTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
session.close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
return false;
}
private void notifyConnect(Session session)
{
try