Fixes #2788 - Graceful close of HTTP/2 Connection.

Made HTTP2SessionContainer implement Graceful, so that it can be found
in the component hierarchy and can shutdown all sessions.
Modified HTTP2ServerSession to reject requests if already closed/shutdown.
Implemented shutdown in HTTP2Session.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-02-06 16:27:12 +01:00
parent 8197c12325
commit 92d9d46f8e
6 changed files with 272 additions and 56 deletions

View File

@ -25,9 +25,11 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -39,6 +41,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -59,6 +62,7 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Graceful;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -66,6 +70,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -466,7 +471,7 @@ public class HTTP2Test extends AbstractTest
// The third stream must not be created.
MetaData.Request request3 = newRequest("GET", new HttpFields());
CountDownLatch maxStreamsLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(request3, null, false), new Promise.Adapter<Stream>()
session.newStream(new HeadersFrame(request3, null, false), new Promise.Adapter<>()
{
@Override
public void failed(Throwable x)
@ -494,7 +499,7 @@ public class HTTP2Test extends AbstractTest
// Create a fourth stream.
MetaData.Request request4 = newRequest("GET", new HttpFields());
CountDownLatch exchangeLatch4 = new CountDownLatch(2);
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<Stream>()
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<>()
{
@Override
public void succeeded(Stream result)
@ -893,6 +898,125 @@ public class HTTP2Test extends AbstractTest
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testGracefulServerGoAway() throws Exception
{
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverSessionLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(2);
start(new ServerSessionListener.Adapter()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
serverSessionLatch.countDown();
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
dataLatch.countDown();
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
};
}
});
// Avoid aggressive idle timeout to allow the test verifications.
connector.setShutdownIdleTimeout(connector.getIdleTimeout());
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
assertTrue(serverSessionLatch.await(5, TimeUnit.SECONDS));
Session serverSession = serverSessionRef.get();
// Start 2 requests without completing them yet.
CountDownLatch responseLatch = new CountDownLatch(2);
MetaData.Request metaData1 = newRequest("GET", new HttpFields());
HeadersFrame request1 = new HeadersFrame(metaData1, null, false);
FuturePromise<Stream> promise1 = new FuturePromise<>();
Stream.Listener.Adapter listener = new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
responseLatch.countDown();
}
}
};
clientSession.newStream(request1, promise1, listener);
Stream stream1 = promise1.get(5, TimeUnit.SECONDS);
stream1.data(new DataFrame(stream1.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
MetaData.Request metaData2 = newRequest("GET", new HttpFields());
HeadersFrame request2 = new HeadersFrame(metaData2, null, false);
FuturePromise<Stream> promise2 = new FuturePromise<>();
clientSession.newStream(request2, promise2, listener);
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
stream2.data(new DataFrame(stream2.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Both requests are now on the server, shutdown gracefully the server session.
int port = connector.getLocalPort();
CompletableFuture<Void> shutdown = Graceful.shutdown(server);
// GOAWAY should not arrive to the client yet.
assertFalse(clientCloseLatch.await(1, TimeUnit.SECONDS));
// New requests should be immediately rejected.
HostPortHttpField authority3 = new HostPortHttpField("localhost" + ":" + port);
MetaData.Request metaData3 = new MetaData.Request("GET", HttpScheme.HTTP, authority3, servletPath, HttpVersion.HTTP_2, new HttpFields());
HeadersFrame request3 = new HeadersFrame(metaData3, null, false);
FuturePromise<Stream> promise3 = new FuturePromise<>();
CountDownLatch resetLatch = new CountDownLatch(1);
clientSession.newStream(request3, promise3, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream3 = promise3.get(5, TimeUnit.SECONDS);
stream3.data(new DataFrame(stream3.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Finish the previous requests and expect the responses.
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
assertNull(shutdown.get(5, TimeUnit.SECONDS));
// Now GOAWAY should arrive to the client.
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
// Wait to process the GOAWAY frames and close the EndPoints.
Thread.sleep(1000);
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSession).getEndPoint().isOpen());
}
private static void sleep(long time)
{
try

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -77,6 +78,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicBiInteger streamCount = new AtomicBiInteger(); // Hi = closed, Lo = stream count
private final AtomicInteger localStreamIds = new AtomicInteger();
private final AtomicInteger lastRemoteStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
@ -100,6 +102,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private boolean connectProtocolEnabled;
private long idleTime;
private GoAwayFrame closeFrame;
private Callback.Completable closeCallback;
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
@ -439,27 +442,23 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
while (true)
{
CloseState current = closed.get();
switch (current)
if (current == CloseState.NOT_CLOSED)
{
case NOT_CLOSED:
if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
{
if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
{
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
return;
}
break;
}
default:
{
if (LOG.isDebugEnabled())
LOG.debug("Ignored {}, already closed", frame);
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
return;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignored {}, already closed", frame);
return;
}
}
}
@ -537,7 +536,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new FailureCallback(error, reason, callback));
}
@Override
@ -675,26 +674,54 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
while (true)
{
CloseState current = closed.get();
switch (current)
if (current == CloseState.NOT_CLOSED)
{
case NOT_CLOSED:
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, error, reason);
control(null, callback, closeFrame);
return true;
}
break;
}
default:
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring close {}/{}, already closed", error, reason);
callback.succeeded();
return false;
LOG.debug("Closing {}/{}", error, reason);
closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, error, reason);
control(null, callback, closeFrame);
return true;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring close {}/{}, already closed", error, reason);
callback.succeeded();
return false;
}
}
}
@Override
public CompletableFuture<Void> shutdown()
{
while (true)
{
CloseState current = closed.get();
if (current == CloseState.NOT_CLOSED)
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
if (LOG.isDebugEnabled())
LOG.debug("Shutting down {}", this);
closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, ErrorCode.NO_ERROR.code, "shutdown");
closeCallback = new Callback.Completable();
// Only send the close frame when we can flip Hi and Lo = 0, see onStreamClosed().
if (streamCount.compareAndSet(0, 1, 0, 0))
control(null, closeCallback, closeFrame);
return closeCallback;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring shutdown, already closed");
Callback.Completable result = closeCallback;
return result != null ? result : CompletableFuture.completedFuture(null);
}
}
}
@ -1041,10 +1068,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void onStreamOpened(IStream stream)
{
streamCount.addAndGetLo(1);
}
protected void onStreamClosed(IStream stream)
{
if (streamCount.addAndGetLo(-1) == 0)
{
Callback.Completable callback = closeCallback;
// Only send the close frame if we can flip Hi, see shutdown().
if (callback != null && streamCount.compareAndSetHi(0, 1))
control(null, callback, closeFrame);
}
}
@Override
@ -1321,8 +1356,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
case HEADERS:
{
onStreamOpened(stream);
HeadersFrame headersFrame = (HeadersFrame)frame;
if (headersFrame.getMetaData().isRequest())
onStreamOpened(stream);
if (stream.updateClose(headersFrame.isEndStream(), CloseState.Event.AFTER_SEND))
removeStream(stream);
break;
@ -1598,12 +1634,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
private class CloseCallback extends Callback.Nested
private class FailureCallback extends Callback.Nested
{
private final int error;
private final String reason;
private CloseCallback(int error, String reason, Callback callback)
private FailureCallback(int error, String reason, Callback callback)
{
super(callback);
this.error = error;

View File

@ -322,7 +322,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
((HTTP2Session)session).onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded");
callback.failed(new IOException("stream_window_exceeded"));
return;
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -151,4 +152,12 @@ public interface ISession extends Session
* @param callback the callback to notify when the frame has been processed
*/
public void onData(DataFrame frame, Callback callback);
/**
* <p>Gracefully closes the session, returning a {@code CompletableFuture} that
* is completed when all the streams currently being processed are completed.</p>
*
* @return a {@code CompletableFuture} that is completed when all the streams are completed
*/
public CompletableFuture<Void> shutdown();
}

View File

@ -24,11 +24,14 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.Frame;
@ -46,6 +49,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
@ManagedObject
@ -296,22 +300,25 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
}
@ManagedObject("The container of HTTP/2 sessions")
public static class HTTP2SessionContainer implements Connection.Listener, Dumpable
public static class HTTP2SessionContainer implements Connection.Listener, Graceful, Dumpable
{
private final Set<Session> sessions = ConcurrentHashMap.newKeySet();
private final Set<ISession> sessions = ConcurrentHashMap.newKeySet();
private final AtomicReference<CompletableFuture<Void>> shutdown = new AtomicReference<>();
@Override
public void onOpened(Connection connection)
{
Session session = ((HTTP2Connection)connection).getSession();
ISession session = ((HTTP2Connection)connection).getSession();
sessions.add(session);
LifeCycle.start(session);
if (isShutdown())
shutdown(session);
}
@Override
public void onClosed(Connection connection)
{
Session session = ((HTTP2Connection)connection).getSession();
ISession session = ((HTTP2Connection)connection).getSession();
if (sessions.remove(session))
LifeCycle.stop(session);
}
@ -327,6 +334,39 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
return sessions.size();
}
@Override
public CompletableFuture<Void> shutdown()
{
CompletableFuture<Void> result = new CompletableFuture<>();
if (shutdown.compareAndSet(null, result))
{
CompletableFuture.allOf(sessions.stream().map(this::shutdown).toArray(CompletableFuture[]::new))
.whenComplete((v, x) ->
{
if (x == null)
result.complete(v);
else
result.completeExceptionally(x);
});
return result;
}
else
{
return shutdown.get();
}
}
@Override
public boolean isShutdown()
{
return shutdown.get() != null;
}
private CompletableFuture<Void> shutdown(ISession session)
{
return session.shutdown();
}
@Override
public String dump()
{

View File

@ -104,23 +104,30 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
else
{
stream = createRemoteStream(streamId, (MetaData.Request)metaData);
if (stream != null)
if (isClosed())
{
onStreamOpened(stream);
if (metaData instanceof MetaData.ConnectRequest)
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
}
else
{
stream = createRemoteStream(streamId, (MetaData.Request)metaData);
if (stream != null)
{
if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null)
{
stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
}
onStreamOpened(stream);
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
if (metaData instanceof MetaData.ConnectRequest)
{
if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null)
{
stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
}
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
}
}
}
}