Serialize onCompleteFailure for #9059 (#9062)

Serialize onCompleteFailure for #9059

* Fixed case where process() throws an exception.
   Before, exiting the processing loop would always skip to invoke onCompleteFailure(), causing the callback to not be completed.
   Now we fall through and possibly invoke onCompleteFailure() if it was not already invoked.

* Updated javadocs.

* Code cleanups.

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2022-12-17 09:43:07 +11:00 committed by GitHub
parent 6e82e70edf
commit d24a521930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 158 additions and 107 deletions

View File

@ -19,109 +19,118 @@ import org.eclipse.jetty.util.thread.AutoLock;
/** /**
* This specialized callback implements a pattern that allows * This specialized callback implements a pattern that allows
* a large job to be broken into smaller tasks using iteration * a large asynchronous task to be broken into smaller
* rather than recursion. * asynchronous sub-tasks using iteration rather than recursion.
* <p> * <p>
* A typical example is the write of a large content to a socket, * A typical example is the write of a large content to a socket,
* divided in chunks. Chunk C1 is written by thread T1, which * divided in chunks. Chunk C1 is written by thread T1, which
* also invokes the callback, which writes chunk C2, which invokes * also invokes the callback, which writes chunk C2, which invokes
* the callback again, which writes chunk C3, and so forth. * the callback again, which writes chunk C3, and so forth.
* </p>
* <p> * <p>
* The problem with the example is that if the callback thread * The problem with the example above is that if the callback thread
* is the same that performs the I/O operation, then the process * is the same that performs the I/O operation, then the process
* is recursive and may result in a stack overflow. * is recursive and may result in a stack overflow.
* To avoid the stack overflow, a thread dispatch must be performed, * To avoid the stack overflow, a thread dispatch must be performed,
* causing context switching and cache misses, affecting performance. * causing context switching and cache misses, affecting performance.
* </p>
* <p> * <p>
* To avoid this issue, this callback uses an AtomicReference to * To avoid this issue, this callback atomically records whether
* record whether success callback has been called during the processing * the callback for an asynchronous sub-task has been called
* of a sub task, and if so then the processing iterates rather than * during the processing of the asynchronous sub-task, and if so
* recurring. * then the processing of the large asynchronous task iterates
* </p> * rather than recursing.
* <p> * <p>
* Subclasses must implement method {@link #process()} where the sub * Subclasses must implement method {@link #process()} where the
* task is executed and a suitable {@link IteratingCallback.Action} is * asynchronous sub-task is initiated and a suitable {@link Action}
* returned to this callback to indicate the overall progress of the job. * is returned to this callback to indicate the overall progress of
* This callback is passed to the asynchronous execution of each sub * the large asynchronous task.
* task and a call the {@link #succeeded()} on this callback represents * This callback is passed to the asynchronous sub-task, and a call
* the completion of the sub task. * to {@link #succeeded()} on this callback represents the successful
* </p> * completion of the asynchronous sub-task, while a call to
* {@link #failed(Throwable)} on this callback represents the
* completion with a failure of the large asynchronous task.
*/ */
public abstract class IteratingCallback implements Callback public abstract class IteratingCallback implements Callback
{ {
/** /**
* The internal states of this callback * The internal states of this callback.
*/ */
private enum State private enum State
{ {
/** /**
* This callback is IDLE, ready to iterate. * This callback is idle, ready to iterate.
*/ */
IDLE, IDLE,
/** /**
* This callback is iterating calls to {@link #process()} and is dealing with * This callback is just about to call {@link #process()},
* the returns. To get into processing state, it much of held the lock state * or within it, or just exited from it, either normally
* and set iterating to true. * or by throwing.
*/ */
PROCESSING, PROCESSING,
/** /**
* Waiting for a schedule callback * Method {@link #process()} returned {@link Action#SCHEDULED}
* and this callback is waiting for the asynchronous sub-task
* to complete.
*/ */
PENDING, PENDING,
/** /**
* Called by a schedule callback * The asynchronous sub-task was completed successfully
* via a call to {@link #succeeded()} while in
* {@link #PROCESSING} state.
*/ */
CALLED, CALLED,
/** /**
* The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return * The iteration terminated successfully as indicated by
* from {@link IteratingCallback#process()} * {@link Action#SUCCEEDED} returned from
* {@link IteratingCallback#process()}.
*/ */
SUCCEEDED, SUCCEEDED,
/** /**
* The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)} * The iteration terminated with a failure via a call
* to {@link IteratingCallback#failed(Throwable)}.
*/ */
FAILED, FAILED,
/** /**
* This callback has been closed and cannot be reset. * This callback has been {@link #close() closed} and
* cannot be {@link #reset() reset}.
*/ */
CLOSED CLOSED
} }
/** /**
* The indication of the overall progress of the overall job that * The indication of the overall progress of the iteration
* implementations of {@link #process()} must return. * that implementations of {@link #process()} must return.
*/ */
protected enum Action protected enum Action
{ {
/** /**
* Indicates that {@link #process()} has no more work to do, * Indicates that {@link #process()} has no more work to do,
* but the overall job is not completed yet, probably waiting * but the iteration is not completed yet, probably waiting
* for additional events to trigger more work. * for additional events to trigger more work.
*/ */
IDLE, IDLE,
/** /**
* Indicates that {@link #process()} is executing asynchronously * Indicates that {@link #process()} has initiated an asynchronous
* a sub task, where the execution has started but the callback * sub-task, where the execution has started but the callback
* that signals the completion of the asynchronous sub-task
* may have not yet been invoked. * may have not yet been invoked.
*/ */
SCHEDULED, SCHEDULED,
/** /**
* Indicates that {@link #process()} has completed the overall job. * Indicates that {@link #process()} has completed the whole
* iteration successfully.
*/ */
SUCCEEDED SUCCEEDED
} }
private final AutoLock _lock = new AutoLock(); private final AutoLock _lock = new AutoLock();
private State _state; private State _state;
private Throwable _failure;
private boolean _iterate; private boolean _iterate;
protected IteratingCallback() protected IteratingCallback()
@ -135,11 +144,10 @@ public abstract class IteratingCallback implements Callback
} }
/** /**
* Method called by {@link #iterate()} to process the sub task. * Method called by {@link #iterate()} to process the asynchronous sub-task.
* <p> * <p>
* Implementations must start the asynchronous execution of the sub task * Implementations must initiate the asynchronous execution of the sub-task
* (if any) and return an appropriate action: * (if any) and return an appropriate action:
* </p>
* <ul> * <ul>
* <li>{@link Action#IDLE} when no sub tasks are available for execution * <li>{@link Action#IDLE} when no sub tasks are available for execution
* but the overall job is not completed yet</li> * but the overall job is not completed yet</li>
@ -149,7 +157,7 @@ public abstract class IteratingCallback implements Callback
* </ul> * </ul>
* *
* @return the appropriate Action * @return the appropriate Action
* @throws Throwable if the sub task processing throws * @throws Throwable if the sub-task processing throws
*/ */
protected abstract Action process() throws Throwable; protected abstract Action process() throws Throwable;
@ -174,16 +182,18 @@ public abstract class IteratingCallback implements Callback
/** /**
* This method must be invoked by applications to start the processing * This method must be invoked by applications to start the processing
* of sub tasks. It can be called at any time by any thread, and it's * of asynchronous sub-tasks.
* contract is that when called, then the {@link #process()} method will * <p>
* be called during or soon after, either by the calling thread or by * It can be called at any time by any thread, and its contract is that
* another thread. * when called, then the {@link #process()} method will be called during
* or soon after, either by the calling thread or by another thread, but
* in either case by one thread only.
*/ */
public void iterate() public void iterate()
{ {
boolean process = false; boolean process = false;
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
@ -219,14 +229,15 @@ public abstract class IteratingCallback implements Callback
// This should only ever be called when in processing state, however a failed or close call // This should only ever be called when in processing state, however a failed or close call
// may happen concurrently, so state is not assumed. // may happen concurrently, so state is not assumed.
boolean onCompleteSuccess = false; boolean notifyCompleteSuccess = false;
Throwable notifyCompleteFailure = null;
// While we are processing // While we are processing
processing: processing:
while (true) while (true)
{ {
// Call process to get the action that we have to take. // Call process to get the action that we have to take.
Action action; Action action = null;
try try
{ {
action = process(); action = process();
@ -234,15 +245,17 @@ public abstract class IteratingCallback implements Callback
catch (Throwable x) catch (Throwable x)
{ {
failed(x); failed(x);
break; // Fall through to possibly invoke onCompleteFailure().
} }
// acted on the action we have just received // acted on the action we have just received
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
case PROCESSING: case PROCESSING:
{
if (action != null)
{ {
switch (action) switch (action)
{ {
@ -253,34 +266,33 @@ public abstract class IteratingCallback implements Callback
{ {
// yes, so skip idle and keep processing // yes, so skip idle and keep processing
_iterate = false; _iterate = false;
_state = State.PROCESSING; continue;
continue processing;
} }
// No, so we can go idle // No, so we can go idle
_state = State.IDLE; _state = State.IDLE;
break processing; break processing;
} }
case SCHEDULED: case SCHEDULED:
{ {
// we won the race against the callback, so the callback has to process and we can break processing // we won the race against the callback, so the callback has to process and we can break processing
_state = State.PENDING; _state = State.PENDING;
break processing; break processing;
} }
case SUCCEEDED: case SUCCEEDED:
{ {
// we lost the race against the callback, // we lost the race against the callback,
_iterate = false; _iterate = false;
_state = State.SUCCEEDED; _state = State.SUCCEEDED;
onCompleteSuccess = true; notifyCompleteSuccess = true;
break processing; break processing;
} }
default: default:
{
break; break;
} }
}
}
throw new IllegalStateException(String.format("%s[action=%s]", this, action)); throw new IllegalStateException(String.format("%s[action=%s]", this, action));
} }
@ -290,12 +302,16 @@ public abstract class IteratingCallback implements Callback
throw new IllegalStateException(String.format("%s[action=%s]", this, action)); throw new IllegalStateException(String.format("%s[action=%s]", this, action));
// we lost the race, so we have to keep processing // we lost the race, so we have to keep processing
_state = State.PROCESSING; _state = State.PROCESSING;
continue processing; continue;
} }
case SUCCEEDED:
case FAILED: case FAILED:
case CLOSED: case CLOSED:
notifyCompleteFailure = _failure;
_failure = null;
break processing;
case SUCCEEDED:
break processing; break processing;
case IDLE: case IDLE:
@ -306,20 +322,23 @@ public abstract class IteratingCallback implements Callback
} }
} }
if (onCompleteSuccess) if (notifyCompleteSuccess)
onCompleteSuccess(); onCompleteSuccess();
else if (notifyCompleteFailure != null)
onCompleteFailure(notifyCompleteFailure);
} }
/** /**
* Invoked when the sub task succeeds. * Method to invoke when the asynchronous sub-task succeeds.
* Subclasses that override this method must always remember to call * <p>
* {@code super.succeeded()}. * Subclasses that override this method must always remember
* to call {@code super.succeeded()}.
*/ */
@Override @Override
public void succeeded() public void succeeded()
{ {
boolean process = false; boolean process = false;
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
@ -351,15 +370,24 @@ public abstract class IteratingCallback implements Callback
} }
/** /**
* Invoked when the sub task fails. * Method to invoke when the asynchronous sub-task fails,
* Subclasses that override this method must always remember to call * or to fail the overall asynchronous task and therefore
* {@code super.failed(Throwable)}. * terminate the iteration.
* <p>
* Subclasses that override this method must always remember
* to call {@code super.failed(Throwable)}.
* <p>
* Eventually, {@link #onCompleteFailure(Throwable)} is
* called, either by the caller thread or by the processing
* thread.
*
* @see #isFailed()
*/ */
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
boolean failure = false; boolean failure = false;
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
@ -370,12 +398,15 @@ public abstract class IteratingCallback implements Callback
case CALLED: case CALLED:
// too late!. // too late!.
break; break;
case PENDING: case PENDING:
{
failure = true;
break;
}
case PROCESSING: case PROCESSING:
{ {
_state = State.FAILED; _state = State.FAILED;
failure = true; _failure = x;
break; break;
} }
default: default:
@ -386,10 +417,19 @@ public abstract class IteratingCallback implements Callback
onCompleteFailure(x); onCompleteFailure(x);
} }
/**
* Method to invoke to forbid further invocations to {@link #iterate()}
* and {@link #reset()}.
* <p>
* When this method is invoked during processing, it behaves like invoking
* {@link #failed(Throwable)}.
*
* @see #isClosed()
*/
public void close() public void close()
{ {
String failure = null; String failure = null;
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
@ -399,12 +439,18 @@ public abstract class IteratingCallback implements Callback
_state = State.CLOSED; _state = State.CLOSED;
break; break;
case PROCESSING:
_failure = new IOException(String.format("Close %s in state %s", this, _state));
_state = State.CLOSED;
break;
case CLOSED: case CLOSED:
break; break;
default: default:
failure = String.format("Close %s in state %s", this, _state); failure = String.format("Close %s in state %s", this, _state);
_state = State.CLOSED; _state = State.CLOSED;
break;
} }
} }
@ -412,43 +458,47 @@ public abstract class IteratingCallback implements Callback
onCompleteFailure(new IOException(failure)); onCompleteFailure(new IOException(failure));
} }
/* /**
* only for testing * @return whether this callback is idle, and {@link #iterate()} needs to be called
* @return whether this callback is idle and {@link #iterate()} needs to be called
*/ */
boolean isIdle() boolean isIdle()
{ {
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
return _state == State.IDLE; return _state == State.IDLE;
} }
} }
/**
* @return whether this callback has been {@link #close() closed}
*/
public boolean isClosed() public boolean isClosed()
{ {
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
return _state == State.CLOSED; return _state == State.CLOSED;
} }
} }
/** /**
* @return whether this callback has failed * @return whether this callback has been {@link #failed(Throwable) failed}
*/ */
public boolean isFailed() public boolean isFailed()
{ {
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
return _state == State.FAILED; return _state == State.FAILED;
} }
} }
/** /**
* @return whether this callback has succeeded * @return whether this callback and the overall asynchronous task has been succeeded
*
* @see #onCompleteSuccess()
*/ */
public boolean isSucceeded() public boolean isSucceeded()
{ {
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
return _state == State.SUCCEEDED; return _state == State.SUCCEEDED;
} }
@ -457,15 +507,15 @@ public abstract class IteratingCallback implements Callback
/** /**
* Resets this callback. * Resets this callback.
* <p> * <p>
* A callback can only be reset to IDLE from the * A callback can only be reset to the idle state from the
* SUCCEEDED or FAILED states or if it is already IDLE. * {@link #isSucceeded() succeeded} or {@link #isFailed() failed} states
* </p> * or if it is already idle.
* *
* @return true if the reset was successful * @return true if the reset was successful
*/ */
public boolean reset() public boolean reset()
{ {
try (AutoLock lock = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
switch (_state) switch (_state)
{ {
@ -474,8 +524,9 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
_iterate = false;
_state = State.IDLE; _state = State.IDLE;
_failure = null;
_iterate = false;
return true; return true;
default: default: