Jetty 9.4.x 4855 h2spec failures (#4946)
* Fixes #4855 - Occasional h2spec failures on CI In case of bad usage of the HTTP/2 API, we don't want to close() the stream but just fail the callback, because the stream may be performing actions triggered by a legit API usage. In case of a call to `AsyncListener.onError()`, applications may decide to call AsyncContext.complete() and that would be a correct usage of the Servlet API. This case was not well handled and was wrongly producing a WARN log with an `IllegalStateException`. Completely rewritten `HttpTransportOverHTTP2.TransportCallback`. The rewrite handles correctly asynchronous failures that now are executed sequentially (and not concurrently) with writes. If a write is in progress, the failure will just change the state and at the end of the write a check on the state will determine what actions to take. A session failure is now handled in HTTP2Session by first failing all the streams - which notifies the Stream.Listeners - and then failing the session - which notifies the Session.Listener. The stream failures are executed concurrently by dispatching each one to a different thread; this means that the stream failure callbacks are executed concurrently (likely sending RST_STREAM frames). The session failure callback is completed only when all the stream failure callbacks have completed, to ensure that a GOAWAY frame is processed after all the RST_STREAM frames. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
7c1d290d89
commit
56bda1b3ae
|
@ -77,7 +77,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
||||||
return customize(connection, context);
|
return customize(connection, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
|
private static class HTTP2ClientConnection extends HTTP2Connection implements Callback
|
||||||
{
|
{
|
||||||
private final HTTP2Client client;
|
private final HTTP2Client client;
|
||||||
private final Promise<Session> promise;
|
private final Promise<Session> promise;
|
||||||
|
@ -154,7 +154,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ConnectionListener implements Connection.Listener
|
private static class ConnectionListener implements Connection.Listener
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void onOpened(Connection connection)
|
public void onOpened(Connection connection)
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.http2.client;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServlet;
|
import javax.servlet.http.HttpServlet;
|
||||||
|
|
||||||
import org.eclipse.jetty.http.HostPortHttpField;
|
import org.eclipse.jetty.http.HostPortHttpField;
|
||||||
|
@ -33,7 +32,7 @@ import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||||
import org.eclipse.jetty.http2.api.Session;
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||||
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
|
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
|
||||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
||||||
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
||||||
import org.eclipse.jetty.server.ConnectionFactory;
|
import org.eclipse.jetty.server.ConnectionFactory;
|
||||||
import org.eclipse.jetty.server.HttpConfiguration;
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
|
@ -54,7 +53,7 @@ public class AbstractTest
|
||||||
|
|
||||||
protected void start(HttpServlet servlet) throws Exception
|
protected void start(HttpServlet servlet) throws Exception
|
||||||
{
|
{
|
||||||
HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
|
||||||
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
prepareServer(connectionFactory);
|
prepareServer(connectionFactory);
|
||||||
|
|
|
@ -1,80 +0,0 @@
|
||||||
//
|
|
||||||
// ========================================================================
|
|
||||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
|
||||||
// ------------------------------------------------------------------------
|
|
||||||
// All rights reserved. This program and the accompanying materials
|
|
||||||
// are made available under the terms of the Eclipse Public License v1.0
|
|
||||||
// and Apache License v2.0 which accompanies this distribution.
|
|
||||||
//
|
|
||||||
// The Eclipse Public License is available at
|
|
||||||
// http://www.eclipse.org/legal/epl-v10.html
|
|
||||||
//
|
|
||||||
// The Apache License v2.0 is available at
|
|
||||||
// http://www.opensource.org/licenses/apache2.0.php
|
|
||||||
//
|
|
||||||
// You may elect to redistribute this code under either of these licenses.
|
|
||||||
// ========================================================================
|
|
||||||
//
|
|
||||||
|
|
||||||
package org.eclipse.jetty.http2.client;
|
|
||||||
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.ServerSocket;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.http2.api.Session;
|
|
||||||
import org.eclipse.jetty.util.Promise;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
|
|
||||||
public class InvalidServerTest extends AbstractTest
|
|
||||||
{
|
|
||||||
@Test
|
|
||||||
public void testInvalidPreface() throws Exception
|
|
||||||
{
|
|
||||||
try (ServerSocket server = new ServerSocket(0))
|
|
||||||
{
|
|
||||||
prepareClient();
|
|
||||||
client.start();
|
|
||||||
|
|
||||||
CountDownLatch failureLatch = new CountDownLatch(1);
|
|
||||||
Promise.Completable<Session> promise = new Promise.Completable<>();
|
|
||||||
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
|
|
||||||
client.connect(address, new Session.Listener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void onFailure(Session session, Throwable failure)
|
|
||||||
{
|
|
||||||
failureLatch.countDown();
|
|
||||||
}
|
|
||||||
}, promise);
|
|
||||||
|
|
||||||
try (Socket socket = server.accept())
|
|
||||||
{
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
|
|
||||||
|
|
||||||
Session session = promise.get(5, TimeUnit.SECONDS);
|
|
||||||
assertNotNull(session);
|
|
||||||
|
|
||||||
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
|
|
||||||
|
|
||||||
// Verify that the client closed the socket.
|
|
||||||
InputStream input = socket.getInputStream();
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
int read = input.read();
|
|
||||||
if (read < 0)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,7 +18,11 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.http2.client;
|
package org.eclipse.jetty.http2.client;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.net.Socket;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -41,6 +45,7 @@ import org.eclipse.jetty.http2.ErrorCode;
|
||||||
import org.eclipse.jetty.http2.api.Session;
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
import org.eclipse.jetty.http2.api.Stream;
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||||
|
import org.eclipse.jetty.http2.frames.FrameType;
|
||||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
import org.eclipse.jetty.http2.frames.PingFrame;
|
import org.eclipse.jetty.http2.frames.PingFrame;
|
||||||
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
||||||
|
@ -63,6 +68,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class PrefaceTest extends AbstractTest
|
public class PrefaceTest extends AbstractTest
|
||||||
|
@ -332,4 +338,71 @@ public class PrefaceTest extends AbstractTest
|
||||||
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
|
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidServerPreface() throws Exception
|
||||||
|
{
|
||||||
|
try (ServerSocket server = new ServerSocket(0))
|
||||||
|
{
|
||||||
|
prepareClient();
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
CountDownLatch failureLatch = new CountDownLatch(1);
|
||||||
|
Promise.Completable<Session> promise = new Promise.Completable<>();
|
||||||
|
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
|
||||||
|
client.connect(address, new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onFailure(Session session, Throwable failure)
|
||||||
|
{
|
||||||
|
failureLatch.countDown();
|
||||||
|
}
|
||||||
|
}, promise);
|
||||||
|
|
||||||
|
try (Socket socket = server.accept())
|
||||||
|
{
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
Session session = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(session);
|
||||||
|
|
||||||
|
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Verify that the client closed the socket.
|
||||||
|
InputStream input = socket.getInputStream();
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
int read = input.read();
|
||||||
|
if (read < 0)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidClientPreface() throws Exception
|
||||||
|
{
|
||||||
|
start(new ServerSessionListener.Adapter());
|
||||||
|
|
||||||
|
try (Socket client = new Socket("localhost", connector.getLocalPort()))
|
||||||
|
{
|
||||||
|
OutputStream output = client.getOutputStream();
|
||||||
|
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
byte[] bytes = new byte[1024];
|
||||||
|
InputStream input = client.getInputStream();
|
||||||
|
int read = input.read(bytes);
|
||||||
|
if (read < 0)
|
||||||
|
{
|
||||||
|
// Closing the connection without GOAWAY frame is fine.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int type = bytes[3];
|
||||||
|
assertEquals(FrameType.GO_AWAY.getType(), type);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,12 +425,21 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
||||||
super.failed(x);
|
super.failed(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether the entry is stale and must not be processed
|
||||||
|
*/
|
||||||
private boolean isStale()
|
private boolean isStale()
|
||||||
{
|
{
|
||||||
return !isProtocol() && stream != null && stream.isReset();
|
// If it is a protocol frame, process it.
|
||||||
|
if (isProtocolFrame(frame))
|
||||||
|
return false;
|
||||||
|
// It's an application frame; is the stream gone already?
|
||||||
|
if (stream == null)
|
||||||
|
return true;
|
||||||
|
return stream.isReset();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isProtocol()
|
private boolean isProtocolFrame(Frame frame)
|
||||||
{
|
{
|
||||||
switch (frame.getType())
|
switch (frame.getType())
|
||||||
{
|
{
|
||||||
|
|
|
@ -439,7 +439,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||||
// We received a GO_AWAY, so try to write
|
// We received a GO_AWAY, so try to write
|
||||||
// what's in the queue and then disconnect.
|
// what's in the queue and then disconnect.
|
||||||
closeFrame = frame;
|
closeFrame = frame;
|
||||||
notifyClose(this, frame, new DisconnectCallback());
|
onClose(frame, new DisconnectCallback());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -498,9 +498,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||||
public void onStreamFailure(int streamId, int error, String reason)
|
public void onStreamFailure(int streamId, int error, String reason)
|
||||||
{
|
{
|
||||||
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
|
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
|
||||||
|
Throwable failure = toFailure("Stream failure", error, reason);
|
||||||
|
onStreamFailure(streamId, error, reason, failure, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
IStream stream = getStream(streamId);
|
IStream stream = getStream(streamId);
|
||||||
if (stream != null)
|
if (stream != null)
|
||||||
stream.process(new FailureFrame(error, reason), callback);
|
stream.process(new FailureFrame(error, reason, failure), callback);
|
||||||
else
|
else
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
@ -513,7 +519,45 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||||
|
|
||||||
protected void onConnectionFailure(int error, String reason, Callback callback)
|
protected void onConnectionFailure(int error, String reason, Callback callback)
|
||||||
{
|
{
|
||||||
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
|
Throwable failure = toFailure("Session failure", error, reason);
|
||||||
|
onFailure(error, reason, failure, new CloseCallback(error, reason, callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void abort(Throwable failure)
|
||||||
|
{
|
||||||
|
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onFailure(int error, String reason, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
|
Collection<Stream> streams = getStreams();
|
||||||
|
int count = streams.size();
|
||||||
|
Callback countCallback = new CountingCallback(callback, count + 1);
|
||||||
|
for (Stream stream : streams)
|
||||||
|
{
|
||||||
|
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
|
||||||
|
}
|
||||||
|
notifyFailure(this, failure, countCallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onClose(GoAwayFrame frame, Callback callback)
|
||||||
|
{
|
||||||
|
int error = frame.getError();
|
||||||
|
String reason = frame.tryConvertPayload();
|
||||||
|
Throwable failure = toFailure("Session close", error, reason);
|
||||||
|
Collection<Stream> streams = getStreams();
|
||||||
|
int count = streams.size();
|
||||||
|
Callback countCallback = new CountingCallback(callback, count + 1);
|
||||||
|
for (Stream stream : streams)
|
||||||
|
{
|
||||||
|
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
|
||||||
|
}
|
||||||
|
notifyClose(this, frame, countCallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Throwable toFailure(String message, int error, String reason)
|
||||||
|
{
|
||||||
|
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -998,11 +1042,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void abort(Throwable failure)
|
|
||||||
{
|
|
||||||
notifyFailure(this, failure, new TerminateCallback(failure));
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isDisconnected()
|
public boolean isDisconnected()
|
||||||
{
|
{
|
||||||
return !endPoint.isOpen();
|
return !endPoint.isOpen();
|
||||||
|
|
|
@ -138,7 +138,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
{
|
{
|
||||||
if (writing.compareAndSet(null, callback))
|
if (writing.compareAndSet(null, callback))
|
||||||
return true;
|
return true;
|
||||||
close();
|
|
||||||
callback.failed(new WritePendingException());
|
callback.failed(new WritePendingException());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -177,7 +176,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
public boolean isRemotelyClosed()
|
public boolean isRemotelyClosed()
|
||||||
{
|
{
|
||||||
CloseState state = closeState.get();
|
CloseState state = closeState.get();
|
||||||
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
|
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isLocallyClosed()
|
public boolean isLocallyClosed()
|
||||||
|
@ -358,6 +357,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
|
|
||||||
private void onFailure(FailureFrame frame, Callback callback)
|
private void onFailure(FailureFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
// Don't close or remove the stream, as the listener may
|
||||||
|
// want to use it, for example to send a RST_STREAM frame.
|
||||||
notifyFailure(this, frame, callback);
|
notifyFailure(this, frame, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -608,7 +609,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
listener.onFailure(stream, frame.getError(), frame.getReason(), callback);
|
listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback);
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
|
|
|
@ -227,8 +227,24 @@ public interface Stream
|
||||||
* @param stream the stream
|
* @param stream the stream
|
||||||
* @param error the error code
|
* @param error the error code
|
||||||
* @param reason the error reason, or null
|
* @param reason the error reason, or null
|
||||||
|
* @param failure the failure
|
||||||
* @param callback the callback to complete when the failure has been handled
|
* @param callback the callback to complete when the failure has been handled
|
||||||
*/
|
*/
|
||||||
|
default void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
|
onFailure(stream, error, reason, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Callback method invoked when the stream failed.</p>
|
||||||
|
*
|
||||||
|
* @param stream the stream
|
||||||
|
* @param error the error code
|
||||||
|
* @param reason the error reason, or null
|
||||||
|
* @param callback the callback to complete when the failure has been handled
|
||||||
|
* @deprecated use {@link #onFailure(Stream, int, String, Throwable, Callback)} instead
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
default void onFailure(Stream stream, int error, String reason, Callback callback)
|
default void onFailure(Stream stream, int error, String reason, Callback callback)
|
||||||
{
|
{
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
|
|
|
@ -22,12 +22,14 @@ public class FailureFrame extends Frame
|
||||||
{
|
{
|
||||||
private final int error;
|
private final int error;
|
||||||
private final String reason;
|
private final String reason;
|
||||||
|
private final Throwable failure;
|
||||||
|
|
||||||
public FailureFrame(int error, String reason)
|
public FailureFrame(int error, String reason, Throwable failure)
|
||||||
{
|
{
|
||||||
super(FrameType.FAILURE);
|
super(FrameType.FAILURE);
|
||||||
this.error = error;
|
this.error = error;
|
||||||
this.reason = reason;
|
this.reason = reason;
|
||||||
|
this.failure = failure;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getError()
|
public int getError()
|
||||||
|
@ -39,4 +41,9 @@ public class FailureFrame extends Frame
|
||||||
{
|
{
|
||||||
return reason;
|
return reason;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Throwable getFailure()
|
||||||
|
{
|
||||||
|
return failure;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class Generator
|
||||||
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
|
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
|
||||||
this.generators[FrameType.CONTINUATION.getType()] = null; // Never generated explicitly.
|
this.generators[FrameType.CONTINUATION.getType()] = null; // Never generated explicitly.
|
||||||
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
|
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
|
||||||
this.generators[FrameType.DISCONNECT.getType()] = new DisconnectGenerator();
|
this.generators[FrameType.DISCONNECT.getType()] = new NoOpGenerator();
|
||||||
|
|
||||||
this.dataGenerator = new DataGenerator(headerGenerator);
|
this.dataGenerator = new DataGenerator(headerGenerator);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,9 @@ package org.eclipse.jetty.http2.generator;
|
||||||
import org.eclipse.jetty.http2.frames.Frame;
|
import org.eclipse.jetty.http2.frames.Frame;
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
|
|
||||||
public class DisconnectGenerator extends FrameGenerator
|
public class NoOpGenerator extends FrameGenerator
|
||||||
{
|
{
|
||||||
public DisconnectGenerator()
|
public NoOpGenerator()
|
||||||
{
|
{
|
||||||
super(null);
|
super(null);
|
||||||
}
|
}
|
|
@ -195,9 +195,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Stream stream, int error, String reason, Callback callback)
|
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
responseFailure(new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason)));
|
responseFailure(failure);
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
@ -41,7 +40,6 @@ import org.eclipse.jetty.http2.ErrorCode;
|
||||||
import org.eclipse.jetty.http2.HTTP2Connection;
|
import org.eclipse.jetty.http2.HTTP2Connection;
|
||||||
import org.eclipse.jetty.http2.ISession;
|
import org.eclipse.jetty.http2.ISession;
|
||||||
import org.eclipse.jetty.http2.IStream;
|
import org.eclipse.jetty.http2.IStream;
|
||||||
import org.eclipse.jetty.http2.api.Stream;
|
|
||||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
import org.eclipse.jetty.http2.frames.Frame;
|
import org.eclipse.jetty.http2.frames.Frame;
|
||||||
|
@ -58,7 +56,6 @@ import org.eclipse.jetty.server.Connector;
|
||||||
import org.eclipse.jetty.server.HttpConfiguration;
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.CountingCallback;
|
|
||||||
import org.eclipse.jetty.util.TypeUtil;
|
import org.eclipse.jetty.util.TypeUtil;
|
||||||
|
|
||||||
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
|
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
|
||||||
|
@ -214,14 +211,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
||||||
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
|
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Processing failure on {}: {}", stream, failure);
|
LOG.debug("Processing stream failure on {}", stream, failure);
|
||||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
|
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
|
||||||
if (channel != null)
|
if (channel != null)
|
||||||
{
|
{
|
||||||
Runnable task = channel.onFailure(failure, callback);
|
Runnable task = channel.onFailure(failure, callback);
|
||||||
if (task != null)
|
if (task != null)
|
||||||
|
{
|
||||||
|
// We must dispatch to another thread because the task
|
||||||
|
// may call application code that performs blocking I/O.
|
||||||
offerTask(task, true);
|
offerTask(task, true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
|
@ -245,23 +246,11 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
||||||
|
|
||||||
public void onSessionFailure(Throwable failure, Callback callback)
|
public void onSessionFailure(Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
ISession session = getSession();
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Processing failure on {}: {}", session, failure);
|
LOG.debug("Processing session failure on {}", getSession(), failure);
|
||||||
Collection<Stream> streams = session.getStreams();
|
// All the streams have already been failed, just succeed the callback.
|
||||||
if (streams.isEmpty())
|
|
||||||
{
|
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
CountingCallback counter = new CountingCallback(callback, streams.size());
|
|
||||||
for (Stream stream : streams)
|
|
||||||
{
|
|
||||||
onStreamFailure((IStream)stream, failure, counter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void push(Connector connector, IStream stream, MetaData.Request request)
|
public void push(Connector connector, IStream stream, MetaData.Request request)
|
||||||
{
|
{
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.eclipse.jetty.http2.frames.PushPromiseFrame;
|
||||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||||
import org.eclipse.jetty.io.EndPoint;
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.io.EofException;
|
import org.eclipse.jetty.io.EofException;
|
||||||
|
import org.eclipse.jetty.io.QuietException;
|
||||||
import org.eclipse.jetty.server.Connector;
|
import org.eclipse.jetty.server.Connector;
|
||||||
import org.eclipse.jetty.server.HttpConfiguration;
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
import org.eclipse.jetty.server.NegotiatingServerConnection.CipherDiscriminator;
|
import org.eclipse.jetty.server.NegotiatingServerConnection.CipherDiscriminator;
|
||||||
|
@ -119,7 +120,8 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
||||||
String reason = frame.tryConvertPayload();
|
String reason = frame.tryConvertPayload();
|
||||||
if (!StringUtil.isEmpty(reason))
|
if (!StringUtil.isEmpty(reason))
|
||||||
reason = " (" + reason + ")";
|
reason = " (" + reason + ")";
|
||||||
getConnection().onSessionFailure(new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason)), callback);
|
EofException failure = new EofException(String.format("Close %s/%s", ErrorCode.toString(frame.getError(), null), reason));
|
||||||
|
onFailure(session, failure, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -154,13 +156,21 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
||||||
@Override
|
@Override
|
||||||
public void onReset(Stream stream, ResetFrame frame, Callback callback)
|
public void onReset(Stream stream, ResetFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
getConnection().onStreamFailure((IStream)stream, new EofException("Reset " + ErrorCode.toString(frame.getError(), null)), callback);
|
EofException failure = new EofException("Reset " + ErrorCode.toString(frame.getError(), null));
|
||||||
|
onFailure(stream, failure, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Stream stream, int error, String reason, Callback callback)
|
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
getConnection().onStreamFailure((IStream)stream, new EofException(String.format("Failure %s/%s", ErrorCode.toString(error, null), reason)), callback);
|
if (!(failure instanceof QuietException))
|
||||||
|
failure = new EofException(failure);
|
||||||
|
onFailure(stream, failure, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onFailure(Stream stream, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
|
getConnection().onStreamFailure((IStream)stream, failure, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.eclipse.jetty.http.BadMessageException;
|
import org.eclipse.jetty.http.BadMessageException;
|
||||||
|
@ -103,8 +104,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(callback, false))
|
transportCallback.send(callback, false, c ->
|
||||||
sendHeadersFrame(info, false, transportCallback);
|
sendHeadersFrame(info, false, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -138,24 +139,24 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
HttpFields trailers = retrieveTrailers();
|
HttpFields trailers = retrieveTrailers();
|
||||||
if (trailers != null)
|
if (trailers != null)
|
||||||
{
|
{
|
||||||
if (transportCallback.start(new SendTrailers(getCallback(), trailers), false))
|
transportCallback.send(new SendTrailers(getCallback(), trailers), false, c ->
|
||||||
sendDataFrame(content, true, false, transportCallback);
|
sendDataFrame(content, true, false, c));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(getCallback(), false))
|
transportCallback.send(getCallback(), false, c ->
|
||||||
sendDataFrame(content, true, true, transportCallback);
|
sendDataFrame(content, true, true, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(getCallback(), false))
|
transportCallback.send(getCallback(), false, c ->
|
||||||
sendDataFrame(content, false, false, transportCallback);
|
sendDataFrame(content, false, false, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (transportCallback.start(commitCallback, true))
|
transportCallback.send(commitCallback, true, c ->
|
||||||
sendHeadersFrame(info, false, transportCallback);
|
sendHeadersFrame(info, false, c));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -164,19 +165,19 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
HttpFields trailers = retrieveTrailers();
|
HttpFields trailers = retrieveTrailers();
|
||||||
if (trailers != null)
|
if (trailers != null)
|
||||||
{
|
{
|
||||||
if (transportCallback.start(new SendTrailers(callback, trailers), true))
|
transportCallback.send(new SendTrailers(callback, trailers), true, c ->
|
||||||
sendHeadersFrame(info, false, transportCallback);
|
sendHeadersFrame(info, false, c));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(callback, true))
|
transportCallback.send(callback, true, c ->
|
||||||
sendHeadersFrame(info, true, transportCallback);
|
sendHeadersFrame(info, true, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(callback, true))
|
transportCallback.send(callback, true, c ->
|
||||||
sendHeadersFrame(info, false, transportCallback);
|
sendHeadersFrame(info, false, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -198,8 +199,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
|
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
|
||||||
if (hasContent)
|
if (hasContent)
|
||||||
{
|
{
|
||||||
if (transportCallback.start(sendTrailers, false))
|
transportCallback.send(sendTrailers, false, c ->
|
||||||
sendDataFrame(content, true, false, transportCallback);
|
sendDataFrame(content, true, false, c));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -208,14 +209,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(callback, false))
|
transportCallback.send(callback, false, c ->
|
||||||
sendDataFrame(content, true, true, transportCallback);
|
sendDataFrame(content, true, true, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (transportCallback.start(callback, false))
|
transportCallback.send(callback, false, c ->
|
||||||
sendDataFrame(content, false, false, transportCallback);
|
sendDataFrame(content, false, false, c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -317,7 +318,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
|
|
||||||
public boolean onStreamTimeout(Throwable failure)
|
public boolean onStreamTimeout(Throwable failure)
|
||||||
{
|
{
|
||||||
return transportCallback.onIdleTimeout(failure);
|
return transportCallback.idleTimeout(failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -350,119 +351,359 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Callback that controls sends initiated by the transport, by eventually
|
||||||
|
* notifying a nested callback.</p>
|
||||||
|
* <p>There are 3 sources of concurrency after a send is initiated:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>the completion of the send operation, either success or failure</li>
|
||||||
|
* <li>an asynchronous failure coming from the read side such as a stream
|
||||||
|
* being reset, or the connection being closed</li>
|
||||||
|
* <li>an asynchronous idle timeout</li>
|
||||||
|
* </ul>
|
||||||
|
* <p>The last 2 cases may happen <em>during</em> a send, when the frames
|
||||||
|
* are being generated in the flusher.
|
||||||
|
* In such cases, this class must avoid that the nested callback is notified
|
||||||
|
* while the frame generation is in progress, because the nested callback
|
||||||
|
* may modify other states (such as clearing the {@code HttpOutput._buffer})
|
||||||
|
* that are accessed during frame generation.</p>
|
||||||
|
* <p>The solution implemented in this class works by splitting the send
|
||||||
|
* operation in 3 parts: {@code pre-send}, {@code send} and {@code post-send}.
|
||||||
|
* Asynchronous state changes happening during {@code send} are stored
|
||||||
|
* and only executed in {@code post-send}, therefore never interfering
|
||||||
|
* with frame generation.</p>
|
||||||
|
*
|
||||||
|
* @see State
|
||||||
|
*/
|
||||||
private class TransportCallback implements Callback
|
private class TransportCallback implements Callback
|
||||||
{
|
{
|
||||||
private State state = State.IDLE;
|
private State _state = State.IDLE;
|
||||||
private Callback callback;
|
private Callback _callback;
|
||||||
private Throwable failure;
|
private boolean _commit;
|
||||||
private boolean commit;
|
private Throwable _failure;
|
||||||
|
|
||||||
public boolean start(Callback callback, boolean commit)
|
private void reset(Throwable failure)
|
||||||
{
|
{
|
||||||
State state;
|
assert Thread.holdsLock(this);
|
||||||
|
_state = failure != null ? State.FAILED : State.IDLE;
|
||||||
|
_callback = null;
|
||||||
|
_commit = false;
|
||||||
|
_failure = failure;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void send(Callback callback, boolean commit, Consumer<Callback> sendFrame)
|
||||||
|
{
|
||||||
|
Throwable failure = sending(callback, commit);
|
||||||
|
if (failure == null)
|
||||||
|
{
|
||||||
|
sendFrame.accept(this);
|
||||||
|
pending();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
callback.failed(failure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Throwable sending(Callback callback, boolean commit)
|
||||||
|
{
|
||||||
|
synchronized (this)
|
||||||
|
{
|
||||||
|
switch (_state)
|
||||||
|
{
|
||||||
|
case IDLE:
|
||||||
|
{
|
||||||
|
_state = State.SENDING;
|
||||||
|
_callback = callback;
|
||||||
|
_commit = commit;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
case FAILED:
|
||||||
|
{
|
||||||
|
return _failure;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
return new IllegalStateException("Invalid transport state: " + _state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void pending()
|
||||||
|
{
|
||||||
|
Callback callback;
|
||||||
|
boolean commit;
|
||||||
Throwable failure;
|
Throwable failure;
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
state = this.state;
|
switch (_state)
|
||||||
failure = this.failure;
|
|
||||||
if (state == State.IDLE)
|
|
||||||
{
|
{
|
||||||
this.state = State.WRITING;
|
case SENDING:
|
||||||
this.callback = callback;
|
{
|
||||||
this.commit = commit;
|
// The send has not completed the callback yet,
|
||||||
return true;
|
// wait for succeeded() or failed() to be called.
|
||||||
|
_state = State.PENDING;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case SUCCEEDING:
|
||||||
|
{
|
||||||
|
// The send already completed successfully, but the
|
||||||
|
// call to succeeded() was delayed, so call it now.
|
||||||
|
callback = _callback;
|
||||||
|
commit = _commit;
|
||||||
|
failure = null;
|
||||||
|
reset(null);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case FAILING:
|
||||||
|
{
|
||||||
|
// The send already completed with a failure, but
|
||||||
|
// the call to failed() was delayed, so call it now.
|
||||||
|
callback = _callback;
|
||||||
|
commit = _commit;
|
||||||
|
failure = _failure;
|
||||||
|
reset(failure);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
callback = _callback;
|
||||||
|
commit = _commit;
|
||||||
|
failure = new IllegalStateException("Invalid transport state: " + _state);
|
||||||
|
reset(failure);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (failure == null)
|
if (failure == null)
|
||||||
failure = new IllegalStateException("Invalid transport state: " + state);
|
succeed(callback, commit);
|
||||||
callback.failed(failure);
|
else
|
||||||
return false;
|
fail(callback, commit, failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
Callback callback;
|
||||||
boolean commit;
|
boolean commit;
|
||||||
Callback callback = null;
|
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
commit = this.commit;
|
switch (_state)
|
||||||
if (state == State.WRITING)
|
|
||||||
{
|
{
|
||||||
this.state = State.IDLE;
|
case SENDING:
|
||||||
callback = this.callback;
|
{
|
||||||
this.callback = null;
|
_state = State.SUCCEEDING;
|
||||||
this.commit = false;
|
// Succeeding the callback will be done in postSend().
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case PENDING:
|
||||||
|
{
|
||||||
|
callback = _callback;
|
||||||
|
commit = _commit;
|
||||||
|
reset(null);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
// This thread lost the race to succeed the current
|
||||||
|
// send, as other threads likely already failed it.
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled())
|
}
|
||||||
LOG.debug("HTTP2 Response #{}/{} {} {}",
|
succeed(callback, commit);
|
||||||
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
|
||||||
commit ? "commit" : "flush",
|
|
||||||
callback == null ? "failure" : "success");
|
|
||||||
if (callback != null)
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable failure)
|
public void failed(Throwable failure)
|
||||||
{
|
{
|
||||||
boolean commit;
|
|
||||||
Callback callback;
|
Callback callback;
|
||||||
|
boolean commit;
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
commit = this.commit;
|
switch (_state)
|
||||||
this.state = State.FAILED;
|
{
|
||||||
callback = this.callback;
|
case SENDING:
|
||||||
this.callback = null;
|
{
|
||||||
this.failure = failure;
|
_state = State.FAILING;
|
||||||
|
_failure = failure;
|
||||||
|
// Failing the callback will be done in postSend().
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
case IDLE:
|
||||||
|
case PENDING:
|
||||||
|
{
|
||||||
|
callback = _callback;
|
||||||
|
commit = _commit;
|
||||||
|
reset(failure);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
// This thread lost the race to fail the current send,
|
||||||
|
// as other threads already succeeded or failed it.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail(callback, commit, failure);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean idleTimeout(Throwable failure)
|
||||||
|
{
|
||||||
|
Callback callback;
|
||||||
|
boolean timeout;
|
||||||
|
synchronized (this)
|
||||||
|
{
|
||||||
|
switch (_state)
|
||||||
|
{
|
||||||
|
case PENDING:
|
||||||
|
{
|
||||||
|
// The send was started but idle timed out, fail it.
|
||||||
|
callback = _callback;
|
||||||
|
timeout = true;
|
||||||
|
reset(failure);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IDLE:
|
||||||
|
// The application may be suspended, ignore the idle timeout.
|
||||||
|
case SENDING:
|
||||||
|
// A send has been started at the same time of an idle timeout;
|
||||||
|
// Ignore the idle timeout and let the write continue normally.
|
||||||
|
case SUCCEEDING:
|
||||||
|
case FAILING:
|
||||||
|
// An idle timeout during these transient states is ignored.
|
||||||
|
case FAILED:
|
||||||
|
// Already failed, ignore the idle timeout.
|
||||||
|
{
|
||||||
|
callback = null;
|
||||||
|
timeout = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
// Should not happen, but just in case.
|
||||||
|
callback = _callback;
|
||||||
|
if (callback == null)
|
||||||
|
callback = Callback.NOOP;
|
||||||
|
timeout = true;
|
||||||
|
failure = new IllegalStateException("Invalid transport state: " + _state, failure);
|
||||||
|
reset(failure);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idleTimeout(callback, timeout, failure);
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void succeed(Callback callback, boolean commit)
|
||||||
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(),
|
LOG.debug("HTTP2 Response #{}/{} {} success",
|
||||||
commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
|
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||||
|
commit ? "commit" : "flush");
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fail(Callback callback, boolean commit, Throwable failure)
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("HTTP2 Response #{}/{} {} failure",
|
||||||
|
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||||
|
commit ? "commit" : "flush",
|
||||||
|
failure);
|
||||||
if (callback != null)
|
if (callback != null)
|
||||||
callback.failed(failure);
|
callback.failed(failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean onIdleTimeout(Throwable failure)
|
private void idleTimeout(Callback callback, boolean timeout, Throwable failure)
|
||||||
{
|
{
|
||||||
boolean result;
|
|
||||||
Callback callback = null;
|
|
||||||
synchronized (this)
|
|
||||||
{
|
|
||||||
// Ignore idle timeouts if not writing,
|
|
||||||
// as the application may be suspended.
|
|
||||||
result = state == State.WRITING;
|
|
||||||
if (result)
|
|
||||||
{
|
|
||||||
this.state = State.TIMEOUT;
|
|
||||||
callback = this.callback;
|
|
||||||
this.callback = null;
|
|
||||||
this.failure = failure;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug(String.format("HTTP2 Response #%d/%h idle timeout %s", stream.getId(), stream.getSession(), result ? "expired" : "ignored"), failure);
|
LOG.debug("HTTP2 Response #{}/{} idle timeout {}",
|
||||||
if (result)
|
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||||
|
timeout ? "expired" : "ignored",
|
||||||
|
failure);
|
||||||
|
if (timeout)
|
||||||
callback.failed(failure);
|
callback.failed(failure);
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InvocationType getInvocationType()
|
|
||||||
{
|
|
||||||
Callback callback;
|
|
||||||
synchronized (this)
|
|
||||||
{
|
|
||||||
callback = this.callback;
|
|
||||||
}
|
|
||||||
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Send states for {@link TransportCallback}.</p>
|
||||||
|
*
|
||||||
|
* @see TransportCallback
|
||||||
|
*/
|
||||||
private enum State
|
private enum State
|
||||||
{
|
{
|
||||||
IDLE, WRITING, FAILED, TIMEOUT
|
/**
|
||||||
|
* <p>No send initiated or in progress.</p>
|
||||||
|
* <p>Next states could be:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #SENDING}, when {@link TransportCallback#send(Callback, boolean, Consumer)}
|
||||||
|
* is called by the transport to initiate a send</li>
|
||||||
|
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
|
||||||
|
* is called by an asynchronous failure</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
IDLE,
|
||||||
|
/**
|
||||||
|
* <p>A send is initiated; the nested callback in {@link TransportCallback}
|
||||||
|
* cannot be notified while in this state.</p>
|
||||||
|
* <p>Next states could be:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #SUCCEEDING}, when {@link TransportCallback#succeeded()}
|
||||||
|
* is called synchronously because the send succeeded</li>
|
||||||
|
* <li>{@link #FAILING}, when {@link TransportCallback#failed(Throwable)}
|
||||||
|
* is called synchronously because the send failed</li>
|
||||||
|
* <li>{@link #PENDING}, when {@link TransportCallback#pending()}
|
||||||
|
* is called before the send completes</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
SENDING,
|
||||||
|
/**
|
||||||
|
* <p>A send was initiated and is now pending, waiting for the {@link TransportCallback}
|
||||||
|
* to be notified of success or failure.</p>
|
||||||
|
* <p>Next states could be:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #IDLE}, when {@link TransportCallback#succeeded()}
|
||||||
|
* is called because the send succeeded</li>
|
||||||
|
* <li>{@link #FAILED}, when {@link TransportCallback#failed(Throwable)}
|
||||||
|
* is called because either the send failed, or an asynchronous failure happened</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
PENDING,
|
||||||
|
/**
|
||||||
|
* <p>A send was initiated and succeeded, but {@link TransportCallback#pending()}
|
||||||
|
* has not been called yet.</p>
|
||||||
|
* <p>This state indicates that the success actions (such as notifying the
|
||||||
|
* {@link TransportCallback} nested callback) must be performed when
|
||||||
|
* {@link TransportCallback#pending()} is called.</p>
|
||||||
|
* <p>Next states could be:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #IDLE}, when {@link TransportCallback#pending()}
|
||||||
|
* is called</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
SUCCEEDING,
|
||||||
|
/**
|
||||||
|
* <p>A send was initiated and failed, but {@link TransportCallback#pending()}
|
||||||
|
* has not been called yet.</p>
|
||||||
|
* <p>This state indicates that the failure actions (such as notifying the
|
||||||
|
* {@link TransportCallback} nested callback) must be performed when
|
||||||
|
* {@link TransportCallback#pending()} is called.</p>
|
||||||
|
* <p>Next states could be:</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link #FAILED}, when {@link TransportCallback#pending()}
|
||||||
|
* is called</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
FAILING,
|
||||||
|
/**
|
||||||
|
* <p>The terminal state indicating failure of the send.</p>
|
||||||
|
*/
|
||||||
|
FAILED
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SendTrailers extends Callback.Nested
|
private class SendTrailers extends Callback.Nested
|
||||||
|
@ -478,8 +719,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
if (transportCallback.start(getCallback(), false))
|
transportCallback.send(getCallback(), false, c ->
|
||||||
sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), transportCallback);
|
sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -817,15 +817,18 @@ public class HttpChannelState
|
||||||
// check the actions of the listeners
|
// check the actions of the listeners
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
// If we are still async and nobody has called sendError
|
|
||||||
if (_requestState == RequestState.ASYNC && !_sendError)
|
if (_requestState == RequestState.ASYNC && !_sendError)
|
||||||
// Then the listeners did not invoke API methods
|
{
|
||||||
// and the container must provide a default error dispatch.
|
// The listeners did not invoke API methods and the
|
||||||
|
// container must provide a default error dispatch.
|
||||||
sendError(th);
|
sendError(th);
|
||||||
else
|
}
|
||||||
|
else if (_requestState != RequestState.COMPLETE)
|
||||||
|
{
|
||||||
LOG.warn("unhandled in state " + _requestState, new IllegalStateException(th));
|
LOG.warn("unhandled in state " + _requestState, new IllegalStateException(th));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void sendError(Throwable th)
|
private void sendError(Throwable th)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue