diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index e8b82724aef..4faab7c0293 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.util; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.thread.SpinLock; + /** * This specialized callback implements a pattern that allows * a large job to be broken into smaller tasks using iteration @@ -92,12 +94,7 @@ public abstract class IteratingCallback implements Callback /** * This callback has been closed and cannot be reset. */ - CLOSED, - - /** - * State is locked while leaving processing state to check the iterate boolean - */ - LOCKED + CLOSED } /** @@ -125,18 +122,19 @@ public abstract class IteratingCallback implements Callback SUCCEEDED } - private final AtomicReference _state; + private SpinLock _lock = new SpinLock(); + private State _state; private boolean _iterate; protected IteratingCallback() { - _state = new AtomicReference<>(State.IDLE); + _state = State.IDLE; } protected IteratingCallback(boolean needReset) { - _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE); + _state = needReset ? State.SUCCEEDED : State.IDLE; } /** @@ -183,43 +181,40 @@ public abstract class IteratingCallback implements Callback */ public void iterate() { + boolean process=false; + loop: while (true) { - State state=_state.get(); - switch (state) + try (SpinLock.Lock lock = _lock.lock()) { - case PENDING: - case CALLED: - // process will be called when callback is handled - break loop; - - case IDLE: - if (!_state.compareAndSet(state,State.PROCESSING)) - continue; - processing(); - break loop; - - case PROCESSING: - if (!_state.compareAndSet(state,State.LOCKED)) - continue; - // Tell the thread that is processing that it must iterate again - _iterate=true; - _state.set(State.PROCESSING); - break loop; - - case LOCKED: - Thread.yield(); - continue loop; + switch (_state) + { + case PENDING: + case CALLED: + // process will be called when callback is handled + break loop; - case FAILED: - case SUCCEEDED: - break loop; + case IDLE: + _state=State.PROCESSING; + process=true; + break loop; - case CLOSED: - default: - throw new IllegalStateException("state="+state); + case PROCESSING: + _iterate=true; + break loop; + + case FAILED: + case SUCCEEDED: + break loop; + + case CLOSED: + default: + throw new IllegalStateException("state="+_state); + } } } + if (process) + processing(); } private void processing() @@ -227,6 +222,8 @@ public abstract class IteratingCallback implements Callback // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. + boolean on_complete_success=false; + // While we are processing processing: while (true) { @@ -242,13 +239,10 @@ public abstract class IteratingCallback implements Callback break processing; } - // loop until we have successfully acted on the action we have just received - acting: while(true) + // acted on the action we have just received + try(SpinLock.Lock lock = _lock.lock()) { - // action handling needs to know the state - State state=_state.get(); - - switch (state) + switch (_state) { case PROCESSING: { @@ -256,67 +250,56 @@ public abstract class IteratingCallback implements Callback { case IDLE: { - // lock the state - if (!_state.compareAndSet(state,State.LOCKED)) - continue acting; - // Has iterate been called while we were processing? if (_iterate) { // yes, so skip idle and keep processing _iterate=false; - _state.set(State.PROCESSING); + _state=State.PROCESSING; continue processing; } // No, so we can go idle - _state.set(State.IDLE); + _state=State.IDLE; break processing; } - + case SCHEDULED: - { - if (!_state.compareAndSet(state, State.PENDING)) - continue acting; + { // we won the race against the callback, so the callback has to process and we can break processing + _state=State.PENDING; break processing; } - + case SUCCEEDED: { - if (!_state.compareAndSet(state, State.LOCKED)) - continue acting; + // we lost the race against the callback, _iterate=false; - _state.set(State.SUCCEEDED); - onCompleteSuccess(); + _state=State.SUCCEEDED; + on_complete_success=true; break processing; } default: - throw new IllegalStateException("state="+state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } - + case CALLED: { switch (action) { case SCHEDULED: - { - if (!_state.compareAndSet(state, State.PROCESSING)) - continue acting; + { // we lost the race, so we have to keep processing + _state=State.PROCESSING; continue processing; } default: - throw new IllegalStateException("state="+state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } - - case LOCKED: - Thread.yield(); - continue acting; case SUCCEEDED: case FAILED: @@ -326,12 +309,15 @@ public abstract class IteratingCallback implements Callback case IDLE: case PENDING: default: - throw new IllegalStateException("state="+state+" action="+action); + throw new IllegalStateException("state="+_state+" action="+action); } } } + + if (on_complete_success) + onCompleteSuccess(); } - + /** * Invoked when the sub task succeeds. * Subclasses that override this method must always remember to call @@ -340,41 +326,36 @@ public abstract class IteratingCallback implements Callback @Override public void succeeded() { - loop: while (true) + boolean process=false; + try(SpinLock.Lock lock = _lock.lock()) { - State state = _state.get(); - switch (state) + switch (_state) { case PROCESSING: { - if (!_state.compareAndSet(state, State.CALLED)) - continue loop; - break loop; + _state=State.CALLED; + break; } case PENDING: { - if (!_state.compareAndSet(state, State.PROCESSING)) - continue loop; - processing(); - break loop; + _state=State.PROCESSING; + process=true; + break; } case CLOSED: case FAILED: { // Too late! - break loop; - } - case LOCKED: - { - Thread.yield(); - continue loop; - } + break; + } default: { - throw new IllegalStateException("state="+state); + throw new IllegalStateException("state="+_state); } } } + if (process) + processing(); } /** @@ -385,73 +366,59 @@ public abstract class IteratingCallback implements Callback @Override public void failed(Throwable x) { - loop: while (true) + boolean failure=false; + try(SpinLock.Lock lock = _lock.lock()) { - State state = _state.get(); - switch (state) + switch (_state) { case SUCCEEDED: case FAILED: case IDLE: case CLOSED: case CALLED: - { // too late!. - break loop; - } - case LOCKED: - { - Thread.yield(); - continue loop; - } + break; + case PENDING: case PROCESSING: { - if (!_state.compareAndSet(state, State.FAILED)) - continue loop; - - onCompleteFailure(x); - break loop; + _state=State.FAILED; + failure=true; + break; } default: - throw new IllegalStateException("state="+state); + throw new IllegalStateException("state="+_state); } } + if (failure) + onCompleteFailure(x); } public void close() { - loop: while (true) + boolean failure=false; + try(SpinLock.Lock lock = _lock.lock()) { - State state = _state.get(); - switch (state) + switch (_state) { case IDLE: case SUCCEEDED: case FAILED: - { - if (!_state.compareAndSet(state, State.CLOSED)) - continue loop; - break loop; - } + case PROCESSING: + _state=State.CLOSED; + break; + case CLOSED: - { - break loop; - } - case LOCKED: - { - Thread.yield(); - continue loop; - } + break; + default: - { - if (!_state.compareAndSet(state, State.CLOSED)) - continue loop; - onCompleteFailure(new ClosedChannelException()); - break loop; - } + _state=State.CLOSED; + failure=true; } } + + if(failure) + onCompleteFailure(new ClosedChannelException()); } /* @@ -460,12 +427,18 @@ public abstract class IteratingCallback implements Callback */ boolean isIdle() { - return _state.get() == State.IDLE; + try(SpinLock.Lock lock = _lock.lock()) + { + return _state == State.IDLE; + } } public boolean isClosed() { - return _state.get() == State.CLOSED; + try(SpinLock.Lock lock = _lock.lock()) + { + return _state == State.CLOSED; + } } /** @@ -473,7 +446,10 @@ public abstract class IteratingCallback implements Callback */ public boolean isFailed() { - return _state.get() == State.FAILED; + try(SpinLock.Lock lock = _lock.lock()) + { + return _state == State.FAILED; + } } /** @@ -481,7 +457,10 @@ public abstract class IteratingCallback implements Callback */ public boolean isSucceeded() { - return _state.get() == State.SUCCEEDED; + try(SpinLock.Lock lock = _lock.lock()) + { + return _state == State.SUCCEEDED; + } } /** @@ -494,31 +473,18 @@ public abstract class IteratingCallback implements Callback */ public boolean reset() { - while (true) + try(SpinLock.Lock lock = _lock.lock()) { - State state=_state.get(); - switch(state) + switch(_state) { case IDLE: return true; - - case SUCCEEDED: - if (!_state.compareAndSet(state, State.LOCKED)) - continue; - _iterate=false; - _state.set(State.IDLE); - return true; - - case FAILED: - if (!_state.compareAndSet(state, State.LOCKED)) - continue; - _iterate=false; - _state.set(State.IDLE); - return true; - case LOCKED: - Thread.yield(); - continue; + case SUCCEEDED: + case FAILED: + _iterate=false; + _state=State.IDLE; + return true; default: return false;