diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index ac000c999c4..2986d9a91e9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -19,109 +19,118 @@ import org.eclipse.jetty.util.thread.AutoLock; /** * This specialized callback implements a pattern that allows - * a large job to be broken into smaller tasks using iteration - * rather than recursion. + * a large asynchronous task to be broken into smaller + * asynchronous sub-tasks using iteration rather than recursion. *

* A typical example is the write of a large content to a socket, * divided in chunks. Chunk C1 is written by thread T1, which * also invokes the callback, which writes chunk C2, which invokes * the callback again, which writes chunk C3, and so forth. - *

*

- * The problem with the example is that if the callback thread + * The problem with the example above is that if the callback thread * is the same that performs the I/O operation, then the process * is recursive and may result in a stack overflow. * To avoid the stack overflow, a thread dispatch must be performed, * causing context switching and cache misses, affecting performance. - *

*

- * To avoid this issue, this callback uses an AtomicReference to - * record whether success callback has been called during the processing - * of a sub task, and if so then the processing iterates rather than - * recurring. - *

+ * To avoid this issue, this callback atomically records whether + * the callback for an asynchronous sub-task has been called + * during the processing of the asynchronous sub-task, and if so + * then the processing of the large asynchronous task iterates + * rather than recursing. *

- * Subclasses must implement method {@link #process()} where the sub - * task is executed and a suitable {@link IteratingCallback.Action} is - * returned to this callback to indicate the overall progress of the job. - * This callback is passed to the asynchronous execution of each sub - * task and a call the {@link #succeeded()} on this callback represents - * the completion of the sub task. - *

+ * Subclasses must implement method {@link #process()} where the + * asynchronous sub-task is initiated and a suitable {@link Action} + * is returned to this callback to indicate the overall progress of + * the large asynchronous task. + * This callback is passed to the asynchronous sub-task, and a call + * to {@link #succeeded()} on this callback represents the successful + * completion of the asynchronous sub-task, while a call to + * {@link #failed(Throwable)} on this callback represents the + * completion with a failure of the large asynchronous task. */ public abstract class IteratingCallback implements Callback { /** - * The internal states of this callback + * The internal states of this callback. */ private enum State { /** - * This callback is IDLE, ready to iterate. + * This callback is idle, ready to iterate. */ IDLE, /** - * This callback is iterating calls to {@link #process()} and is dealing with - * the returns. To get into processing state, it much of held the lock state - * and set iterating to true. + * This callback is just about to call {@link #process()}, + * or within it, or just exited from it, either normally + * or by throwing. */ PROCESSING, /** - * Waiting for a schedule callback + * Method {@link #process()} returned {@link Action#SCHEDULED} + * and this callback is waiting for the asynchronous sub-task + * to complete. */ PENDING, /** - * Called by a schedule callback + * The asynchronous sub-task was completed successfully + * via a call to {@link #succeeded()} while in + * {@link #PROCESSING} state. */ CALLED, /** - * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return - * from {@link IteratingCallback#process()} + * The iteration terminated successfully as indicated by + * {@link Action#SUCCEEDED} returned from + * {@link IteratingCallback#process()}. */ SUCCEEDED, /** - * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)} + * The iteration terminated with a failure via a call + * to {@link IteratingCallback#failed(Throwable)}. */ FAILED, /** - * This callback has been closed and cannot be reset. + * This callback has been {@link #close() closed} and + * cannot be {@link #reset() reset}. */ CLOSED } /** - * The indication of the overall progress of the overall job that - * implementations of {@link #process()} must return. + * The indication of the overall progress of the iteration + * that implementations of {@link #process()} must return. */ protected enum Action { /** * Indicates that {@link #process()} has no more work to do, - * but the overall job is not completed yet, probably waiting + * but the iteration is not completed yet, probably waiting * for additional events to trigger more work. */ IDLE, /** - * Indicates that {@link #process()} is executing asynchronously - * a sub task, where the execution has started but the callback + * Indicates that {@link #process()} has initiated an asynchronous + * sub-task, where the execution has started but the callback + * that signals the completion of the asynchronous sub-task * may have not yet been invoked. */ SCHEDULED, - /** - * Indicates that {@link #process()} has completed the overall job. + * Indicates that {@link #process()} has completed the whole + * iteration successfully. */ SUCCEEDED } private final AutoLock _lock = new AutoLock(); private State _state; + private Throwable _failure; private boolean _iterate; protected IteratingCallback() @@ -135,11 +144,10 @@ public abstract class IteratingCallback implements Callback } /** - * Method called by {@link #iterate()} to process the sub task. + * Method called by {@link #iterate()} to process the asynchronous sub-task. *

- * Implementations must start the asynchronous execution of the sub task + * Implementations must initiate the asynchronous execution of the sub-task * (if any) and return an appropriate action: - *

* * * @return the appropriate Action - * @throws Throwable if the sub task processing throws + * @throws Throwable if the sub-task processing throws */ protected abstract Action process() throws Throwable; @@ -174,16 +182,18 @@ public abstract class IteratingCallback implements Callback /** * This method must be invoked by applications to start the processing - * of sub tasks. It can be called at any time by any thread, and it's - * contract is that when called, then the {@link #process()} method will - * be called during or soon after, either by the calling thread or by - * another thread. + * of asynchronous sub-tasks. + *

+ * It can be called at any time by any thread, and its contract is that + * when called, then the {@link #process()} method will be called during + * or soon after, either by the calling thread or by another thread, but + * in either case by one thread only. */ public void iterate() { boolean process = false; - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { @@ -219,14 +229,15 @@ public abstract class IteratingCallback implements Callback // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. - boolean onCompleteSuccess = false; + boolean notifyCompleteSuccess = false; + Throwable notifyCompleteFailure = null; // While we are processing processing: while (true) { // Call process to get the action that we have to take. - Action action; + Action action = null; try { action = process(); @@ -234,52 +245,53 @@ public abstract class IteratingCallback implements Callback catch (Throwable x) { failed(x); - break; + // Fall through to possibly invoke onCompleteFailure(). } // acted on the action we have just received - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { case PROCESSING: { - switch (action) + if (action != null) { - case IDLE: + switch (action) { - // Has iterate been called while we were processing? - if (_iterate) + case IDLE: { - // yes, so skip idle and keep processing - _iterate = false; - _state = State.PROCESSING; - continue processing; + // Has iterate been called while we were processing? + if (_iterate) + { + // yes, so skip idle and keep processing + _iterate = false; + continue; + } + + // No, so we can go idle + _state = State.IDLE; + break processing; + } + case SCHEDULED: + { + // we won the race against the callback, so the callback has to process and we can break processing + _state = State.PENDING; + break processing; + } + case SUCCEEDED: + { + // we lost the race against the callback, + _iterate = false; + _state = State.SUCCEEDED; + notifyCompleteSuccess = true; + break processing; + } + default: + { + break; } - - // No, so we can go idle - _state = State.IDLE; - break processing; } - - case SCHEDULED: - { - // we won the race against the callback, so the callback has to process and we can break processing - _state = State.PENDING; - break processing; - } - - case SUCCEEDED: - { - // we lost the race against the callback, - _iterate = false; - _state = State.SUCCEEDED; - onCompleteSuccess = true; - break processing; - } - - default: - break; } throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } @@ -290,12 +302,16 @@ public abstract class IteratingCallback implements Callback throw new IllegalStateException(String.format("%s[action=%s]", this, action)); // we lost the race, so we have to keep processing _state = State.PROCESSING; - continue processing; + continue; } - case SUCCEEDED: case FAILED: case CLOSED: + notifyCompleteFailure = _failure; + _failure = null; + break processing; + + case SUCCEEDED: break processing; case IDLE: @@ -306,20 +322,23 @@ public abstract class IteratingCallback implements Callback } } - if (onCompleteSuccess) + if (notifyCompleteSuccess) onCompleteSuccess(); + else if (notifyCompleteFailure != null) + onCompleteFailure(notifyCompleteFailure); } /** - * Invoked when the sub task succeeds. - * Subclasses that override this method must always remember to call - * {@code super.succeeded()}. + * Method to invoke when the asynchronous sub-task succeeds. + *

+ * Subclasses that override this method must always remember + * to call {@code super.succeeded()}. */ @Override public void succeeded() { boolean process = false; - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { @@ -351,15 +370,24 @@ public abstract class IteratingCallback implements Callback } /** - * Invoked when the sub task fails. - * Subclasses that override this method must always remember to call - * {@code super.failed(Throwable)}. + * Method to invoke when the asynchronous sub-task fails, + * or to fail the overall asynchronous task and therefore + * terminate the iteration. + *

+ * Subclasses that override this method must always remember + * to call {@code super.failed(Throwable)}. + *

+ * Eventually, {@link #onCompleteFailure(Throwable)} is + * called, either by the caller thread or by the processing + * thread. + * + * @see #isFailed() */ @Override public void failed(Throwable x) { boolean failure = false; - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { @@ -370,12 +398,15 @@ public abstract class IteratingCallback implements Callback case CALLED: // too late!. break; - case PENDING: + { + failure = true; + break; + } case PROCESSING: { _state = State.FAILED; - failure = true; + _failure = x; break; } default: @@ -386,10 +417,19 @@ public abstract class IteratingCallback implements Callback onCompleteFailure(x); } + /** + * Method to invoke to forbid further invocations to {@link #iterate()} + * and {@link #reset()}. + *

+ * When this method is invoked during processing, it behaves like invoking + * {@link #failed(Throwable)}. + * + * @see #isClosed() + */ public void close() { String failure = null; - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { @@ -399,12 +439,18 @@ public abstract class IteratingCallback implements Callback _state = State.CLOSED; break; + case PROCESSING: + _failure = new IOException(String.format("Close %s in state %s", this, _state)); + _state = State.CLOSED; + break; + case CLOSED: break; default: failure = String.format("Close %s in state %s", this, _state); _state = State.CLOSED; + break; } } @@ -412,43 +458,47 @@ public abstract class IteratingCallback implements Callback onCompleteFailure(new IOException(failure)); } - /* - * only for testing - * @return whether this callback is idle and {@link #iterate()} needs to be called + /** + * @return whether this callback is idle, and {@link #iterate()} needs to be called */ boolean isIdle() { - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _state == State.IDLE; } } + /** + * @return whether this callback has been {@link #close() closed} + */ public boolean isClosed() { - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _state == State.CLOSED; } } /** - * @return whether this callback has failed + * @return whether this callback has been {@link #failed(Throwable) failed} */ public boolean isFailed() { - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _state == State.FAILED; } } /** - * @return whether this callback has succeeded + * @return whether this callback and the overall asynchronous task has been succeeded + * + * @see #onCompleteSuccess() */ public boolean isSucceeded() { - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { return _state == State.SUCCEEDED; } @@ -457,15 +507,15 @@ public abstract class IteratingCallback implements Callback /** * Resets this callback. *

- * A callback can only be reset to IDLE from the - * SUCCEEDED or FAILED states or if it is already IDLE. - *

+ * A callback can only be reset to the idle state from the + * {@link #isSucceeded() succeeded} or {@link #isFailed() failed} states + * or if it is already idle. * * @return true if the reset was successful */ public boolean reset() { - try (AutoLock lock = _lock.lock()) + try (AutoLock ignored = _lock.lock()) { switch (_state) { @@ -474,8 +524,9 @@ public abstract class IteratingCallback implements Callback case SUCCEEDED: case FAILED: - _iterate = false; _state = State.IDLE; + _failure = null; + _iterate = false; return true; default: