diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 66b6739e1a8..217f9c64387 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -77,7 +77,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory return customize(connection, context); } - private class HTTP2ClientConnection extends HTTP2Connection implements Callback + private static class HTTP2ClientConnection extends HTTP2Connection implements Callback { private final HTTP2Client client; private final Promise promise; @@ -154,7 +154,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory } } - private class ConnectionListener implements Connection.Listener + private static class ConnectionListener implements Connection.Listener { @Override public void onOpened(Connection connection) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java index 55898830c17..8e479eace11 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AbstractTest.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.http2.client; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; - import javax.servlet.http.HttpServlet; import org.eclipse.jetty.http.HostPortHttpField; @@ -33,7 +32,7 @@ import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; -import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.HttpConfiguration; @@ -54,7 +53,7 @@ public class AbstractTest protected void start(HttpServlet servlet) throws Exception { - HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration()); connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); prepareServer(connectionFactory); diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java deleted file mode 100644 index fae29dcb27b..00000000000 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java +++ /dev/null @@ -1,80 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.http2.client; - -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.http2.api.Session; -import org.eclipse.jetty.util.Promise; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class InvalidServerTest extends AbstractTest -{ - @Test - public void testInvalidPreface() throws Exception - { - try (ServerSocket server = new ServerSocket(0)) - { - prepareClient(); - client.start(); - - CountDownLatch failureLatch = new CountDownLatch(1); - Promise.Completable promise = new Promise.Completable<>(); - InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort()); - client.connect(address, new Session.Listener.Adapter() - { - @Override - public void onFailure(Session session, Throwable failure) - { - failureLatch.countDown(); - } - }, promise); - - try (Socket socket = server.accept()) - { - OutputStream output = socket.getOutputStream(); - output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8)); - - Session session = promise.get(5, TimeUnit.SECONDS); - assertNotNull(session); - - assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); - - // Verify that the client closed the socket. - InputStream input = socket.getInputStream(); - while (true) - { - int read = input.read(); - if (read < 0) - break; - } - } - } - } -} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PrefaceTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PrefaceTest.java index 71b9d6f18d5..01e90a6fc7a 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PrefaceTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PrefaceTest.java @@ -18,7 +18,11 @@ package org.eclipse.jetty.http2.client; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; @@ -41,6 +45,7 @@ import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PingFrame; import org.eclipse.jetty.http2.frames.PrefaceFrame; @@ -63,6 +68,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class PrefaceTest extends AbstractTest @@ -332,4 +338,71 @@ public class PrefaceTest extends AbstractTest assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS)); } } + + @Test + public void testInvalidServerPreface() throws Exception + { + try (ServerSocket server = new ServerSocket(0)) + { + prepareClient(); + client.start(); + + CountDownLatch failureLatch = new CountDownLatch(1); + Promise.Completable promise = new Promise.Completable<>(); + InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort()); + client.connect(address, new Session.Listener.Adapter() + { + @Override + public void onFailure(Session session, Throwable failure) + { + failureLatch.countDown(); + } + }, promise); + + try (Socket socket = server.accept()) + { + OutputStream output = socket.getOutputStream(); + output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8)); + + Session session = promise.get(5, TimeUnit.SECONDS); + assertNotNull(session); + + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Verify that the client closed the socket. + InputStream input = socket.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } + } + } + } + + @Test + public void testInvalidClientPreface() throws Exception + { + start(new ServerSessionListener.Adapter()); + + try (Socket client = new Socket("localhost", connector.getLocalPort())) + { + OutputStream output = client.getOutputStream(); + output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8)); + output.flush(); + + byte[] bytes = new byte[1024]; + InputStream input = client.getInputStream(); + int read = input.read(bytes); + if (read < 0) + { + // Closing the connection without GOAWAY frame is fine. + return; + } + + int type = bytes[3]; + assertEquals(FrameType.GO_AWAY.getType(), type); + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index aafbeb635e6..8766194a988 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -425,12 +425,21 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable super.failed(x); } + /** + * @return whether the entry is stale and must not be processed + */ private boolean isStale() { - return !isProtocol() && stream != null && stream.isReset(); + // If it is a protocol frame, process it. + if (isProtocolFrame(frame)) + return false; + // It's an application frame; is the stream gone already? + if (stream == null) + return true; + return stream.isReset(); } - private boolean isProtocol() + private boolean isProtocolFrame(Frame frame) { switch (frame.getType()) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index b7a6aa21635..035c9dc04f1 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -439,7 +439,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // We received a GO_AWAY, so try to write // what's in the queue and then disconnect. closeFrame = frame; - notifyClose(this, frame, new DisconnectCallback()); + onClose(frame, new DisconnectCallback()); return; } break; @@ -498,9 +498,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio public void onStreamFailure(int streamId, int error, String reason) { Callback callback = new ResetCallback(streamId, error, Callback.NOOP); + Throwable failure = toFailure("Stream failure", error, reason); + onStreamFailure(streamId, error, reason, failure, callback); + } + + private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback) + { IStream stream = getStream(streamId); if (stream != null) - stream.process(new FailureFrame(error, reason), callback); + stream.process(new FailureFrame(error, reason, failure), callback); else callback.succeeded(); } @@ -513,7 +519,45 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void onConnectionFailure(int error, String reason, Callback callback) { - notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback)); + Throwable failure = toFailure("Session failure", error, reason); + onFailure(error, reason, failure, new CloseCallback(error, reason, callback)); + } + + protected void abort(Throwable failure) + { + onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure)); + } + + private void onFailure(int error, String reason, Throwable failure, Callback callback) + { + Collection streams = getStreams(); + int count = streams.size(); + Callback countCallback = new CountingCallback(callback, count + 1); + for (Stream stream : streams) + { + onStreamFailure(stream.getId(), error, reason, failure, countCallback); + } + notifyFailure(this, failure, countCallback); + } + + private void onClose(GoAwayFrame frame, Callback callback) + { + int error = frame.getError(); + String reason = frame.tryConvertPayload(); + Throwable failure = toFailure("Session close", error, reason); + Collection streams = getStreams(); + int count = streams.size(); + Callback countCallback = new CountingCallback(callback, count + 1); + for (Stream stream : streams) + { + onStreamFailure(stream.getId(), error, reason, failure, countCallback); + } + notifyClose(this, frame, countCallback); + } + + private Throwable toFailure(String message, int error, String reason) + { + return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason)); } @Override @@ -998,11 +1042,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected void abort(Throwable failure) - { - notifyFailure(this, failure, new TerminateCallback(failure)); - } - public boolean isDisconnected() { return !endPoint.isOpen(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 08dc25f4ace..f8360ab0d09 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -138,7 +138,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa { if (writing.compareAndSet(null, callback)) return true; - close(); callback.failed(new WritePendingException()); return false; } @@ -177,7 +176,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa public boolean isRemotelyClosed() { CloseState state = closeState.get(); - return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING; + return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED; } public boolean isLocallyClosed() @@ -358,6 +357,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private void onFailure(FailureFrame frame, Callback callback) { + // Don't close or remove the stream, as the listener may + // want to use it, for example to send a RST_STREAM frame. notifyFailure(this, frame, callback); } @@ -608,7 +609,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa { try { - listener.onFailure(stream, frame.getError(), frame.getReason(), callback); + listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback); } catch (Throwable x) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index 083bae9a1b0..95d353bd54d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -227,8 +227,24 @@ public interface Stream * @param stream the stream * @param error the error code * @param reason the error reason, or null + * @param failure the failure * @param callback the callback to complete when the failure has been handled */ + default void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + onFailure(stream, error, reason, callback); + } + + /** + *

