Issue #1759 - HTTP/2 producer can block in onReset.

Because now the failures are asynchronous, code that was executed after
invoking the failure listener must be now executed after the
asynchronous processing done by the listener and therefore Callbacks
are introduced.
This commit is contained in:
Simone Bordet 2017-08-22 15:54:40 +02:00
parent 1711642309
commit f9ff9e1226
6 changed files with 177 additions and 33 deletions

View File

@ -175,6 +175,7 @@ public class HTTP2Connection extends AbstractConnection
{
private final Callback fillCallback = new FillCallback();
private ByteBuffer buffer;
private boolean shutdown;
@Override
public Runnable produce()
@ -185,7 +186,7 @@ public class HTTP2Connection extends AbstractConnection
if (task != null)
return task;
if (isFillInterested())
if (isFillInterested() || shutdown)
return null;
if (buffer == null)
@ -221,6 +222,7 @@ public class HTTP2Connection extends AbstractConnection
else if (filled < 0)
{
release();
shutdown = true;
session.onShutdown();
return null;
}

View File

@ -421,8 +421,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
notifyClose(this, frame);
control(null, Callback.NOOP, new DisconnectFrame());
notifyClose(this, frame, new DisconnectCallback());
return;
}
break;
@ -462,8 +461,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void onConnectionFailure(int error, String reason)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
close(error, reason, Callback.NOOP);
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason));
}
@Override
@ -991,8 +989,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void abort(Throwable failure)
{
notifyFailure(this, failure);
terminate(failure);
notifyFailure(this, failure, new TerminateCallback(failure));
}
public boolean isDisconnected()
@ -1054,11 +1051,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyClose(Session session, GoAwayFrame frame)
protected void notifyClose(Session session, GoAwayFrame frame, Callback callback)
{
try
{
listener.onClose(session, frame);
listener.onClose(session, frame, callback);
}
catch (Throwable x)
{
@ -1079,11 +1076,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyFailure(Session session, Throwable failure)
protected void notifyFailure(Session session, Throwable failure, Callback callback)
{
try
{
listener.onFailure(session, failure);
listener.onFailure(session, failure, callback);
}
catch (Throwable x)
{
@ -1322,4 +1319,81 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
promise.failed(x);
}
}
private class CloseCallback implements Callback.NonBlocking
{
private final int error;
private final String reason;
private CloseCallback(int error, String reason)
{
this.error = error;
this.reason = reason;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
private void complete()
{
close(error, reason, Callback.NOOP);
}
}
private class DisconnectCallback implements Callback.NonBlocking
{
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
private void complete()
{
control(null, Callback.NOOP, new DisconnectFrame());
}
}
private class TerminateCallback implements Callback.NonBlocking
{
private final Throwable failure;
private TerminateCallback(Throwable failure)
{
this.failure = failure;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
failure.addSuppressed(x);
complete();
}
private void complete()
{
terminate(failure);
}
}
}

View File

@ -195,9 +195,23 @@ public interface Session
/**
* <p>Callback method invoked when a GOAWAY frame has been received.</p>
*
* @param session the session
* @param frame the GOAWAY frame received
* @param session the session
* @param frame the GOAWAY frame received
* @param callback the callback to notify of the GOAWAY processing
*/
public default void onClose(Session session, GoAwayFrame frame, Callback callback)
{
try
{
onClose(session, frame);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onClose(Session session, GoAwayFrame frame);
/**
@ -210,9 +224,23 @@ public interface Session
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
* @param failure the failure
* @param session the session
* @param failure the failure
* @param callback the callback to notify of failure processing
*/
public default void onFailure(Session session, Throwable failure, Callback callback)
{
try
{
onFailure(session, failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onFailure(Session session, Throwable failure);
/**

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ -162,17 +164,21 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return result;
}
public void onStreamFailure(IStream stream, Throwable failure)
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null)
{
Runnable task = channel.onFailure(failure);
Runnable task = channel.onFailure(failure, callback);
if (task != null)
offerTask(task, true);
}
else
{
callback.succeeded();
}
}
public boolean onSessionTimeout(Throwable failure)
@ -190,13 +196,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return result;
}
public void onSessionFailure(Throwable failure)
public void onSessionFailure(Throwable failure, Callback callback)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
for (Stream stream : session.getStreams())
onStreamFailure((IStream)stream, failure);
Collection<Stream> streams = session.getStreams();
if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
onStreamFailure((IStream)stream, failure, counter);
}
}
public void push(Connector connector, IStream stream, MetaData.Request request)

View File

@ -123,7 +123,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
}
@Override
public void onClose(Session session, GoAwayFrame frame)
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
@ -131,13 +131,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
String reason = frame.tryConvertPayload();
if (reason != null && !reason.isEmpty())
reason = " (" + reason + ")";
getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason));
getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback);
}
@Override
public void onFailure(Session session, Throwable failure)
public void onFailure(Session session, Throwable failure, Callback callback)
{
getConnection().onSessionFailure(failure);
getConnection().onSessionFailure(failure, callback);
}
@Override
@ -167,7 +167,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
error = ErrorCode.CANCEL_STREAM_ERROR;
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error));
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP);
}
@Override

View File

@ -300,18 +300,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return result;
}
public Runnable onFailure(Throwable failure)
public Runnable onFailure(Throwable failure, Callback callback)
{
getHttpTransport().onStreamFailure(failure);
boolean handle = getRequest().getHttpInput().failed(failure);
consumeInput();
return () ->
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
};
return new FailureTask(failure, callback, handle);
}
protected void consumeInput()
@ -366,4 +360,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel
streamId = stream.getId();
return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId);
}
private class FailureTask implements Runnable
{
private final Throwable failure;
private final Callback callback;
private final boolean handle;
public FailureTask(Throwable failure, Callback callback, boolean handle)
{
this.failure = failure;
this.callback = callback;
this.handle = handle;
}
@Override
public void run()
{
try
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
}
}