Improved handling of the stream close state.

Now the stream close state is updated when the frame has been
successfully written, and when it is received.
The stream is closed in case of failures.
Just after the stream close state update, if the stream is closed
then it is removed from the session.
This commit is contained in:
Simone Bordet 2015-02-18 14:31:25 +01:00
parent 4b6d024c85
commit d4809e9b79
10 changed files with 190 additions and 53 deletions

View File

@ -57,8 +57,6 @@ public class HTTP2ClientSession extends HTTP2Session
{ {
stream.process(frame, Callback.Adapter.INSTANCE); stream.process(frame, Callback.Adapter.INSTANCE);
notifyHeaders(stream, frame); notifyHeaders(stream, frame);
if (stream.isClosed())
removeStream(stream, false);
} }
} }
@ -97,8 +95,6 @@ public class HTTP2ClientSession extends HTTP2Session
pushStream.process(frame, Callback.Adapter.INSTANCE); pushStream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyPush(stream, pushStream, frame); Stream.Listener listener = notifyPush(stream, pushStream, frame);
pushStream.setListener(listener); pushStream.setListener(listener);
if (pushStream.isClosed())
removeStream(pushStream, false);
} }
} }

View File

@ -19,12 +19,16 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream; import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
@ -32,6 +36,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
@ -199,7 +204,7 @@ public class StreamCloseTest extends AbstractTest
} }
}); });
} }
}); }, new Stream.Listener.Adapter());
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true); HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
stream.headers(response, Callback.Adapter.INSTANCE); stream.headers(response, Callback.Adapter.INSTANCE);
return null; return null;
@ -207,10 +212,9 @@ public class StreamCloseTest extends AbstractTest
}); });
Session session = newClient(new Session.Listener.Adapter()); Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, false); HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
Promise<Stream> promise = new Promise.Adapter<>();
final CountDownLatch clientLatch = new CountDownLatch(1); final CountDownLatch clientLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter() session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{ {
@Override @Override
public Stream.Listener onPush(Stream pushedStream, PushPromiseFrame frame) public Stream.Listener onPush(Stream pushedStream, PushPromiseFrame frame)
@ -231,4 +235,106 @@ public class StreamCloseTest extends AbstractTest
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testPushedStreamResetIsClosed() throws Exception
{
final CountDownLatch serverLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(final Stream stream, HeadersFrame frame)
{
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", new HttpFields()));
stream.push(pushFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream pushedStream, ResetFrame frame)
{
Assert.assertTrue(pushedStream.isReset());
Assert.assertTrue(pushedStream.isClosed());
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
stream.headers(response, Callback.Adapter.INSTANCE);
serverLatch.countDown();
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
final CountDownLatch clientLatch = new CountDownLatch(2);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public Stream.Listener onPush(final Stream pushedStream, PushPromiseFrame frame)
{
pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback.Adapter()
{
@Override
public void succeeded()
{
Assert.assertTrue(pushedStream.isReset());
Assert.assertTrue(pushedStream.isClosed());
clientLatch.countDown();
}
});
return null;
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
clientLatch.countDown();
}
});
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testFailedSessionClosesIdleStream() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
final List<Stream> streams = new ArrayList<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
streams.add(stream);
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("GET".equals(request.getMethod()))
{
((HTTP2Session)stream.getSession()).getEndPoint().close();
// Try to write something to force an error.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.Adapter.INSTANCE);
}
return null;
}
@Override
public void onFailure(Session session, Throwable failure)
{
Assert.assertEquals(0, session.getStreams().size());
for (Stream stream : streams)
Assert.assertTrue(stream.isClosed());
latch.countDown();
}
});
Session session = newClient(new Session.Listener.Adapter());
// First stream will be idle on server.
HeadersFrame request1 = new HeadersFrame(0, newRequest("HEAD", new HttpFields()), null, true);
session.newStream(request1, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
// Second stream will fail on server.
HeadersFrame request2 = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
session.newStream(request2, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
} }

View File

@ -364,6 +364,11 @@ public class HTTP2Flusher extends IteratingCallback
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
if (stream != null)
{
stream.close();
stream.getSession().removeStream(stream, true);
}
callback.failed(x); callback.failed(x);
} }

View File

