Add suppressed failures in Callback failed (#11435)
If an exception is thrown during failure handling, then record the original failure as a suppressed Throwable on the thrown exception
This commit is contained in:
parent
56e05a973f
commit
4155e7bc25
|
@ -1114,7 +1114,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
if (writeCallback == null)
|
||||
return null;
|
||||
return () -> writeCallback.failed(x);
|
||||
return () -> HttpChannelState.failed(writeCallback, x);
|
||||
}
|
||||
|
||||
public long getContentBytesWritten()
|
||||
|
@ -1229,7 +1229,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (writeFailure != null)
|
||||
{
|
||||
Throwable failure = writeFailure;
|
||||
httpChannelState._writeInvoker.run(() -> callback.failed(failure));
|
||||
httpChannelState._writeInvoker.run(() -> HttpChannelState.failed(callback, failure));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1297,7 +1297,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
httpChannel.lockedStreamSendCompleted(false);
|
||||
}
|
||||
if (callback != null)
|
||||
httpChannel._writeInvoker.run(() -> callback.failed(x));
|
||||
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1513,41 +1513,49 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public void failed(Throwable failure)
|
||||
{
|
||||
// Called when the request/response cycle is completing with a failure.
|
||||
HttpStream stream;
|
||||
ChannelRequest request;
|
||||
HttpChannelState httpChannelState;
|
||||
ErrorResponse errorResponse = null;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
try
|
||||
{
|
||||
if (lockedCompleteCallback())
|
||||
return;
|
||||
httpChannelState = _request._httpChannelState;
|
||||
stream = httpChannelState._stream;
|
||||
request = _request;
|
||||
// Called when the request/response cycle is completing with a failure.
|
||||
HttpStream stream;
|
||||
ChannelRequest request;
|
||||
HttpChannelState httpChannelState;
|
||||
ErrorResponse errorResponse = null;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
if (lockedCompleteCallback())
|
||||
return;
|
||||
httpChannelState = _request._httpChannelState;
|
||||
stream = httpChannelState._stream;
|
||||
request = _request;
|
||||
|
||||
assert httpChannelState._callbackFailure == null;
|
||||
assert httpChannelState._callbackFailure == null;
|
||||
|
||||
httpChannelState._callbackFailure = failure;
|
||||
httpChannelState._callbackFailure = failure;
|
||||
|
||||
// Consume any input.
|
||||
Throwable unconsumed = stream.consumeAvailable();
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);
|
||||
// Consume any input.
|
||||
Throwable unconsumed = stream.consumeAvailable();
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);
|
||||
|
||||
ChannelResponse response = httpChannelState._response;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);
|
||||
ChannelResponse response = httpChannelState._response;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);
|
||||
|
||||
// There may have been an attempt to write an error response that failed.
|
||||
// Do not try to write again an error response if already committed.
|
||||
if (!stream.isCommitted())
|
||||
errorResponse = new ErrorResponse(request);
|
||||
// There may have been an attempt to write an error response that failed.
|
||||
// Do not try to write again an error response if already committed.
|
||||
if (!stream.isCommitted())
|
||||
errorResponse = new ErrorResponse(request);
|
||||
}
|
||||
|
||||
if (errorResponse != null)
|
||||
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
|
||||
else
|
||||
_request.getHttpChannelState()._handlerInvoker.failed(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
|
||||
throw t;
|
||||
}
|
||||
|
||||
if (errorResponse != null)
|
||||
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
|
||||
else
|
||||
_request.getHttpChannelState()._handlerInvoker.failed(failure);
|
||||
}
|
||||
|
||||
private boolean lockedCompleteCallback()
|
||||
|
@ -1683,7 +1691,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
else
|
||||
{
|
||||
httpChannelState._handlerInvoker.failed(failure);
|
||||
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1705,9 +1713,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
httpChannelState = _request.lockedGetHttpChannelState();
|
||||
httpChannelState._response._status = _errorResponse._status;
|
||||
}
|
||||
if (ExceptionUtil.areNotAssociated(failure, x))
|
||||
failure.addSuppressed(x);
|
||||
httpChannelState._handlerInvoker.failed(failure);
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(failure, x);
|
||||
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1736,8 +1743,16 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
completing();
|
||||
super.failed(x);
|
||||
try
|
||||
{
|
||||
completing();
|
||||
super.failed(x);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, x);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
private void completing()
|
||||
|
@ -1868,4 +1883,25 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke a callback failure, handling any {@link Throwable} thrown
|
||||
* by adding the passed {@code failure} as a suppressed with
|
||||
* {@link ExceptionUtil#addSuppressedIfNotAssociated(Throwable, Throwable)}.
|
||||
* @param callback The callback to fail
|
||||
* @param failure The failure
|
||||
* @throws RuntimeException If thrown, will have the {@code failure} added as a suppressed.
|
||||
*/
|
||||
private static void failed(Callback callback, Throwable failure)
|
||||
{
|
||||
try
|
||||
{
|
||||
callback.failed(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -121,7 +121,15 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
completable.completeExceptionally(x);
|
||||
try
|
||||
{
|
||||
completable.completeExceptionally(x);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, x);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,7 +173,15 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
failure.accept(x);
|
||||
try
|
||||
{
|
||||
failure.accept(x);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, x);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -272,14 +288,7 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
completed.accept(x);
|
||||
}
|
||||
Callback.failed(callback::failed, completed, x);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -307,22 +316,19 @@ public interface Callback extends Invocable
|
|||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
callback.failed(t);
|
||||
Callback.failed(callback, t);
|
||||
}
|
||||
}
|
||||
|
||||
private void completed(Throwable ignored)
|
||||
{
|
||||
completed.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
completed.run();
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
x.addSuppressed(t);
|
||||
}
|
||||
callback.failed(x);
|
||||
Callback.failed(this::completed, callback::failed, x);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -348,8 +354,8 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
cause.addSuppressed(x);
|
||||
callback.failed(cause);
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
|
||||
Callback.failed(callback, cause);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -381,7 +387,15 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
default void failed(Throwable x)
|
||||
{
|
||||
completed();
|
||||
try
|
||||
{
|
||||
completed();
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, x);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -408,6 +422,11 @@ public interface Callback extends Invocable
|
|||
{
|
||||
}
|
||||
|
||||
private void completed(Throwable ignored)
|
||||
{
|
||||
completed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
|
@ -424,14 +443,7 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
completed();
|
||||
}
|
||||
Callback.failed(callback::failed, this::completed, x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -472,18 +484,7 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
cb1.failed(x);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(x, t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
cb2.failed(x);
|
||||
}
|
||||
Callback.failed(cb1::failed, cb2::failed, x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -533,8 +534,7 @@ public interface Callback extends Invocable
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
super.failed(x);
|
||||
Callback.failed(callback::failed, super::failed, x);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -592,4 +592,64 @@ public interface Callback extends Invocable
|
|||
return completable;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke a callback failure, handling any {@link Throwable} thrown
|
||||
* by adding the passed {@code failure} as a suppressed with
|
||||
* {@link ExceptionUtil#addSuppressedIfNotAssociated(Throwable, Throwable)}.
|
||||
* @param callback The callback to fail
|
||||
* @param failure The failure
|
||||
* @throws RuntimeException If thrown, will have the {@code failure} added as a suppressed.
|
||||
*/
|
||||
private static void failed(Callback callback, Throwable failure)
|
||||
{
|
||||
try
|
||||
{
|
||||
callback.failed(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke two consumers of a failure, handling any {@link Throwable} thrown
|
||||
* by adding the passed {@code failure} as a suppressed with
|
||||
* {@link ExceptionUtil#addSuppressedIfNotAssociated(Throwable, Throwable)}.
|
||||
* @param first The first consumer of a failure
|
||||
* @param second The first consumer of a failure
|
||||
* @param failure The failure
|
||||
* @throws RuntimeException If thrown, will have the {@code failure} added as a suppressed.
|
||||
*/
|
||||
private static void failed(Consumer<Throwable> first, Consumer<Throwable> second, Throwable failure)
|
||||
{
|
||||
// This is an improved version of:
|
||||
// try
|
||||
// {
|
||||
// first.accept(failure);
|
||||
// }
|
||||
// finally
|
||||
// {
|
||||
// second.accept(failure);
|
||||
// }
|
||||
try
|
||||
{
|
||||
first.accept(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(failure, t);
|
||||
}
|
||||
try
|
||||
{
|
||||
second.accept(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -499,15 +499,24 @@ public class ServletChannel
|
|||
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not perform error handling, aborting", cause);
|
||||
if (_state.isResponseCommitted())
|
||||
|
||||
try
|
||||
{
|
||||
// Perform the same behavior as when the callback is failed.
|
||||
_state.errorHandlingComplete(cause);
|
||||
if (_state.isResponseCommitted())
|
||||
{
|
||||
// Perform the same behavior as when the callback is failed.
|
||||
_state.errorHandlingComplete(cause);
|
||||
}
|
||||
else
|
||||
{
|
||||
getServletContextResponse().resetContent();
|
||||
sendErrorResponseAndComplete();
|
||||
}
|
||||
}
|
||||
else
|
||||
catch (Throwable t)
|
||||
{
|
||||
getServletContextResponse().resetContent();
|
||||
sendErrorResponseAndComplete();
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(t, cause);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
|
Loading…
Reference in New Issue