diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GracefulShutdownTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GracefulShutdownTest.java new file mode 100644 index 00000000000..5fd1c1c4b48 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GracefulShutdownTest.java @@ -0,0 +1,230 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.Graceful; +import org.eclipse.jetty.util.component.LifeCycle; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GracefulShutdownTest extends AbstractTest +{ + @Test + public void testGracefulShutdownWhileIdle() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + }); + + CountDownLatch clientRequestLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(2); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + // One graceful GOAWAY and one normal GOAWAY. + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientRequestLatch.countDown(); + } + }); + + assertTrue(clientRequestLatch.await(5, TimeUnit.SECONDS)); + + // Initiate graceful shutdown on server side. + CompletableFuture completable = Graceful.shutdown(connector); + + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertNull(completable.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testGracefulShutdownWithPendingStream() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), callback); + } + else + { + serverLatch.countDown(); + callback.succeeded(); + } + } + }; + } + }); + + CountDownLatch clientRequestLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(2); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + // One graceful GOAWAY and one normal GOAWAY. + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + Stream stream = clientSession.newStream(new HeadersFrame(request, null, false), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientRequestLatch.countDown(); + } + }).get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(stream.getId(), BufferUtil.toBuffer("hello"), false)); + // Make sure the server has seen the stream. + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + + // Initiate graceful shutdown on server side. + CompletableFuture completable = Graceful.shutdown(connector); + + // Make sure the completable is not completed yet, waiting for the stream. + Thread.sleep(1000); + assertFalse(completable.isDone()); + + // Complete the stream. + stream.data(new DataFrame(stream.getId(), BufferUtil.toBuffer("world"), true)); + + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertNull(completable.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testGracefulShutdownAfterSessionAlreadyClosed() throws Exception + { + CountDownLatch serverCloseLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientRequestLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientRequestLatch.countDown(); + } + }); + + assertTrue(clientRequestLatch.await(5, TimeUnit.SECONDS)); + + LifeCycle.stop(clientSession); + + assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + HTTP2Session serverSession = (HTTP2Session)serverSessionRef.get(); + assertNotNull(serverSession); + + // Simulate a race condition where session.shutdown() + // is called after the session is closed. + CompletableFuture completable = serverSession.shutdown(); + // Verify that it is completed. + assertTrue(completable.isDone()); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index f4b0b78cf10..3860aa99f19 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1541,6 +1541,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { if (shutdownCallback != null) return shutdownCallback; + if (closed == CloseState.CLOSED) + return CompletableFuture.completedFuture(null); shutdownCallback = future = new Callback.Completable(); } goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); @@ -2028,7 +2030,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // such as onGoAway() may be in a race to finish, // but only one moves to CLOSED and runs the action. Runnable action = null; - CompletableFuture future; try (AutoLock ignored = lock.lock()) { long count = streamCount.get(); @@ -2039,8 +2040,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return; } - future = shutdownCallback; - switch (closed) { case LOCALLY_CLOSED: @@ -2080,14 +2079,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio LOG.debug("Executing zero streams action on {}", HTTP2Session.this); action.run(); } - if (future != null) - future.complete(null); } private void terminate(GoAwayFrame frame) { if (LOG.isDebugEnabled()) LOG.debug("Terminating {}", HTTP2Session.this); + + CompletableFuture completable; + try (AutoLock ignored = lock.lock()) + { + completable = shutdownCallback; + } + if (completable != null) + completable.complete(null); + HTTP2Session.this.terminate(failure); notifyClose(HTTP2Session.this, frame, Callback.NOOP); }