Callback method invoked when the stream failed.

+ * + * @param stream the stream + * @param error the error code + * @param reason the error reason, or null + * @param callback the callback to complete when the failure has been handled + * @deprecated use {@link #onFailure(Stream, int, String, Throwable, Callback)} instead + */ + @Deprecated default void onFailure(Stream stream, int error, String reason, Callback callback) { callback.succeeded(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/FailureFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/FailureFrame.java index 95c8dbadfbb..ea6de570d60 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/FailureFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/FailureFrame.java @@ -22,12 +22,14 @@ public class FailureFrame extends Frame { private final int error; private final String reason; + private final Throwable failure; - public FailureFrame(int error, String reason) + public FailureFrame(int error, String reason, Throwable failure) { super(FrameType.FAILURE); this.error = error; this.reason = reason; + this.failure = failure; } public int getError() @@ -39,4 +41,9 @@ public class FailureFrame extends Frame { return reason; } + + public Throwable getFailure() + { + return failure; + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java index f062041b723..4a1b8c6fa45 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java @@ -56,7 +56,7 @@ public class Generator this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator); this.generators[FrameType.CONTINUATION.getType()] = null; // Never generated explicitly. this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator(); - this.generators[FrameType.DISCONNECT.getType()] = new DisconnectGenerator(); + this.generators[FrameType.DISCONNECT.getType()] = new NoOpGenerator(); this.dataGenerator = new DataGenerator(headerGenerator); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DisconnectGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/NoOpGenerator.java similarity index 92% rename from jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DisconnectGenerator.java rename to jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/NoOpGenerator.java index e20c88d64cf..1168c4197ea 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DisconnectGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/NoOpGenerator.java @@ -21,9 +21,9 @@ package org.eclipse.jetty.http2.generator; import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.io.ByteBufferPool; -public class DisconnectGenerator extends FrameGenerator +public class NoOpGenerator extends FrameGenerator { - public DisconnectGenerator() + public NoOpGenerator() { super(null); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 563ad4632a3..854b6380538 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -195,9 +195,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen } @Override - public void onFailure(Stream stream, int error, String reason, Callback callback) + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { - responseFailure(new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason))); + responseFailure(failure); callback.succeeded(); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 963a453776f..020c3afe2d3 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Base64; -import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Queue; @@ -41,7 +40,6 @@ import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.IStream; -import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.Frame; @@ -58,7 +56,6 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.TypeUtil; public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo @@ -214,13 +211,17 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection public void onStreamFailure(IStream stream, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("Processing failure on {}: {}", stream, failure); + LOG.debug("Processing stream failure on {}", stream, failure); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment(); if (channel != null) { Runnable task = channel.onFailure(failure, callback); if (task != null) + { + // We must dispatch to another thread because the task + // may call application code that performs blocking I/O. offerTask(task, true); + } } else { @@ -245,22 +246,10 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection public void onSessionFailure(Throwable failure, Callback callback) { - ISession session = getSession(); if (LOG.isDebugEnabled()) - LOG.debug("Processing failure on {}: {}", session, failure); - Collection streams = session.getStreams(); - if (streams.isEmpty()) - { - callback.succeeded(); - } - else - { - CountingCallback counter = new CountingCallback(callback, streams.size()); - for (Stream stream : streams) - { - onStreamFailure((IStream)stream, failure, counter); - } - } + LOG.debug("Processing session failure on {}", getSession(), failure); + // All the streams have already been failed, just succeed the callback. + callback.succeeded(); } public void push(Connector connector, IStream stream, MetaData.Request request) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index 09de52c2283..dc9f782c749 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -34,6 +34,7 @@ import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.QuietException; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.NegotiatingServerConnection.CipherDiscriminator; @@ -119,7 +120,8 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF String reason = frame.tryConvertPayload(); if (!StringUtil.isEmpty(reason)) reason = " (" + reason + ")"; - getConnection().onSessionFailure(new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason)), callback); + EofException failure = new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason)); + onFailure(session, failure, callback); } @Override @@ -154,13 +156,21 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF @Override public void onReset(Stream stream, ResetFrame frame, Callback callback) { - getConnection().onStreamFailure((IStream)stream, new EofException("Reset " + ErrorCode.toString(frame.getError(), null)), callback); + EofException failure = new EofException("Reset " + ErrorCode.toString(frame.getError(), null)); + onFailure(stream, failure, callback); } @Override - public void onFailure(Stream stream, int error, String reason, Callback callback) + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { - getConnection().onStreamFailure((IStream)stream, new EofException(String.format("Failure %s/%s", ErrorCode.toString(error, null), reason)), callback); + if (!(failure instanceof QuietException)) + failure = new EofException(failure); + onFailure(stream, failure, callback); + } + + private void onFailure(Stream stream, Throwable failure, Callback callback) + { + getConnection().onStreamFailure((IStream)stream, failure, callback); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 6c9cc4343d2..af7c26db107 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import org.eclipse.jetty.http.BadMessageException; @@ -103,8 +104,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport } else { - if (transportCallback.start(callback, false)) - sendHeadersFrame(info, false, transportCallback); + transportCallback.send(callback, false, c -> + sendHeadersFrame(info, false, c)); } } else @@ -138,24 +139,24 @@ public class HttpTransportOverHTTP2 implements HttpTransport HttpFields trailers = retrieveTrailers(); if (trailers != null) { - if (transportCallback.start(new SendTrailers(getCallback(), trailers), false)) - sendDataFrame(content, true, false, transportCallback); + transportCallback.send(new SendTrailers(getCallback(), trailers), false, c -> + sendDataFrame(content, true, false, c)); } else { - if (transportCallback.start(getCallback(), false)) - sendDataFrame(content, true, true, transportCallback); + transportCallback.send(getCallback(), false, c -> + sendDataFrame(content, true, true, c)); } } else { - if (transportCallback.start(getCallback(), false)) - sendDataFrame(content, false, false, transportCallback); + transportCallback.send(getCallback(), false, c -> + sendDataFrame(content, false, false, c)); } } }; - if (transportCallback.start(commitCallback, true)) - sendHeadersFrame(info, false, transportCallback); + transportCallback.send(commitCallback, true, c -> + sendHeadersFrame(info, false, c)); } else { @@ -164,19 +165,19 @@ public class HttpTransportOverHTTP2 implements HttpTransport HttpFields trailers = retrieveTrailers(); if (trailers != null) { - if (transportCallback.start(new SendTrailers(callback, trailers), true)) - sendHeadersFrame(info, false, transportCallback); + transportCallback.send(new SendTrailers(callback, trailers), true, c -> + sendHeadersFrame(info, false, c)); } else { - if (transportCallback.start(callback, true)) - sendHeadersFrame(info, true, transportCallback); + transportCallback.send(callback, true, c -> + sendHeadersFrame(info, true, c)); } } else { - if (transportCallback.start(callback, true)) - sendHeadersFrame(info, false, transportCallback); + transportCallback.send(callback, true, c -> + sendHeadersFrame(info, false, c)); } } } @@ -198,8 +199,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport SendTrailers sendTrailers = new SendTrailers(callback, trailers); if (hasContent) { - if (transportCallback.start(sendTrailers, false)) - sendDataFrame(content, true, false, transportCallback); + transportCallback.send(sendTrailers, false, c -> + sendDataFrame(content, true, false, c)); } else { @@ -208,14 +209,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport } else { - if (transportCallback.start(callback, false)) - sendDataFrame(content, true, true, transportCallback); + transportCallback.send(callback, false, c -> + sendDataFrame(content, true, true, c)); } } else { - if (transportCallback.start(callback, false)) - sendDataFrame(content, false, false, transportCallback); + transportCallback.send(callback, false, c -> + sendDataFrame(content, false, false, c)); } } else @@ -317,7 +318,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport public boolean onStreamTimeout(Throwable failure) { - return transportCallback.onIdleTimeout(failure); + return transportCallback.idleTimeout(failure); } @Override @@ -350,119 +351,359 @@ public class HttpTransportOverHTTP2 implements HttpTransport stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); } + /** + *

Callback that controls sends initiated by the transport, by eventually + * notifying a nested callback.

+ *

There are 3 sources of concurrency after a send is initiated:

+ *
    + *
  • the completion of the send operation, either success or failure
  • + *
  • an asynchronous failure coming from the read side such as a stream + * being reset, or the connection being closed
  • + *
  • an asynchronous idle timeout
  • + *
+ *

The last 2 cases may happen during a send, when the frames + * are being generated in the flusher. + * In such cases, this class must avoid that the nested callback is notified + * while the frame generation is in progress, because the nested callback + * may modify other states (such as clearing the {@code HttpOutput._buffer}) + * that are accessed during frame generation.

+ *

The solution implemented in this class works by splitting the send + * operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}. + * Asynchronous state changes happening during {@code send} are stored + * and only executed in {@code post-send}, therefore never interfering + * with frame generation.

