Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2017-08-22 19:33:46 +02:00
commit 7e764bad3e
9 changed files with 206 additions and 39 deletions

View File

@ -181,6 +181,7 @@ public class HTTP2Connection extends AbstractConnection
{
private final Callback fillableCallback = new FillableCallback();
private ByteBuffer buffer;
private boolean shutdown;
@Override
public Runnable produce()
@ -191,7 +192,7 @@ public class HTTP2Connection extends AbstractConnection
if (task != null)
return task;
if (isFillInterested())
if (isFillInterested() || shutdown)
return null;
if (buffer == null)
@ -227,6 +228,7 @@ public class HTTP2Connection extends AbstractConnection
else if (filled < 0)
{
release();
shutdown = true;
session.onShutdown();
return null;
}

View File

@ -435,8 +435,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
notifyClose(this, frame);
control(null, Callback.NOOP, new DisconnectFrame());
notifyClose(this, frame, new DisconnectCallback());
return;
}
break;
@ -476,8 +475,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void onConnectionFailure(int error, String reason)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
close(error, reason, Callback.NOOP);
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason));
}
@Override
@ -1003,8 +1001,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void abort(Throwable failure)
{
notifyFailure(this, failure);
terminate(failure);
notifyFailure(this, failure, new TerminateCallback(failure));
}
public boolean isDisconnected()
@ -1066,11 +1063,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyClose(Session session, GoAwayFrame frame)
protected void notifyClose(Session session, GoAwayFrame frame, Callback callback)
{
try
{
listener.onClose(session, frame);
listener.onClose(session, frame, callback);
}
catch (Throwable x)
{
@ -1091,11 +1088,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyFailure(Session session, Throwable failure)
protected void notifyFailure(Session session, Throwable failure, Callback callback)
{
try
{
listener.onFailure(session, failure);
listener.onFailure(session, failure, callback);
}
catch (Throwable x)
{
@ -1330,4 +1327,99 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
promise.failed(x);
}
}
private class CloseCallback implements Callback
{
private final int error;
private final String reason;
private CloseCallback(int error, String reason)
{
this.error = error;
this.reason = reason;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
private void complete()
{
close(error, reason, Callback.NOOP);
}
}
private class DisconnectCallback implements Callback
{
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
private void complete()
{
control(null, Callback.NOOP, new DisconnectFrame());
}
}
private class TerminateCallback implements Callback
{
private final Throwable failure;
private TerminateCallback(Throwable failure)
{
this.failure = failure;
}
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
failure.addSuppressed(x);
complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
private void complete()
{
terminate(failure);
}
}
}

View File

@ -195,9 +195,23 @@ public interface Session
/**
* <p>Callback method invoked when a GOAWAY frame has been received.</p>
*
* @param session the session
* @param frame the GOAWAY frame received
* @param session the session
* @param frame the GOAWAY frame received
* @param callback the callback to notify of the GOAWAY processing
*/
public default void onClose(Session session, GoAwayFrame frame, Callback callback)
{
try
{
onClose(session, frame);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onClose(Session session, GoAwayFrame frame);
/**
@ -210,9 +224,23 @@ public interface Session
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
* @param failure the failure
* @param session the session
* @param failure the failure
* @param callback the callback to notify of failure processing
*/
public default void onFailure(Session session, Throwable failure, Callback callback)
{
try
{
onFailure(session, failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
public void onFailure(Session session, Throwable failure);
/**

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
@ -55,6 +56,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
@ -205,17 +207,21 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return result;
}
public void onStreamFailure(IStream stream, Throwable failure)
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null)
{
Runnable task = channel.onFailure(failure);
Runnable task = channel.onFailure(failure, callback);
if (task != null)
offerTask(task, true);
}
else
{
callback.succeeded();
}
}
public boolean onSessionTimeout(Throwable failure)
@ -233,13 +239,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return result;
}
public void onSessionFailure(Throwable failure)
public void onSessionFailure(Throwable failure, Callback callback)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
for (Stream stream : session.getStreams())
onStreamFailure((IStream)stream, failure);
Collection<Stream> streams = session.getStreams();
if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
onStreamFailure((IStream)stream, failure, counter);
}
}
public void push(Connector connector, IStream stream, MetaData.Request request)

View File

@ -122,7 +122,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
}
@Override
public void onClose(Session session, GoAwayFrame frame)
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
@ -130,13 +130,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
String reason = frame.tryConvertPayload();
if (reason != null && !reason.isEmpty())
reason = " (" + reason + ")";
getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason));
getConnection().onSessionFailure(new EofException("HTTP/2 " + error + reason), callback);
}
@Override
public void onFailure(Session session, Throwable failure)
public void onFailure(Session session, Throwable failure, Callback callback)
{
getConnection().onSessionFailure(failure);
getConnection().onSessionFailure(failure, callback);
}
@Override
@ -168,7 +168,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
ErrorCode error = ErrorCode.from(frame.getError());
if (error == null)
error = ErrorCode.CANCEL_STREAM_ERROR;
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error));
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP);
}
@Override

View File

@ -322,18 +322,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
return result;
}
public Runnable onFailure(Throwable failure)
public Runnable onFailure(Throwable failure, Callback callback)
{
getHttpTransport().onStreamFailure(failure);
boolean handle = getRequest().getHttpInput().failed(failure);
consumeInput();
return () ->
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
};
return new FailureTask(failure, callback, handle);
}
protected void consumeInput()
@ -394,4 +388,35 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
streamId = stream.getId();
return String.format("%s#%d", super.toString(), getStream() == null ? -1 : streamId);
}
private class FailureTask implements Runnable
{
private final Throwable failure;
private final Callback callback;
private final boolean handle;
public FailureTask(Throwable failure, Callback callback, boolean handle)
{
this.failure = failure;
this.callback = callback;
this.handle = handle;
}
@Override
public void run()
{
try
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
}
}

View File

@ -325,14 +325,12 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
State state = this.state;
this.state = State.FAILED;
if (this.failure == null)
this.failure = failure;
else
this.failure.addSuppressed(failure);
// Only fail pending writes, as we
// may need to write an error page.
if (state == State.WRITING)
{
this.state = State.FAILED;
this.failure = failure;
callback = this.callback;
this.callback = null;
}

View File

@ -36,6 +36,11 @@ public interface Callback extends Invocable
*/
static Callback NOOP = new Callback()
{
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
};
/**

View File

@ -45,6 +45,8 @@ public class CountingCallback extends Callback.Nested
public CountingCallback(Callback callback, int count)
{
super(callback);
if (count < 1)
throw new IllegalArgumentException();
this.count = new AtomicInteger(count);
}