Implemented idle timeout functionality for streams.

This commit is contained in:
Simone Bordet 2014-06-18 13:57:37 +02:00
parent 4dca6a71d3
commit 9d9260e634
9 changed files with 325 additions and 42 deletions

View File

@ -129,7 +129,7 @@ public class HTTP2Client extends ContainerLifeCycle
{
Context context = (Context)attachment;
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(65535));
HTTP2Session session = new HTTP2ClientSession(getScheduler(), endpoint, generator, context.listener, new HTTP2FlowControl(65535));
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, 8192, context.promise, session);
}

View File

@ -30,14 +30,15 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientSession extends HTTP2Session
{
private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class);
public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl)
{
super(endPoint, generator, listener, flowControl, -1, 1);
super(scheduler, endPoint, generator, listener, flowControl, -1, 1);
}
@Override
@ -48,7 +49,7 @@ public class HTTP2ClientSession extends HTTP2Session
if (stream == null)
{
ResetFrame reset = new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR);
reset(reset, disconnectCallback);
reset(reset, disconnectOnFailure());
}
else
{

View File

@ -19,8 +19,15 @@
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
@ -28,13 +35,18 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;
public class IdleTimeoutTest extends AbstractTest
{
private final int idleTimeout = 1000;
@ -250,4 +262,161 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertFalse(closeLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testClientEnforcingStreamIdleTimeout() throws Exception
{
final int idleTimeout = 1000;
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
});
Session session = newClient(new Session.Listener.Adapter());
final CountDownLatch dataLatch = new CountDownLatch(1);
final CountDownLatch timeoutLatch = new CountDownLatch(1);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
stream.setIdleTimeout(idleTimeout);
}
}, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataLatch.countDown();
}
@Override
public void onFailure(Stream stream, Throwable x)
{
assertThat(x, instanceOf(TimeoutException.class));
timeoutLatch.countDown();
}
});
Assert.assertTrue(timeoutLatch.await(5, TimeUnit.SECONDS));
// We must not receive any DATA frame.
Assert.assertFalse(dataLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Stream must be gone.
Assert.assertTrue(session.getStreams().isEmpty());
}
@Test
public void testServerEnforcingStreamIdleTimeout() throws Exception
{
final CountDownLatch timeoutLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(idleTimeout);
return new Stream.Listener.Adapter()
{
@Override
public void onFailure(Stream stream, Throwable x)
{
timeoutLatch.countDown();
}
};
}
});
final CountDownLatch resetLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
resetLatch.countDown();
}
});
MetaData.Request metaData = newRequest("GET", new HttpFields());
// Stream does not end here, but we won't send any DATA frame.
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(timeoutLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Stream must be gone.
Assert.assertTrue(session.getStreams().isEmpty());
}
@Test
public void testIdleTimeoutIsInterruptedWhenReceiving() throws Exception
{
final CountDownLatch timeoutLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(idleTimeout);
return new Stream.Listener.Adapter()
{
@Override
public void onFailure(Stream stream, Throwable x)
{
timeoutLatch.countDown();
}
};
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
session.newStream(requestFrame, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(final Stream stream)
{
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback.Adapter()
{
private int sends;
@Override
public void succeeded()
{
sleep(idleTimeout / 2);
boolean last = ++sends == 2;
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), last ? INSTANCE : this);
}
});
}
}, new Stream.Listener.Adapter());
Assert.assertFalse(timeoutLatch.await(1, TimeUnit.SECONDS));
}
private void sleep(long value)
{
try
{
TimeUnit.MILLISECONDS.sleep(value);
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}

View File

@ -59,12 +59,13 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public abstract class HTTP2Session implements ISession, Parser.Listener
{
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
protected final Callback disconnectCallback = new Callback.Adapter()
private final Callback disconnectOnFailure = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
@ -72,13 +73,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
disconnect();
}
};
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final AtomicInteger windowSize = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler;
private final EndPoint endPoint;
private final Generator generator;
private final Listener listener;
@ -86,8 +87,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final Flusher flusher;
private volatile int maxStreamCount;
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId)
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int maxStreams, int initialStreamId)
{
this.scheduler = scheduler;
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
@ -103,16 +105,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return generator;
}
public int getMaxStreamCount()
{
return maxStreamCount;
}
public FlowControl getFlowControl()
{
return flowControl;
}
@Override
public boolean onData(final DataFrame frame)
{
@ -140,7 +132,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else
{
ResetFrame resetFrame = new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR);
reset(resetFrame, disconnectCallback);
reset(resetFrame, disconnectOnFailure());
return false;
}
}
@ -157,6 +149,18 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public boolean onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
IStream stream = getStream(frame.getStreamId());
if (stream != null)
stream.process(frame, Callback.Adapter.INSTANCE);
notifyReset(this, frame);
if (stream != null)
removeStream(stream, false);
return false;
}
@ -186,7 +190,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: SETTINGS frame MUST be replied.
SettingsFrame reply = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
settings(reply, disconnectCallback);
settings(reply, disconnectOnFailure());
return false;
}
@ -209,7 +213,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else
{
PingFrame reply = new PingFrame(frame.getPayload(), true);
control(null, reply, disconnectCallback);
control(null, reply, disconnectOnFailure());
}
return false;
}
@ -265,7 +269,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void onConnectionFailure(int error, String reason)
{
close(error, reason, disconnectCallback);
close(error, reason, disconnectOnFailure());
}
@Override
@ -314,7 +318,15 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void reset(ResetFrame frame, Callback callback)
{
control(null, frame, callback);
if (closed.get())
{
callback.succeeded();
}
else
{
// TODO: think about moving reset() to Stream.
control(getStream(frame.getStreamId()), frame, callback);
}
}
@Override
@ -352,7 +364,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.iterate();
}
protected void disconnect()
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("Disconnecting");
@ -365,6 +377,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
int streamId = stream.getId();
if (streams.putIfAbsent(streamId, stream) == null)
{
stream.setIdleTimeout(endPoint.getIdleTimeout());
flowControl.onNewStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
@ -387,7 +400,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
int maxStreams = maxStreamCount;
if (maxStreams >= 0 && currentStreams >= maxStreams)
{
reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR), disconnectCallback);
reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR), disconnectOnFailure());
return null;
}
if (streamCount.compareAndSet(currentStreams, currentStreams + 1))
@ -400,6 +413,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
stream.setIdleTimeout(endPoint.getIdleTimeout());
flowControl.onNewStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
@ -407,14 +421,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
else
{
close(ErrorCode.PROTOCOL_ERROR, "duplicate_stream", disconnectCallback);
close(ErrorCode.PROTOCOL_ERROR, "duplicate_stream", disconnectOnFailure());
return null;
}
}
protected IStream newStream(HeadersFrame frame)
{
return new HTTP2Stream(this, frame);
return new HTTP2Stream(scheduler, this, frame);
}
protected void removeStream(IStream stream, boolean local)
@ -461,6 +475,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
Atomics.updateMax(lastStreamId, streamId);
}
protected Callback disconnectOnFailure()
{
return disconnectOnFailure;
}
protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
{
try
@ -498,6 +517,18 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected void notifyReset(Session session, ResetFrame frame)
{
try
{
listener.onReset(session, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
protected void notifyClose(Session session, GoAwayFrame frame)
{
try
@ -755,6 +786,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void succeeded()
{
switch (frame.getType())
{
case RST_STREAM:
{
if (stream != null)
removeStream(stream, true);
break;
}
case GO_AWAY:
{
disconnect();
break;
}
default:
{
break;
}
}
callback.succeeded();
}
@ -812,7 +861,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.updateClose(dataFrame.isEndStream(), true);
if (stream.isClosed())
removeStream(stream, true);
super.succeeded();
callback.succeeded();
}
}
}

View File

@ -20,30 +20,45 @@ package org.eclipse.jetty.http2;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Stream implements IStream
public class HTTP2Stream extends IdleTimeout implements IStream
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final Callback disconnectOnFailure = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
session.disconnect();
}
};
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicInteger windowSize = new AtomicInteger();
private final ISession session;
private final HeadersFrame frame;
private Listener listener;
private volatile Listener listener;
private volatile boolean reset = false;
public HTTP2Stream(ISession session, HeadersFrame frame)
public HTTP2Stream(Scheduler scheduler, ISession session, HeadersFrame frame)
{
super(scheduler);
this.session = session;
this.frame = frame;
}
@ -102,6 +117,28 @@ public class HTTP2Stream implements IStream
return closeState.get() == CloseState.CLOSED;
}
@Override
public boolean isOpen()
{
return !isClosed();
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
// The stream is now gone, we must close it to
// avoid that its idle timeout is rescheduled.
closeState.set(CloseState.CLOSED);
onClose();
session.reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR), disconnectOnFailure);
notifyFailure(this, timeout);
}
private ConcurrentMap<String, Object> attributes()
{
ConcurrentMap<String, Object> map = attributes.get();
@ -131,14 +168,21 @@ public class HTTP2Stream implements IStream
@Override
public boolean process(Frame frame, Callback callback)
{
notIdle();
switch (frame.getType())
{
case DATA:
{
return notifyData((DataFrame)frame, callback);
// TODO: handle cases where:
// TODO: A) stream already remotely close.
// TODO: B) DATA before HEADERS.
notifyData(this, (DataFrame)frame, callback);
return false;
}
case HEADERS:
{
// TODO: handle case where HEADERS after DATA.
return false;
}
case RST_STREAM:
@ -210,28 +254,41 @@ public class HTTP2Stream implements IStream
return windowSize.getAndAdd(delta);
}
protected boolean notifyData(DataFrame frame, Callback callback)
protected void notifyData(Stream stream, DataFrame frame, Callback callback)
{
final Listener listener = this.listener;
if (listener == null)
return false;
return;
try
{
listener.onData(this, frame, callback);
return false;
listener.onData(stream, frame, callback);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
private void notifyFailure(Stream stream, Throwable failure)
{
Listener listener = this.listener;
if (listener == null)
return;
try
{
listener.onFailure(stream, failure);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
@Override
public String toString()
{
return String.format("%s@%x{id=%d,windowSize=%s,%s}", getClass().getSimpleName(),
hashCode(), getId(), windowSize, closeState);
return String.format("%s@%x{id=%d,windowSize=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), windowSize, reset, closeState);
}
private enum CloseState

View File

@ -33,4 +33,6 @@ public interface ISession extends Session
public void data(IStream stream, DataFrame frame, Callback callback);
public int updateWindowSize(int delta);
public void disconnect();
}

View File

@ -42,6 +42,10 @@ public interface Stream
public boolean isClosed();
public long getIdleTimeout();
public void setIdleTimeout(long idleTimeout);
// TODO: see SPDY's Stream
public interface Listener

View File

@ -81,7 +81,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerSessionListener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getMaxHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener,
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener,
new HTTP2FlowControl(getInitialWindowSize()), getMaxConcurrentStreams());
Parser parser = newServerParser(connector.getByteBufferPool(), session);

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Listener
{
@ -42,9 +43,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
private final ServerSessionListener listener;
public HTTP2ServerSession(EndPoint endPoint, Generator generator, ServerSessionListener listener, FlowControl flowControl, int maxStreams)
public HTTP2ServerSession(Scheduler scheduler, EndPoint endPoint, Generator generator, ServerSessionListener listener, FlowControl flowControl, int maxStreams)
{
super(endPoint, generator, listener, flowControl, maxStreams, 2);
super(scheduler, endPoint, generator, listener, flowControl, maxStreams, 2);
this.listener = listener;
}
@ -56,7 +57,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (settings == null)
settings = Collections.emptyMap();
SettingsFrame frame = new SettingsFrame(settings, false);
settings(frame, disconnectCallback);
settings(frame, disconnectOnFailure());
return false;
}