@ -181,8 +181,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
} }
}); });
if (stream.isClosed())
removeStream(stream, false);
} }
} }
else else
@ -214,9 +212,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.process(frame, Callback.Adapter.INSTANCE); stream.process(frame, Callback.Adapter.INSTANCE);
else else
notifyReset(this, frame); notifyReset(this, frame);
if (stream != null)
removeStream(stream, false);
} }
@Override @Override
@ -416,7 +411,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
final IStream stream = createLocalStream(streamId, promise); final IStream stream = createLocalStream(streamId, promise);
if (stream == null) if (stream == null)
return; return;
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener); stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream)); ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
@ -428,7 +422,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
} }
@Override @Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame) public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{ {
// Synchronization is necessary to atomically create // Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent. // the stream id and enqueue the frame to be sent.
@ -441,7 +435,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
final IStream pushStream = createLocalStream(streamId, promise); final IStream pushStream = createLocalStream(streamId, promise);
if (pushStream == null) if (pushStream == null)
return; return;
pushStream.updateClose(true, false); pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream)); ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
queued = flusher.append(entry); queued = flusher.append(entry);
@ -647,7 +641,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return new HTTP2Stream(scheduler, this, streamId); return new HTTP2Stream(scheduler, this, streamId);
} }
protected void removeStream(IStream stream, boolean local) @Override
public void removeStream(IStream stream, boolean local)
{ {
IStream removed = streams.remove(stream.getId()); IStream removed = streams.remove(stream.getId());
if (removed != null) if (removed != null)
@ -845,8 +840,10 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
if (closed.compareAndSet(current, CloseState.CLOSED)) if (closed.compareAndSet(current, CloseState.CLOSED))
{ {
// Close the flusher and disconnect.
flusher.close(); flusher.close();
for (IStream stream : streams.values())
stream.close();
streams.clear();
disconnect(); disconnect();
return; return;
} }
@ -988,14 +985,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
case HEADERS: case HEADERS:
{ {
HeadersFrame headersFrame = (HeadersFrame)frame; HeadersFrame headersFrame = (HeadersFrame)frame;
stream.updateClose(headersFrame.isEndStream(), true); if (stream.updateClose(headersFrame.isEndStream(), true))
if (stream.isClosed())
removeStream(stream, true); removeStream(stream, true);
break; break;
} }
case RST_STREAM: case RST_STREAM:
{ {
if (stream != null) stream.close();
removeStream(stream, true); removeStream(stream, true);
break; break;
} }
@ -1007,6 +1003,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true); flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
break; break;
} }
case PUSH_PROMISE:
{
// Pushed streams are implicitly remotely closed.
// They are closed when sending an end-stream DATA frame.
stream.updateClose(true, false);
break;
}
case GO_AWAY: case GO_AWAY:
{ {
// We just sent a GO_AWAY, only shutdown the // We just sent a GO_AWAY, only shutdown the
@ -1097,8 +1100,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{ {
// Only now we can update the close state // Only now we can update the close state
// and eventually remove the stream. // and eventually remove the stream.
stream.updateClose(dataFrame.isEndStream(), true); if (stream.updateClose(dataFrame.isEndStream(), true))
if (stream.isClosed())
removeStream(stream, true); removeStream(stream, true);
callback.succeeded(); callback.succeeded();
} }

View File

@ -80,10 +80,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
} }
@Override @Override
public void push(PushPromiseFrame frame, Promise<Stream> promise) public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
{ {
notIdle(); notIdle();
session.push(this, promise, frame); session.push(this, promise, frame, listener);
} }
@Override @Override
@ -227,7 +227,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private void onHeaders(HeadersFrame frame, Callback callback) private void onHeaders(HeadersFrame frame, Callback callback)
{ {
updateClose(frame.isEndStream(), false); if (updateClose(frame.isEndStream(), false))
session.removeStream(this, false);
callback.succeeded(); callback.succeeded();
} }
@ -237,7 +238,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{ {
// It's a bad client, it does not deserve to be // It's a bad client, it does not deserve to be
// treated gently by just resetting the stream. // treated gently by just resetting the stream.
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback); session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.Adapter.INSTANCE);
callback.failed(new IOException("stream_window_exceeded"));
return;
} }
// SPEC: remotely closed streams must be replied with a reset. // SPEC: remotely closed streams must be replied with a reset.
@ -245,39 +248,46 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{ {
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE); reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
callback.failed(new EOFException("stream_closed")); callback.failed(new EOFException("stream_closed"));
return;
} }
if (isReset()) if (isReset())
{ {
// Just drop the frame. // Just drop the frame.
callback.failed(new IOException("stream_reset")); callback.failed(new IOException("stream_reset"));
return;
} }
updateClose(frame.isEndStream(), false); if (updateClose(frame.isEndStream(), false))
session.removeStream(this, false);
notifyData(this, frame, callback); notifyData(this, frame, callback);
} }
private void onReset(ResetFrame frame, Callback callback) private void onReset(ResetFrame frame, Callback callback)
{ {
remoteReset = true; remoteReset = true;
close();
session.removeStream(this, false);
callback.succeeded(); callback.succeeded();
notifyReset(this, frame); notifyReset(this, frame);
} }
private void onPush(PushPromiseFrame frame, Callback callback) private void onPush(PushPromiseFrame frame, Callback callback)
{ {
// Pushed streams are implicitly locally closed.
// They are closed when receiving an end-stream DATA frame.
updateClose(true, true); updateClose(true, true);
callback.succeeded(); callback.succeeded();
} }
@Override @Override
public void updateClose(boolean update, boolean local) public boolean updateClose(boolean update, boolean local)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Update close for {} close={} local={}", this, update, local); LOG.debug("Update close for {} close={} local={}", this, update, local);
if (!update) if (!update)
return; return false;
while (true) while (true)
{ {
@ -288,24 +298,26 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{ {
CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED; CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
if (closeState.compareAndSet(current, newValue)) if (closeState.compareAndSet(current, newValue))
return; return false;
break; break;
} }
case LOCALLY_CLOSED: case LOCALLY_CLOSED:
{ {
if (!local) if (local)
return false;
close(); close();
return; return true;
} }
case REMOTELY_CLOSED: case REMOTELY_CLOSED:
{ {
if (local) if (!local)
return false;
close(); close();
return; return true;
} }
default: default:
{ {
return; return false;
} }
} }
} }
@ -334,7 +346,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return recvWindow.getAndAdd(delta); return recvWindow.getAndAdd(delta);
} }
private void close() @Override
public void close()
{ {
closeState.set(CloseState.CLOSED); closeState.set(CloseState.CLOSED);
onClose(); onClose();

View File

@ -37,6 +37,14 @@ public interface ISession extends Session
@Override @Override
public IStream getStream(int streamId); public IStream getStream(int streamId);
/**
* <p>Removes the given {@code stream}.</p>
*
* @param stream the stream to remove
* @param local whether the stream is local or remote
*/
public void removeStream(IStream stream, boolean local);
/** /**
* <p>Enqueues the given frames to be written to the connection.</p> * <p>Enqueues the given frames to be written to the connection.</p>
* *
@ -55,8 +63,9 @@ public interface ISession extends Session
* @param stream the stream associated to the pushed stream * @param stream the stream associated to the pushed stream
* @param promise the promise that gets notified of the pushed stream creation * @param promise the promise that gets notified of the pushed stream creation
* @param frame the PUSH_PROMISE frame to enqueue * @param frame the PUSH_PROMISE frame to enqueue
* @param listener the listener that gets notified of pushed stream events
*/ */
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame); public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener);
/** /**
* <p>Enqueues the given DATA frame to be written to the connection.</p> * <p>Enqueues the given DATA frame to be written to the connection.</p>

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.io.Closeable;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -27,7 +29,7 @@ import org.eclipse.jetty.util.Callback;
* <p>This class extends {@link Stream} by adding the methods required to * <p>This class extends {@link Stream} by adding the methods required to
* implement the HTTP/2 stream functionalities.</p> * implement the HTTP/2 stream functionalities.</p>
*/ */
public interface IStream extends Stream public interface IStream extends Stream, Closeable
{ {
/** /**
* <p>The constant used as attribute key to store/retrieve the HTTP * <p>The constant used as attribute key to store/retrieve the HTTP
@ -67,9 +69,15 @@ public interface IStream extends Stream
* @param local whether the update comes from a local operation * @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream) * (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame * or a remote operation (such as receiving a frame
* that ends the stream). * @return whether the stream has been fully closed by this invocation
*/ */
public void updateClose(boolean update, boolean local); public boolean updateClose(boolean update, boolean local);
/**
* <p>Forcibly closes this stream.</p>
*/
@Override
public void close();
/** /**
* @return the current value of the stream send window * @return the current value of the stream send window

View File

@ -63,8 +63,9 @@ public interface Stream
* *
* @param frame the PUSH_PROMISE frame to send * @param frame the PUSH_PROMISE frame to send
* @param promise the promise that gets notified of the pushed stream creation * @param promise the promise that gets notified of the pushed stream creation
* @param listener the listener that gets notified of stream events
*/ */
public void push(PushPromiseFrame frame, Promise<Stream> promise); public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener);
/** /**
* <p>Sends the given DATA {@code frame}.</p> * <p>Sends the given DATA {@code frame}.</p>

View File

@ -77,9 +77,6 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
stream.process(frame, Callback.Adapter.INSTANCE); stream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyNewStream(stream, frame); Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener); stream.setListener(listener);
// The listener may have sent a frame that closed the stream.
if (stream.isClosed())
removeStream(stream, false);
} }
} }
else else

View File

@ -160,7 +160,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Could not push " + request, x); LOG.debug("Could not push " + request, x);
} }
}); }, new Stream.Listener.Adapter()); // TODO: handle reset from the client ?
} }
private void commit(MetaData.Response info, boolean endStream, Callback callback) private void commit(MetaData.Response info, boolean endStream, Callback callback)