Improved handling of asynchronous failures.

This commit is contained in:
Simone Bordet 2017-04-12 12:21:01 +02:00
parent 3359db09bb
commit 2dce90c98d
2 changed files with 32 additions and 11 deletions

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -329,9 +330,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{
getHttpTransport().onStreamFailure(failure);
if (onEarlyEOF())
handle();
{
ContextHandler handler = getState().getContextHandler();
if (handler != null)
handler.handle(getRequest(), this);
else
handle();
}
else
{
getState().asyncError(failure);
}
}
protected void consumeInput()

View File

@ -268,14 +268,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
private State state = State.IDLE;
private Callback callback;
private Throwable failure;
private boolean commit;
public boolean start(Callback callback, boolean commit)
{
State state;
Throwable failure;
synchronized (this)
{
state = this.state;
failure = this.failure;
if (state == State.IDLE)
{
this.state = State.WRITING;
@ -284,7 +287,9 @@ public class HttpTransportOverHTTP2 implements HttpTransport
return true;
}
}
callback.failed(new IllegalStateException("Invalid transport state: " + state));
if (failure == null)
failure = new IllegalStateException("Invalid transport state: " + state);
callback.failed(failure);
return false;
}
@ -304,32 +309,36 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{}/{} {}",
LOG.debug("HTTP2 Response #{}/{} {} {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "committed" : "flushed content");
commit ? "commit" : "flush",
callback == null ? "failure" : "success");
if (callback != null)
callback.succeeded();
}
@Override
public void failed(Throwable x)
public void failed(Throwable failure)
{
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
// Only fail pending writes, as we
// may need to write an error page.
if (state == State.WRITING)
{
this.state = State.FAILED;
callback = this.callback;
this.callback = null;
this.state = State.FAILED;
this.failure = failure;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to " + (commit ? "commit" : "flush"), x);
LOG.debug(String.format("HTTP2 Response #%d/%h failed to %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush"), failure);
if (callback != null)
callback.failed(x);
callback.failed(failure);
}
@Override
@ -340,7 +349,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
callback = this.callback;
}
return callback.getInvocationType();
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
private boolean onIdleTimeout(Throwable failure)
@ -349,16 +358,19 @@ public class HttpTransportOverHTTP2 implements HttpTransport
Callback callback = null;
synchronized (this)
{
// Ignore idle timeouts if not writing,
// as the application may be suspended.
result = state == State.WRITING;
if (result)
{
this.state = State.TIMEOUT;
callback = this.callback;
this.callback = null;
this.state = State.TIMEOUT;
this.failure = failure;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " idle timeout", failure);
LOG.debug(String.format("HTTP2 Response #%d/%h idle timeout", stream.getId(), stream.getSession()), failure);
if (result)
callback.failed(failure);
return result;