diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 881e0b6ae14..504da0c8ef6 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -181,6 +181,7 @@ public class HTTP2Connection extends AbstractConnection { private final Callback fillableCallback = new FillableCallback(); private ByteBuffer buffer; + private boolean shutdown; @Override public Runnable produce() @@ -191,7 +192,7 @@ public class HTTP2Connection extends AbstractConnection if (task != null) return task; - if (isFillInterested()) + if (isFillInterested() || shutdown) return null; if (buffer == null) @@ -227,6 +228,7 @@ public class HTTP2Connection extends AbstractConnection else if (filled < 0) { release(); + shutdown = true; session.onShutdown(); return null; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 7859c238946..e9516a487ed 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -435,8 +435,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { // We received a GO_AWAY, so try to write // what's in the queue and then disconnect. - notifyClose(this, frame); - control(null, Callback.NOOP, new DisconnectFrame()); + notifyClose(this, frame, new DisconnectCallback()); return; } break; @@ -476,8 +475,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onConnectionFailure(int error, String reason) { - notifyFailure(this, new IOException(String.format("%d/%s", error, reason))); - close(error, reason, Callback.NOOP); + notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason)); } @Override @@ -1003,8 +1001,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void abort(Throwable failure) { - notifyFailure(this, failure); - terminate(failure); + notifyFailure(this, failure, new TerminateCallback(failure)); } public boolean isDisconnected() @@ -1066,11 +1063,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyClose(Session session, GoAwayFrame frame) + protected void notifyClose(Session session, GoAwayFrame frame, Callback callback) { try { - listener.onClose(session, frame); + listener.onClose(session, frame, callback); } catch (Throwable x) { @@ -1091,11 +1088,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyFailure(Session session, Throwable failure) + protected void notifyFailure(Session session, Throwable failure, Callback callback) { try { - listener.onFailure(session, failure); + listener.onFailure(session, failure, callback); } catch (Throwable x) { @@ -1330,4 +1327,99 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio promise.failed(x); } } + + private class CloseCallback implements Callback + { + private final int error; + private final String reason; + + private CloseCallback(int error, String reason) + { + this.error = error; + this.reason = reason; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + private void complete() + { + close(error, reason, Callback.NOOP); + } + } + + private class DisconnectCallback implements Callback + { + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + private void complete() + { + control(null, Callback.NOOP, new DisconnectFrame()); + } + } + + private class TerminateCallback implements Callback + { + private final Throwable failure; + + private TerminateCallback(Throwable failure) + { + this.failure = failure; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + failure.addSuppressed(x); + complete(); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + private void complete() + { + terminate(failure); + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index e2c4dbb9ba9..10642d7351d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -195,9 +195,23 @@ public interface Session /** *

Callback method invoked when a GOAWAY frame has been received.

* - * @param session the session - * @param frame the GOAWAY frame received + * @param session the session + * @param frame the GOAWAY frame received + * @param callback the callback to notify of the GOAWAY processing */ + public default void onClose(Session session, GoAwayFrame frame, Callback callback) + { + try + { + onClose(session, frame); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onClose(Session session, GoAwayFrame frame); /** @@ -210,9 +224,23 @@ public interface Session /** *

Callback method invoked when a failure has been detected for this session.

* - * @param session the session - * @param failure the failure + * @param session the session + * @param failure the failure + * @param callback the callback to notify of failure processing */ + public default void onFailure(Session session, Throwable failure, Callback callback) + { + try + { + onFailure(session, failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onFailure(Session session, Throwable failure); /** diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index a06550195cf..dad81238067 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; @@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; @@ -205,17 +207,21 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onStreamFailure(IStream stream, Throwable failure) + public void onStreamFailure(IStream stream, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", stream, failure); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); if (channel != null) { - Runnable task = channel.onFailure(failure); + Runnable task = channel.onFailure(failure, callback); if (task != null) offerTask(task, true); } + else + { + callback.succeeded(); + } } public boolean onSessionTimeout(Throwable failure) @@ -233,13 +239,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onSessionFailure(Throwable failure) + public void onSessionFailure(Throwable failure, Callback callback) { ISession session = getSession(); if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", session, failure); - for (Stream stream : session.getStreams()) - onStreamFailure((IStream)stream, failure); + Collection streams = session.getStreams(); + if (streams.isEmpty()) + { + callback.succeeded(); + } + else + { + CountingCallback counter = new CountingCallback(callback, streams.size()); + for (Stream stream : streams) + onStreamFailure((IStream)stream, failure, counter); + } } public void push(Connector connector, IStream stream, MetaData.Request request) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index 7b5356b1707..10c654e7bc9 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -122,7 +122,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF } @Override - public void onClose(Session session, GoAwayFrame frame) + public void onClose(Session session, GoAwayFrame frame, Callback callback) { ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) @@ -130,13 +130,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF String reason = frame.tryConvertPayload(); if (reason != null && !reason.isEmpty()) reason = " (" + reason + ")"; - getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason)); + getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback); } @Override - public void onFailure(Session session, Throwable failure) + public void onFailure(Session session, Throwable failure, Callback callback) { - getConnection().onSessionFailure(failure); + getConnection().onSessionFailure(failure, callback); } @Override @@ -168,7 +168,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) error = ErrorCode.CANCEL_STREAM_ERROR; - getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error)); + getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 74f9643a55b..93506c31c74 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -322,18 +322,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable return result; } - public Runnable onFailure(Throwable failure) + public Runnable onFailure(Throwable failure, Callback callback) { getHttpTransport().onStreamFailure(failure); boolean handle = getRequest().getHttpInput().failed(failure); consumeInput(); - return () -> - { - if (handle) - handleWithContext(); - else - getState().asyncError(failure); - }; + return new FailureTask(failure, callback, handle); } protected void consumeInput() @@ -394,4 +388,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable streamId = stream.getId(); return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId); } + + private class FailureTask implements Runnable + { + private final Throwable failure; + private final Callback callback; + private final boolean handle; + + public FailureTask(Throwable failure, Callback callback, boolean handle) + { + this.failure = failure; + this.callback = callback; + this.handle = handle; + } + + @Override + public void run() + { + try + { + if (handle) + handleWithContext(); + else + getState().asyncError(failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 7b1c339ff62..6ada43f3221 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -325,14 +325,12 @@ public class HttpTransportOverHTTP2 implements HttpTransport synchronized (this) { commit = this.commit; - State state = this.state; - this.state = State.FAILED; - if (this.failure == null) - this.failure = failure; - else - this.failure.addSuppressed(failure); + // Only fail pending writes, as we + // may need to write an error page. if (state == State.WRITING) { + this.state = State.FAILED; + this.failure = failure; callback = this.callback; this.callback = null; } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index b270ee510bc..73c1899901c 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -36,6 +36,11 @@ public interface Callback extends Invocable */ static Callback NOOP = new Callback() { + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } }; /** diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java index f6538d7621b..4530deeb23d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java @@ -45,6 +45,8 @@ public class CountingCallback extends Callback.Nested public CountingCallback(Callback callback, int count) { super(callback); + if (count < 1) + throw new IllegalArgumentException(); this.count = new AtomicInteger(count); }