+ * + * @see State + */ private class TransportCallback implements Callback { - private State state = State.IDLE; - private Callback callback; - private Throwable failure; - private boolean commit; + private State _state = State.IDLE; + private Callback _callback; + private boolean _commit; + private Throwable _failure; - public boolean start(Callback callback, boolean commit) + private void reset(Throwable failure) { - State state; + assert Thread.holdsLock(this); + _state = failure != null ? State.FAILED : State.IDLE; + _callback = null; + _commit = false; + _failure = failure; + } + + private void send(Callback callback, boolean commit, Consumer sendFrame) + { + Throwable failure = sending(callback, commit); + if (failure == null) + { + sendFrame.accept(this); + pending(); + } + else + { + callback.failed(failure); + } + } + + private Throwable sending(Callback callback, boolean commit) + { + synchronized (this) + { + switch (_state) + { + case IDLE: + { + _state = State.SENDING; + _callback = callback; + _commit = commit; + return null; + } + case FAILED: + { + return _failure; + } + default: + { + return new IllegalStateException("Invalid transport state: " + _state); + } + } + } + } + + private void pending() + { + Callback callback; + boolean commit; Throwable failure; synchronized (this) { - state = this.state; - failure = this.failure; - if (state == State.IDLE) + switch (_state) { - this.state = State.WRITING; - this.callback = callback; - this.commit = commit; - return true; + case SENDING: + { + // The send has not completed the callback yet, + // wait for succeeded() or failed() to be called. + _state = State.PENDING; + return; + } + case SUCCEEDING: + { + // The send already completed successfully, but the + // call to succeeded() was delayed, so call it now. + callback = _callback; + commit = _commit; + failure = null; + reset(null); + break; + } + case FAILING: + { + // The send already completed with a failure, but + // the call to failed() was delayed, so call it now. + callback = _callback; + commit = _commit; + failure = _failure; + reset(failure); + break; + } + default: + { + callback = _callback; + commit = _commit; + failure = new IllegalStateException("Invalid transport state: " + _state); + reset(failure); + break; + } } } if (failure == null) - failure = new IllegalStateException("Invalid transport state: " + state); - callback.failed(failure); - return false; + succeed(callback, commit); + else + fail(callback, commit, failure); } @Override public void succeeded() { + Callback callback; boolean commit; - Callback callback = null; synchronized (this) { - commit = this.commit; - if (state == State.WRITING) + switch (_state) { - this.state = State.IDLE; - callback = this.callback; - this.callback = null; - this.commit = false; + case SENDING: + { + _state = State.SUCCEEDING; + // Succeeding the callback will be done in postSend(). + return; + } + case PENDING: + { + callback = _callback; + commit = _commit; + reset(null); + break; + } + default: + { + // This thread lost the race to succeed the current + // send, as other threads likely already failed it. + return; + } } } - if (LOG.isDebugEnabled()) - LOG.debug("HTTP2 Response #{}/{} {} {}", - stream.getId(), Integer.toHexString(stream.getSession().hashCode()), - commit ? "commit" : "flush", - callback == null ? "failure" : "success"); - if (callback != null) - callback.succeeded(); + succeed(callback, commit); } @Override public void failed(Throwable failure) { - boolean commit; Callback callback; + boolean commit; synchronized (this) { - commit = this.commit; - this.state = State.FAILED; - callback = this.callback; - this.callback = null; - this.failure = failure; + switch (_state) + { + case SENDING: + { + _state = State.FAILING; + _failure = failure; + // Failing the callback will be done in postSend(). + return; + } + case IDLE: + case PENDING: + { + callback = _callback; + commit = _commit; + reset(failure); + break; + } + default: + { + // This thread lost the race to fail the current send, + // as other threads already succeeded or failed it. + return; + } + } } + fail(callback, commit, failure); + } + + private boolean idleTimeout(Throwable failure) + { + Callback callback; + boolean timeout; + synchronized (this) + { + switch (_state) + { + case PENDING: + { + // The send was started but idle timed out, fail it. + callback = _callback; + timeout = true; + reset(failure); + break; + } + case IDLE: + // The application may be suspended, ignore the idle timeout. + case SENDING: + // A send has been started at the same time of an idle timeout; + // Ignore the idle timeout and let the write continue normally. + case SUCCEEDING: + case FAILING: + // An idle timeout during these transient states is ignored. + case FAILED: + // Already failed, ignore the idle timeout. + { + callback = null; + timeout = false; + break; + } + default: + { + // Should not happen, but just in case. + callback = _callback; + if (callback == null) + callback = Callback.NOOP; + timeout = true; + failure = new IllegalStateException("Invalid transport state: " + _state, failure); + reset(failure); + break; + } + } + } + idleTimeout(callback, timeout, failure); + return timeout; + } + + private void succeed(Callback callback, boolean commit) + { if (LOG.isDebugEnabled()) - LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), - commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); + LOG.debug("HTTP2 Response #{}/{} {} success", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush"); + callback.succeeded(); + } + + private void fail(Callback callback, boolean commit, Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("HTTP2 Response #{}/{} {} failure", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush", + failure); if (callback != null) callback.failed(failure); } - private boolean onIdleTimeout(Throwable failure) + private void idleTimeout(Callback callback, boolean timeout, Throwable failure) { - boolean result; - Callback callback = null; - synchronized (this) - { - // Ignore idle timeouts if not writing, - // as the application may be suspended. - result = state == State.WRITING; - if (result) - { - this.state = State.TIMEOUT; - callback = this.callback; - this.callback = null; - this.failure = failure; - } - } if (LOG.isDebugEnabled()) - LOG.debug(String.format("HTTP2 Response #%d/%h idle timeout %s", stream.getId(), stream.getSession(), result ? "expired" : "ignored"), failure); - if (result) + LOG.debug("HTTP2 Response #{}/{} idle timeout {}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + timeout ? "expired" : "ignored", + failure); + if (timeout) callback.failed(failure); - return result; - } - - @Override - public InvocationType getInvocationType() - { - Callback callback; - synchronized (this) - { - callback = this.callback; - } - return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); } } + /** + *

Send states for {@link TransportCallback}.

+ * + * @see TransportCallback + */ private enum State { - IDLE, WRITING, FAILED, TIMEOUT + /** + *

No send initiated or in progress.

+ *

Next states could be:

+ *
    + *
  • {@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)} + * is called by the transport to initiate a send
  • + *
  • {@link #FAILED}, when {@link TransportCallback#failed(Throwable)} + * is called by an asynchronous failure
  • + *
+ */ + IDLE, + /** + *

A send is initiated; the nested callback in {@link TransportCallback} + * cannot be notified while in this state.

+ *

Next states could be:

+ *
    + *
  • {@link #SUCCEEDING}, when {@link TransportCallback#succeeded()} + * is called synchronously because the send succeeded
  • + *
  • {@link #FAILING}, when {@link TransportCallback#failed(Throwable)} + * is called synchronously because the send failed
  • + *
  • {@link #PENDING}, when {@link TransportCallback#pending()} + * is called before the send completes
  • + *
+ */ + SENDING, + /** + *

A send was initiated and is now pending, waiting for the {@link TransportCallback} + * to be notified of success or failure.

+ *

Next states could be:

+ *
    + *
  • {@link #IDLE}, when {@link TransportCallback#succeeded()} + * is called because the send succeeded
  • + *
  • {@link #FAILED}, when {@link TransportCallback#failed(Throwable)} + * is called because either the send failed, or an asynchronous failure happened
  • + *
+ */ + PENDING, + /** + *

A send was initiated and succeeded, but {@link TransportCallback#pending()} + * has not been called yet.

+ *

This state indicates that the success actions (such as notifying the + * {@link TransportCallback} nested callback) must be performed when + * {@link TransportCallback#pending()} is called.

+ *

Next states could be:

+ *
    + *
  • {@link #IDLE}, when {@link TransportCallback#pending()} + * is called
  • + *
+ */ + SUCCEEDING, + /** + *

A send was initiated and failed, but {@link TransportCallback#pending()} + * has not been called yet.

+ *

This state indicates that the failure actions (such as notifying the + * {@link TransportCallback} nested callback) must be performed when + * {@link TransportCallback#pending()} is called.

+ *

Next states could be:

+ *
    + *
  • {@link #FAILED}, when {@link TransportCallback#pending()} + * is called
  • + *
+ */ + FAILING, + /** + *

The terminal state indicating failure of the send.

+ */ + FAILED } private class SendTrailers extends Callback.Nested @@ -478,8 +719,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport @Override public void succeeded() { - if (transportCallback.start(getCallback(), false)) - sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), transportCallback); + transportCallback.send(getCallback(), false, c -> + sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), c)); } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 8c7cf77a87b..60d494614b1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -817,13 +817,16 @@ public class HttpChannelState // check the actions of the listeners synchronized (this) { - // If we are still async and nobody has called sendError if (_requestState == RequestState.ASYNC && !_sendError) - // Then the listeners did not invoke API methods - // and the container must provide a default error dispatch. + { + // The listeners did not invoke API methods and the + // container must provide a default error dispatch. sendError(th); - else + } + else if (_requestState != RequestState.COMPLETE) + { LOG.warn("unhandled in state " + _requestState, new IllegalStateException(th)); + } } }