diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 7cabc376a82..676e0ad0737 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -175,6 +175,7 @@ public class HTTP2Connection extends AbstractConnection { private final Callback fillCallback = new FillCallback(); private ByteBuffer buffer; + private boolean shutdown; @Override public Runnable produce() @@ -185,7 +186,7 @@ public class HTTP2Connection extends AbstractConnection if (task != null) return task; - if (isFillInterested()) + if (isFillInterested() || shutdown) return null; if (buffer == null) @@ -221,6 +222,7 @@ public class HTTP2Connection extends AbstractConnection else if (filled < 0) { release(); + shutdown = true; session.onShutdown(); return null; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 1d1772c190a..383b2e12c57 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -421,8 +421,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { // We received a GO_AWAY, so try to write // what's in the queue and then disconnect. - notifyClose(this, frame); - control(null, Callback.NOOP, new DisconnectFrame()); + notifyClose(this, frame, new DisconnectCallback()); return; } break; @@ -462,8 +461,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onConnectionFailure(int error, String reason) { - notifyFailure(this, new IOException(String.format("%d/%s", error, reason))); - close(error, reason, Callback.NOOP); + notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason)); } @Override @@ -991,8 +989,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void abort(Throwable failure) { - notifyFailure(this, failure); - terminate(failure); + notifyFailure(this, failure, new TerminateCallback(failure)); } public boolean isDisconnected() @@ -1054,11 +1051,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyClose(Session session, GoAwayFrame frame) + protected void notifyClose(Session session, GoAwayFrame frame, Callback callback) { try { - listener.onClose(session, frame); + listener.onClose(session, frame, callback); } catch (Throwable x) { @@ -1079,11 +1076,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyFailure(Session session, Throwable failure) + protected void notifyFailure(Session session, Throwable failure, Callback callback) { try { - listener.onFailure(session, failure); + listener.onFailure(session, failure, callback); } catch (Throwable x) { @@ -1322,4 +1319,81 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio promise.failed(x); } } + + private class CloseCallback implements Callback.NonBlocking + { + private final int error; + private final String reason; + + private CloseCallback(int error, String reason) + { + this.error = error; + this.reason = reason; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + private void complete() + { + close(error, reason, Callback.NOOP); + } + } + + private class DisconnectCallback implements Callback.NonBlocking + { + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + private void complete() + { + control(null, Callback.NOOP, new DisconnectFrame()); + } + } + + private class TerminateCallback implements Callback.NonBlocking + { + private final Throwable failure; + + private TerminateCallback(Throwable failure) + { + this.failure = failure; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + failure.addSuppressed(x); + complete(); + } + + private void complete() + { + terminate(failure); + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index e2c4dbb9ba9..10642d7351d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -195,9 +195,23 @@ public interface Session /** *

Callback method invoked when a GOAWAY frame has been received.

* - * @param session the session - * @param frame the GOAWAY frame received + * @param session the session + * @param frame the GOAWAY frame received + * @param callback the callback to notify of the GOAWAY processing */ + public default void onClose(Session session, GoAwayFrame frame, Callback callback) + { + try + { + onClose(session, frame); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onClose(Session session, GoAwayFrame frame); /** @@ -210,9 +224,23 @@ public interface Session /** *

Callback method invoked when a failure has been detected for this session.

* - * @param session the session - * @param failure the failure + * @param session the session + * @param failure the failure + * @param callback the callback to notify of failure processing */ + public default void onFailure(Session session, Throwable failure, Callback callback) + { + try + { + onFailure(session, failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onFailure(Session session, Throwable failure); /** diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index fbf1b9954ed..b981c5fb47a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.Executor; @@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.thread.ExecutionStrategy; @@ -162,17 +164,21 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onStreamFailure(IStream stream, Throwable failure) + public void onStreamFailure(IStream stream, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", stream, failure); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); if (channel != null) { - Runnable task = channel.onFailure(failure); + Runnable task = channel.onFailure(failure, callback); if (task != null) offerTask(task, true); } + else + { + callback.succeeded(); + } } public boolean onSessionTimeout(Throwable failure) @@ -190,13 +196,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onSessionFailure(Throwable failure) + public void onSessionFailure(Throwable failure, Callback callback) { ISession session = getSession(); if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", session, failure); - for (Stream stream : session.getStreams()) - onStreamFailure((IStream)stream, failure); + Collection streams = session.getStreams(); + if (streams.isEmpty()) + { + callback.succeeded(); + } + else + { + CountingCallback counter = new CountingCallback(callback, streams.size()); + for (Stream stream : streams) + onStreamFailure((IStream)stream, failure, counter); + } } public void push(Connector connector, IStream stream, MetaData.Request request) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index f16b9a70be4..491b6c4ea7c 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -123,7 +123,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF } @Override - public void onClose(Session session, GoAwayFrame frame) + public void onClose(Session session, GoAwayFrame frame, Callback callback) { ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) @@ -131,13 +131,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF String reason = frame.tryConvertPayload(); if (reason != null && !reason.isEmpty()) reason = " (" + reason + ")"; - getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason)); + getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback); } @Override - public void onFailure(Session session, Throwable failure) + public void onFailure(Session session, Throwable failure, Callback callback) { - getConnection().onSessionFailure(failure); + getConnection().onSessionFailure(failure, callback); } @Override @@ -167,7 +167,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) error = ErrorCode.CANCEL_STREAM_ERROR; - getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error)); + getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 144a8cc8d36..79cdcc98290 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -300,18 +300,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel return result; } - public Runnable onFailure(Throwable failure) + public Runnable onFailure(Throwable failure, Callback callback) { getHttpTransport().onStreamFailure(failure); boolean handle = getRequest().getHttpInput().failed(failure); consumeInput(); - return () -> - { - if (handle) - handleWithContext(); - else - getState().asyncError(failure); - }; + return new FailureTask(failure, callback, handle); } protected void consumeInput() @@ -366,4 +360,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel streamId = stream.getId(); return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId); } + + private class FailureTask implements Runnable + { + private final Throwable failure; + private final Callback callback; + private final boolean handle; + + public FailureTask(Throwable failure, Callback callback, boolean handle) + { + this.failure = failure; + this.callback = callback; + this.handle = handle; + } + + @Override + public void run() + { + try + { + if (handle) + handleWithContext(); + else + getState().asyncError(failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + } }