Fixes #8811 - HTTP/2 session shutdown race may cause Server.stop() to block until stop timeout.

Now a completed future is returned from shutdown() if the session is already closed.

Moved the notification of the CompletableFuture to terminate(), which is always invoked.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-11-03 22:02:26 +01:00
parent 27f60b35f9
commit cd737489f9
2 changed files with 241 additions and 5 deletions

View File

@ -0,0 +1,230 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GracefulShutdownTest extends AbstractTest
{
@Test
public void testGracefulShutdownWhileIdle() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
return null;
}
});
CountDownLatch clientRequestLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(2);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
// One graceful GOAWAY and one normal GOAWAY.
clientGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
clientSession.newStream(new HeadersFrame(request, null, true), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
clientRequestLatch.countDown();
}
});
assertTrue(clientRequestLatch.await(5, TimeUnit.SECONDS));
// Initiate graceful shutdown on server side.
CompletableFuture<Void> completable = Graceful.shutdown(connector);
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertNull(completable.get(5, TimeUnit.SECONDS));
}
@Test
public void testGracefulShutdownWithPendingStream() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), callback);
}
else
{
serverLatch.countDown();
callback.succeeded();
}
}
};
}
});
CountDownLatch clientRequestLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(2);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
// One graceful GOAWAY and one normal GOAWAY.
clientGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
Stream stream = clientSession.newStream(new HeadersFrame(request, null, false), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
clientRequestLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), BufferUtil.toBuffer("hello"), false));
// Make sure the server has seen the stream.
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
// Initiate graceful shutdown on server side.
CompletableFuture<Void> completable = Graceful.shutdown(connector);
// Make sure the completable is not completed yet, waiting for the stream.
Thread.sleep(1000);
assertFalse(completable.isDone());
// Complete the stream.
stream.data(new DataFrame(stream.getId(), BufferUtil.toBuffer("world"), true));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertNull(completable.get(5, TimeUnit.SECONDS));
}
@Test
public void testGracefulShutdownAfterSessionAlreadyClosed() throws Exception
{
CountDownLatch serverCloseLatch = new CountDownLatch(1);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
return null;
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
serverCloseLatch.countDown();
}
});
CountDownLatch clientRequestLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
clientRequestLatch.countDown();
}
});
assertTrue(clientRequestLatch.await(5, TimeUnit.SECONDS));
LifeCycle.stop(clientSession);
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
HTTP2Session serverSession = (HTTP2Session)serverSessionRef.get();
assertNotNull(serverSession);
// Simulate a race condition where session.shutdown()
// is called after the session is closed.
CompletableFuture<Void> completable = serverSession.shutdown();
// Verify that it is completed.
assertTrue(completable.isDone());
}
}

View File

@ -1541,6 +1541,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
if (shutdownCallback != null)
return shutdownCallback;
if (closed == CloseState.CLOSED)
return CompletableFuture.completedFuture(null);
shutdownCallback = future = new Callback.Completable();
}
goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
@ -2028,7 +2030,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// such as onGoAway() may be in a race to finish,
// but only one moves to CLOSED and runs the action.
Runnable action = null;
CompletableFuture<Void> future;
try (AutoLock ignored = lock.lock())
{
long count = streamCount.get();
@ -2039,8 +2040,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return;
}
future = shutdownCallback;
switch (closed)
{
case LOCALLY_CLOSED:
@ -2080,14 +2079,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
LOG.debug("Executing zero streams action on {}", HTTP2Session.this);
action.run();
}
if (future != null)
future.complete(null);
}
private void terminate(GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Terminating {}", HTTP2Session.this);
CompletableFuture<Void> completable;
try (AutoLock ignored = lock.lock())
{
completable = shutdownCallback;
}
if (completable != null)
completable.complete(null);
HTTP2Session.this.terminate(failure);
notifyClose(HTTP2Session.this, frame, Callback.NOOP);
}