From f9ff9e1226ce873d154ce44273dd0387373eaee6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 22 Aug 2017 15:54:40 +0200 Subject: [PATCH] Issue #1759 - HTTP/2 producer can block in onReset. Because now the failures are asynchronous, code that was executed after invoking the failure listener must be now executed after the asynchronous processing done by the listener and therefore Callbacks are introduced. --- .../eclipse/jetty/http2/HTTP2Connection.java | 4 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 94 +++++++++++++++++-- .../org/eclipse/jetty/http2/api/Session.java | 36 ++++++- .../http2/server/HTTP2ServerConnection.java | 25 ++++- .../server/HTTP2ServerConnectionFactory.java | 10 +- .../http2/server/HttpChannelOverHTTP2.java | 41 ++++++-- 6 files changed, 177 insertions(+), 33 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 7cabc376a82..676e0ad0737 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -175,6 +175,7 @@ public class HTTP2Connection extends AbstractConnection { private final Callback fillCallback = new FillCallback(); private ByteBuffer buffer; + private boolean shutdown; @Override public Runnable produce() @@ -185,7 +186,7 @@ public class HTTP2Connection extends AbstractConnection if (task != null) return task; - if (isFillInterested()) + if (isFillInterested() || shutdown) return null; if (buffer == null) @@ -221,6 +222,7 @@ public class HTTP2Connection extends AbstractConnection else if (filled < 0) { release(); + shutdown = true; session.onShutdown(); return null; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 1d1772c190a..383b2e12c57 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -421,8 +421,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { // We received a GO_AWAY, so try to write // what's in the queue and then disconnect. - notifyClose(this, frame); - control(null, Callback.NOOP, new DisconnectFrame()); + notifyClose(this, frame, new DisconnectCallback()); return; } break; @@ -462,8 +461,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onConnectionFailure(int error, String reason) { - notifyFailure(this, new IOException(String.format("%d/%s", error, reason))); - close(error, reason, Callback.NOOP); + notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason)); } @Override @@ -991,8 +989,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void abort(Throwable failure) { - notifyFailure(this, failure); - terminate(failure); + notifyFailure(this, failure, new TerminateCallback(failure)); } public boolean isDisconnected() @@ -1054,11 +1051,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyClose(Session session, GoAwayFrame frame) + protected void notifyClose(Session session, GoAwayFrame frame, Callback callback) { try { - listener.onClose(session, frame); + listener.onClose(session, frame, callback); } catch (Throwable x) { @@ -1079,11 +1076,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void notifyFailure(Session session, Throwable failure) + protected void notifyFailure(Session session, Throwable failure, Callback callback) { try { - listener.onFailure(session, failure); + listener.onFailure(session, failure, callback); } catch (Throwable x) { @@ -1322,4 +1319,81 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio promise.failed(x); } } + + private class CloseCallback implements Callback.NonBlocking + { + private final int error; + private final String reason; + + private CloseCallback(int error, String reason) + { + this.error = error; + this.reason = reason; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + private void complete() + { + close(error, reason, Callback.NOOP); + } + } + + private class DisconnectCallback implements Callback.NonBlocking + { + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + complete(); + } + + private void complete() + { + control(null, Callback.NOOP, new DisconnectFrame()); + } + } + + private class TerminateCallback implements Callback.NonBlocking + { + private final Throwable failure; + + private TerminateCallback(Throwable failure) + { + this.failure = failure; + } + + @Override + public void succeeded() + { + complete(); + } + + @Override + public void failed(Throwable x) + { + failure.addSuppressed(x); + complete(); + } + + private void complete() + { + terminate(failure); + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index e2c4dbb9ba9..10642d7351d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -195,9 +195,23 @@ public interface Session /** *

Callback method invoked when a GOAWAY frame has been received.

* - * @param session the session - * @param frame the GOAWAY frame received + * @param session the session + * @param frame the GOAWAY frame received + * @param callback the callback to notify of the GOAWAY processing */ + public default void onClose(Session session, GoAwayFrame frame, Callback callback) + { + try + { + onClose(session, frame); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onClose(Session session, GoAwayFrame frame); /** @@ -210,9 +224,23 @@ public interface Session /** *

Callback method invoked when a failure has been detected for this session.

* - * @param session the session - * @param failure the failure + * @param session the session + * @param failure the failure + * @param callback the callback to notify of failure processing */ + public default void onFailure(Session session, Throwable failure, Callback callback) + { + try + { + onFailure(session, failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + public void onFailure(Session session, Throwable failure); /** diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index fbf1b9954ed..b981c5fb47a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.Executor; @@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.thread.ExecutionStrategy; @@ -162,17 +164,21 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onStreamFailure(IStream stream, Throwable failure) + public void onStreamFailure(IStream stream, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", stream, failure); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); if (channel != null) { - Runnable task = channel.onFailure(failure); + Runnable task = channel.onFailure(failure, callback); if (task != null) offerTask(task, true); } + else + { + callback.succeeded(); + } } public boolean onSessionTimeout(Throwable failure) @@ -190,13 +196,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return result; } - public void onSessionFailure(Throwable failure) + public void onSessionFailure(Throwable failure, Callback callback) { ISession session = getSession(); if (LOG.isDebugEnabled()) LOG.debug("Processing failure on {}: {}", session, failure); - for (Stream stream : session.getStreams()) - onStreamFailure((IStream)stream, failure); + Collection streams = session.getStreams(); + if (streams.isEmpty()) + { + callback.succeeded(); + } + else + { + CountingCallback counter = new CountingCallback(callback, streams.size()); + for (Stream stream : streams) + onStreamFailure((IStream)stream, failure, counter); + } } public void push(Connector connector, IStream stream, MetaData.Request request) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index f16b9a70be4..491b6c4ea7c 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -123,7 +123,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF } @Override - public void onClose(Session session, GoAwayFrame frame) + public void onClose(Session session, GoAwayFrame frame, Callback callback) { ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) @@ -131,13 +131,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF String reason = frame.tryConvertPayload(); if (reason != null && !reason.isEmpty()) reason = " (" + reason + ")"; - getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason)); + getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback); } @Override - public void onFailure(Session session, Throwable failure) + public void onFailure(Session session, Throwable failure, Callback callback) { - getConnection().onSessionFailure(failure); + getConnection().onSessionFailure(failure, callback); } @Override @@ -167,7 +167,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF ErrorCode error = ErrorCode.from(frame.getError()); if (error == null) error = ErrorCode.CANCEL_STREAM_ERROR; - getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error)); + getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 144a8cc8d36..79cdcc98290 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -300,18 +300,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel return result; } - public Runnable onFailure(Throwable failure) + public Runnable onFailure(Throwable failure, Callback callback) { getHttpTransport().onStreamFailure(failure); boolean handle = getRequest().getHttpInput().failed(failure); consumeInput(); - return () -> - { - if (handle) - handleWithContext(); - else - getState().asyncError(failure); - }; + return new FailureTask(failure, callback, handle); } protected void consumeInput() @@ -366,4 +360,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel streamId = stream.getId(); return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId); } + + private class FailureTask implements Runnable + { + private final Throwable failure; + private final Callback callback; + private final boolean handle; + + public FailureTask(Throwable failure, Callback callback, boolean handle) + { + this.failure = failure; + this.callback = callback; + this.handle = handle; + } + + @Override + public void run() + { + try + { + if (handle) + handleWithContext(); + else + getState().asyncError(failure); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + } }