Issue #2788 - Graceful close of HTTP/2 Connection.

Updates after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-03-10 01:29:13 +01:00
parent 916b3e56ec
commit 3a6c9b8049
1 changed files with 52 additions and 70 deletions

View File

@ -102,7 +102,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private boolean connectProtocolEnabled; private boolean connectProtocolEnabled;
private long idleTime; private long idleTime;
private GoAwayFrame closeFrame; private GoAwayFrame closeFrame;
private Callback.Completable closeCallback; private Callback.Completable shutdownCallback;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId) public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{ {
@ -439,12 +439,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame); LOG.debug("Received {}", frame);
while (true) if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.REMOTELY_CLOSED))
{
CloseState current = closed.get();
if (current == CloseState.NOT_CLOSED)
{
if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
{ {
// We received a GO_AWAY, so try to write // We received a GO_AWAY, so try to write
// what's in the queue and then disconnect. // what's in the queue and then disconnect.
@ -452,14 +447,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
notifyClose(this, frame, new DisconnectCallback()); notifyClose(this, frame, new DisconnectCallback());
return; return;
} }
}
else
{
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Ignored {}, already closed", frame); LOG.debug("Ignored {}, already closed", frame);
return;
}
}
} }
@Override @Override
@ -671,12 +661,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public boolean close(int error, String reason, Callback callback) public boolean close(int error, String reason, Callback callback)
{ {
while (true) if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.LOCALLY_CLOSED))
{
CloseState current = closed.get();
if (current == CloseState.NOT_CLOSED)
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Closing {}/{}", error, reason); LOG.debug("Closing {}/{}", error, reason);
@ -684,48 +669,35 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
control(null, callback, closeFrame); control(null, callback, closeFrame);
return true; return true;
} }
}
else
{
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Ignoring close {}/{}, already closed", error, reason); LOG.debug("Ignoring close {}/{}, already closed", error, reason);
callback.succeeded(); callback.succeeded();
return false; return false;
} }
}
}
@Override @Override
public CompletableFuture<Void> shutdown() public CompletableFuture<Void> shutdown()
{ {
while (true) if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.LOCALLY_CLOSED))
{
CloseState current = closed.get();
if (current == CloseState.NOT_CLOSED)
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Shutting down {}", this); LOG.debug("Shutting down {}", this);
closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, ErrorCode.NO_ERROR.code, "shutdown"); closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, ErrorCode.NO_ERROR.code, "shutdown");
closeCallback = new Callback.Completable(); shutdownCallback = new Callback.Completable();
// Only send the close frame when we can flip Hi and Lo = 0, see onStreamClosed(). // Only send the close frame when we can flip Hi and Lo = 0, see onStreamClosed().
if (streamCount.compareAndSet(0, 1, 0, 0)) if (streamCount.compareAndSet(0, 1, 0, 0))
control(null, closeCallback, closeFrame); control(null, shutdownCallback, closeFrame);
return closeCallback; return shutdownCallback;
} }
}
else
{
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Ignoring shutdown, already closed"); LOG.debug("Ignoring shutdown, already closed");
Callback.Completable result = closeCallback; Callback.Completable result = shutdownCallback;
// Result may be null if the shutdown is in progress, // Result may be null if the shutdown is in progress,
// don't wait and return a completed CompletableFuture. // don't wait and return a completed CompletableFuture.
return result != null ? result : CompletableFuture.completedFuture(null); return result != null ? result : CompletableFuture.completedFuture(null);
} }
}
}
private GoAwayFrame newGoAwayFrame(CloseState closeState, int error, String reason) private GoAwayFrame newGoAwayFrame(CloseState closeState, int error, String reason)
{ {
@ -1075,13 +1047,23 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onStreamClosed(IStream stream) protected void onStreamClosed(IStream stream)
{ {
if (streamCount.addAndGetLo(-1) == 0) Callback callback = null;
while (true)
{ {
Callback.Completable callback = closeCallback; long encoded = streamCount.get();
// Only send the close frame if we can flip Hi, see shutdown(). int closed = AtomicBiInteger.getHi(encoded);
if (callback != null && streamCount.compareAndSet(0, 1, 0, 0)) int streams = AtomicBiInteger.getLo(encoded) - 1;
control(null, callback, closeFrame); if (streams == 0 && closed == 0)
{
callback = shutdownCallback;
closed = 1;
} }
if (streamCount.compareAndSet(encoded, closed, streams))
break;
}
// Only send the close frame if we can flip Hi and Lo = 0, see shutdown().
if (callback != null)
control(null, callback, closeFrame);
} }
@Override @Override